#!/usr/bin/env python3
"""
AutoDev - Autonomous CLI Development Studio
============================================
A fully autonomous development agent that reads project descriptions and
reference manuals, then plans, implements, compiles, tests, and debugs
complete software projects using a local LLM backend.

Usage:
    python -m autodev [options]

Options:
    -nomanual    Skip reading manuals (for minor tasks)
    --backend    LLM backend: ollama or vllm (default: from config)
    --model      Model name (default: from config)
    --workdir    Working directory (default: current directory)
    -web PORT    Start the web dashboard on PORT (e.g. -web 4500)
"""

import argparse
import hashlib
import os
import sys
import traceback

from . import config
from .llm import LLM, LLMError
from .logger import Logger
from .context import ContextManager
from .sandbox import Sandbox, SandboxViolation
from .dependency import DependencyTracker
from .planner import Planner
from .executor import Executor
from .debugger import Debugger
from .resume import ResumeManager


# Start-up banner printed by run().  Each text row is centred inside a box
# that is _BANNER_WIDTH columns wide, so the side borders always line up
# with the top and bottom edges regardless of the row's length.
_BANNER_WIDTH = 50
_BANNER_ROWS = (
    "AutoDev - Autonomous Dev Studio",
    "================================",
    "Reads descriptions. Reads manuals. Builds.",
)
BANNER = "\n".join([
    "",  # leading newline, as in a triple-quoted literal opened on its own line
    "╔" + "═" * _BANNER_WIDTH + "╗",
    *("║" + row.center(_BANNER_WIDTH) + "║" for row in _BANNER_ROWS),
    "╚" + "═" * _BANNER_WIDTH + "╝",
    "",  # trailing newline
])


def parse_args() -> argparse.Namespace:
    """Parse the AutoDev command line (``sys.argv``) into a namespace.

    Recognised options: ``-nomanual`` (boolean flag), ``--backend``
    (``ollama`` or ``vllm``), ``--model``, ``--workdir`` (defaults to the
    current working directory at call time) and ``-web PORT`` (integer).
    ``--backend``, ``--model`` and ``-web`` are ``None`` when not supplied.
    """
    # One (flag, add_argument keyword arguments) entry per supported option.
    option_table = (
        ("-nomanual", dict(action="store_true",
                           help="Skip reading manuals directory")),
        ("--backend", dict(choices=["ollama", "vllm"],
                           default=None, help="LLM backend")),
        ("--model", dict(default=None, help="Model name")),
        ("--workdir", dict(default=os.getcwd(),
                           help="Working directory (default: cwd)")),
        ("-web", dict(type=int, default=None, metavar="PORT",
                      help="Start web dashboard on this port (e.g. -web 4500)")),
    )

    cli = argparse.ArgumentParser(
        description="AutoDev - Autonomous CLI Development Studio",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()


def check_llm_connection(llm: LLM, logger: Logger) -> bool:
    """Send a trivial prompt to the LLM backend and report whether it answered.

    Args:
        llm: Backend client; its ``query`` method is called once and its
            ``backend`` / ``model`` attributes are used in the log message.
        logger: Receives a single ``llm_check`` entry describing the outcome.

    Returns:
        ``True`` when ``query`` returned without raising ``LLMError`` (whatever
        the reply looked like), ``False`` when it raised ``LLMError``.
    """
    # Only the query itself can raise LLMError, so keep the try body minimal.
    try:
        resp = llm.query("Respond with only the word: OK",
                         system="You are a test. Respond with only: OK",
                         temperature=0.0)
    except LLMError as e:
        logger.log("llm_check", str(e), "error")
        return False

    reply = resp.strip() if resp else ""
    if not reply:
        # The backend answered the request but produced no text.  Still
        # reachable, but do not mislabel this as a "verbose" reply.
        logger.log("llm_check", "Backend connected (empty response)", "warn")
    elif len(reply) < 50:
        logger.log("llm_check", f"Backend {llm.backend} ({llm.model}) connected")
    else:
        # Got a response but it was verbose — still connected
        logger.log("llm_check", "Backend connected (verbose response)", "warn")
    return True


def _read_user_context_size() -> int | None:
|
|
"""Read context_size from ~/.ahvibe.conf if it exists."""
|
|
conf_path = os.path.expanduser("~/.ahvibe.conf")
|
|
if not os.path.exists(conf_path):
|
|
return None
|
|
try:
|
|
with open(conf_path, "r") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line.startswith("context_size="):
|
|
return int(line.split("=", 1)[1].strip())
|
|
except (IOError, ValueError):
|
|
pass
|
|
return None
|
|
|
|
|
|
def _push_web_event(event: str, payload: dict) -> None:
    """Best-effort: forward one event to the web dashboard module."""
    try:
        from .web import push_event
        push_event(event, payload)
    except Exception:
        # Dashboard notifications are deliberately non-fatal: any failure to
        # import or push is ignored so the build itself is never interrupted.
        pass


def _announce_plan(plan: dict, steps: list, start_step: int) -> None:
    """Best-effort: publish the plan's step list to the web dashboard."""
    try:
        from .web import push_event
        push_event("plan", {
            "project": plan.get("project_name", "project"),
            "steps": [{"id": s.get("id", i + 1), "phase": s.get("phase", ""),
                       "description": s.get("description", ""), "status": "pending"}
                      for i, s in enumerate(steps)],
            "start_step": start_step,
        })
    except Exception:
        # Same best-effort policy as _push_web_event; building the payload is
        # inside the try so a malformed step cannot abort the run here.
        pass


def _stop_web_server(server) -> None:
    """Best-effort shutdown of the dashboard server started by run()."""
    try:
        from .web import stop_web_server
        stop_web_server()
        server.server_close()
    except Exception:
        pass


def _report_gpu_status(llm, logger) -> None:
    """Print (and, when degraded, log) how much of the model is on the GPU."""
    gpu = llm.detect_gpu_status()
    if not gpu["loaded"]:
        print(" ⊘ Model not pre-loaded (will load on first query)")
        return

    gpu_pct = gpu["gpu_percent"]
    size_gb = gpu["size_total"] / 1e9
    vram_gb = gpu["size_vram"] / 1e9
    if gpu_pct >= 90:
        print(f" \033[32m✓\033[0m GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB)")
    elif gpu_pct >= 50:
        print(f" \033[33m⚠\033[0m GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB) — partial GPU, expect slower performance")
        logger.log("gpu_warn", f"{gpu_pct}% GPU", "warn")
    else:
        print(f" \033[31m✗ GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB)\033[0m")
        print(f" {gpu['warning']}")
        logger.log("gpu_warn", gpu["warning"], "error")
        if gpu_pct == 0:
            # Continue anyway — user can Ctrl+C
            print("\n Running on CPU only. This will likely be too slow for autonomous development.")
            print(" Consider: a smaller model, freeing GPU memory, or increasing VRAM.")


def _load_manuals(args, planner, logger, workdir: str):
    """Return the manuals read by the planner, or ``{}`` with ``-nomanual``.

    Exits with status 1 when manuals are required but the manuals directory
    does not exist.
    """
    if args.nomanual:
        print(" ⊘ Skipping manuals (-nomanual flag)")
        logger.log("manuals_skip", "User requested -nomanual")
        return {}

    manuals_dir = os.path.join(workdir, config.MANUALS_DIR)
    if not os.path.isdir(manuals_dir):
        print(f"\n \033[31m✗ No '{config.MANUALS_DIR}/' directory found.\033[0m")
        print(" Create a manuals/ folder with reference docs, or use -nomanual flag.")
        logger.log("exit", "No manuals directory", "error")
        sys.exit(1)

    manuals = planner.read_manuals()
    if not manuals:
        print(" \033[33m⚠\033[0m Manuals directory exists but is empty")
        logger.log("manuals_empty", "No files in manuals/", "warn")
    else:
        print(f" \033[32m✓\033[0m Loaded {len(manuals)} manual(s): {', '.join(manuals)}")
        logger.log("manuals_read", f"{len(manuals)} files")
    return manuals


def _resume_previous_session(resume, planner, logger, ctx, description: str, desc_hash: str):
    """Inspect saved state; return ``(plan, start_step, already_complete)``.

    ``plan`` is ``None`` when a new plan must be created (the description
    changed, or the saved plan could not be loaded).
    """
    print("\n ↻ Previous session detected. Checking resume state...")
    if resume.description_changed(desc_hash):
        print(" \033[33m⚠\033[0m Description changed. Re-planning with existing work context.")
        logger.log("replan", "Description changed, creating incremental plan", "warn")
        return None, 0, False  # Force re-plan, but existing files are passed along

    plan = planner.load_existing_plan()
    if not (plan and plan.get("steps")):
        print(" \033[33m⚠\033[0m Could not load previous plan. Starting fresh.")
        return None, 0, False

    start_step = resume.get_resume_step()
    total = len(plan["steps"])
    if start_step >= total:
        print(" \033[32m✓\033[0m Previous run completed. Nothing to do.")
        print(" To re-run, delete .autodev_state.json or change description.txt.")
        logger.log("skip", "Already complete, description unchanged")
        return plan, start_step, True

    print(f" \033[32m✓\033[0m Resuming from step {start_step + 1}/{total}")
    ctx.add("system", f"Project: {plan.get('summary', description[:500])}", priority=9)
    return plan, start_step, False


def _gather_existing_work(workdir: str) -> str:
    """Concatenate the first 2000 characters of each top-level project file.

    AutoDev's own bookkeeping files, hidden files, empty files and files that
    cannot be read or decoded are skipped.
    """
    skip_files = {config.WORKLOG_FILE, config.PLAN_FILE, config.DEPENDENCY_FILE,
                  config.DESCRIPTION_FILE, ".autodev_state.json"}
    sections = []
    for fname in sorted(os.listdir(workdir)):
        fpath = os.path.join(workdir, fname)
        if not os.path.isfile(fpath) or fname in skip_files or fname.startswith("."):
            continue
        try:
            with open(fpath, "r") as f:
                content = f.read()
        except (IOError, UnicodeDecodeError):
            continue
        if content.strip():
            sections.append(f"### {fname}\n{content[:2000]}\n\n")
    return "".join(sections)


def _create_plan(planner, logger, deps, description: str, manuals, workdir: str) -> dict:
    """Ask the planner for a new plan, print its summary and register its deps.

    Exits with status 1 when the planner reports an error.
    """
    print("\n Planning development...")
    # Gather existing project files for context (for incremental re-planning)
    existing_work = _gather_existing_work(workdir)
    plan = planner.create_plan(description, manuals, existing_work=existing_work)
    if plan.get("error"):
        print(f" \033[31m✗ Planning failed: {plan.get('error')}\033[0m")
        if plan.get("raw"):
            print(f" Raw LLM output (first 300 chars): {plan['raw'][:300]}")
        logger.log("exit", "Planning failed", "error")
        sys.exit(1)

    print(f" \033[32m✓\033[0m Plan created: {plan.get('project_name', 'project')}")
    print(f" Language: {plan.get('language', 'unknown')}")
    print(f" Summary: {plan.get('summary', 'N/A')}")
    print(f" Steps: {len(plan.get('steps', []))}")
    print(f" Dependencies: {', '.join(plan.get('dependencies', [])) or 'none'}")
    print()

    # NOTE(review): dependencies are registered only for a freshly created
    # plan; confirm a resumed plan does not need them registered again.
    deps.add_many(plan.get("dependencies", []))
    return plan


def _debug_failed_step(debugger, executor, ctx, logger, result: dict, step, plan):
    """Run up to ``config.MAX_DEBUG_CYCLES`` debug attempts for a failed step.

    Returns ``(fixed, result)`` where ``result`` is the most recent execution
    result (its ``errors`` entry is what the next attempt is given).
    """
    for attempt in range(config.MAX_DEBUG_CYCLES):
        print(f"\n Debug attempt {attempt + 1}/{config.MAX_DEBUG_CYCLES}...")
        dbg = debugger.debug_errors(result.get("errors", []), step, plan)

        if dbg["fixed"]:
            # Re-run step to verify
            print(" Fix applied. Re-running step...")
            result = executor.execute_step(step, plan)
            if result["success"]:
                print(f" \033[32m✓ Fixed on attempt {attempt + 1}\033[0m")
                return True, result
            print(" Step still failing after fix.")
            result["errors"] = result.get("errors", [])
        elif dbg["errors"]:
            result["errors"] = dbg["errors"]

        if ctx.detect_cycle():
            logger.log("cycle_break", "Breaking debug cycle — approaches too similar", "warn")
            ctx.clear_stale()
    return False, result


def _execute_plan(plan: dict, steps: list, start_step: int, desc_hash: str,
                  executor, debugger, resume, ctx, logger) -> None:
    """Execute ``steps[start_step:]`` in order, debugging failures as they occur.

    State is saved before each step.  When a step cannot be fixed the failure
    is recorded through ``resume.mark_failed`` and the process exits with
    status 1; otherwise the plan is marked complete and a summary is printed.
    """
    total = len(steps)
    print(f"\n{'='*54}")
    print(f" EXECUTING PLAN — {total} steps")
    print(f"{'='*54}\n")

    # Push plan to web UI
    _announce_plan(plan, steps, start_step)

    for i in range(start_step, total):
        step = steps[i]
        step_num = i + 1
        phase = step.get("phase", "implement")
        desc = step.get("description", "")

        print(f"\n ── Step {step_num}/{total} [\033[36m{phase}\033[0m] ──")
        print(f" {desc}")

        resume.save_state(i, plan, desc_hash=desc_hash)
        result = executor.execute_step(step, plan)

        if not result["success"]:
            print(" \033[31m✗ Step failed.\033[0m Entering debug loop...")
            logger.log("step_failed", f"Step {step_num}", "error")
            _push_web_event("step_done", {"step": i, "status": "error"})

            fixed, result = _debug_failed_step(
                debugger, executor, ctx, logger, result, step, plan)
            if not fixed:
                # .get(): a failed result without an "errors" entry must not
                # turn the failure report itself into a KeyError.
                errors = result.get("errors", [])
                print(f"\n \033[31m✗ Could not fix step {step_num} after {config.MAX_DEBUG_CYCLES} attempts.\033[0m")
                for err in errors[:3]:
                    print(f" Error: {err[:200]}")
                print("\n The worklog has been saved. Review it and add documentation if needed.")
                print(" Restart AutoDev to resume from this step.")
                resume.mark_failed(i, plan, "; ".join(errors[:2]),
                                   desc_hash=desc_hash)
                logger.log("halt", f"Stopped at step {step_num}", "error")
                sys.exit(1)

        logger.log("phase_complete", f"step_{step_num}")
        ctx_info = ctx.token_usage()
        print(f" \033[32m✓\033[0m Step {step_num} complete "
              f"(context: {ctx_info['utilization']})")
        _push_web_event("step_done", {"step": i, "status": "ok"})

    # Done
    resume.mark_complete(plan, desc_hash=desc_hash)
    print(f"\n{'='*54}")
    print(f" \033[32m✓ ALL {total} STEPS COMPLETE\033[0m")
    print(f"{'='*54}")
    print(f"\n Project: {plan.get('project_name', 'project')}")
    print(f" Language: {plan.get('language', 'unknown')}")
    print(f" Dependencies: see {config.DEPENDENCY_FILE}")
    print(f" Worklog: see {config.WORKLOG_FILE}")
    print(f" Plan: see {config.PLAN_FILE}")
    logger.log("complete", "All steps executed successfully")


def _run_session(args, workdir: str, logger, sandbox, deps, llm) -> None:
    """Check the LLM, load inputs, plan (or resume) and execute the plan."""
    logger.log("startup", f"AutoDev started in {workdir}")

    # Step 1: Check LLM (this also loads the model into memory)
    print(" Checking LLM connection...")
    if not check_llm_connection(llm, logger):
        print("\n \033[31m✗ Cannot reach LLM backend.\033[0m")
        print(f" Backend: {llm.backend} at {llm.base_url}")
        print(" Ensure Ollama/vLLM is running and the model is loaded.")
        logger.log("exit", "LLM unreachable", "error")
        sys.exit(1)

    # Check GPU/CPU status (model is now loaded, /api/ps has real data)
    _report_gpu_status(llm, logger)

    # Auto-detect context size (model is now loaded, /api/ps has runtime value)
    model_ctx = llm.detect_context_size()
    local_ctx = int(model_ctx * 0.6)  # local budget is 60% of the model's context
    ctx = ContextManager(max_tokens=local_ctx)
    print(f" Context: {model_ctx:,} tokens (local budget: {local_ctx:,})")
    logger.log("context_size", f"Context: {model_ctx}, budget: {local_ctx}")

    # Initialize remaining components
    planner = Planner(llm, logger, ctx, workdir)
    executor = Executor(llm, logger, ctx, sandbox, deps, workdir)
    debugger = Debugger(llm, logger, ctx, sandbox, workdir)
    resume = ResumeManager(logger, workdir)

    # Step 2: Read description
    print(" Reading project description...")
    description = planner.read_description()
    if not description:
        print(f"\n \033[31m✗ No '{config.DESCRIPTION_FILE}' found in {workdir}\033[0m")
        print(" Create a description.txt with your project requirements and run again.")
        logger.log("exit", "No description.txt", "error")
        sys.exit(1)
    logger.log("description_read", f"{len(description)} chars")
    print(f" \033[32m✓\033[0m Description loaded ({len(description)} chars)")

    # Step 3: Read manuals
    manuals = _load_manuals(args, planner, logger, workdir)

    # Step 4: Plan or resume.  The MD5 digest is only a change-detection
    # fingerprint for the description; it is compared against saved state.
    desc_hash = hashlib.md5(description.encode()).hexdigest()
    plan = None
    start_step = 0
    if not resume.is_fresh_start():
        plan, start_step, already_complete = _resume_previous_session(
            resume, planner, logger, ctx, description, desc_hash)
        if already_complete:
            return
    if not plan:
        plan = _create_plan(planner, logger, deps, description, manuals, workdir)

    # Step 5: Execute
    steps = plan.get("steps", [])
    if not steps:
        print(" \033[31m✗ Plan has no steps.\033[0m Check description.txt for clarity.")
        logger.log("exit", "Empty plan", "error")
        sys.exit(1)

    _execute_plan(plan, steps, start_step, desc_hash,
                  executor, debugger, resume, ctx, logger)


def run(args: argparse.Namespace):
    """Run one AutoDev session in ``args.workdir``.

    Prints the banner, builds the core components, optionally starts the web
    dashboard (``args.web``), then checks the LLM, reads the description and
    manuals, creates or resumes a plan and executes its steps.

    Args:
        args: Namespace as produced by ``parse_args`` (uses ``workdir``,
            ``backend``, ``model``, ``web`` and ``nomanual``).

    Raises:
        SystemExit: with status 1 when the LLM is unreachable, a required
            input is missing, planning fails, the plan is empty, or a step
            cannot be fixed within ``config.MAX_DEBUG_CYCLES`` attempts.
    """
    print(BANNER)
    workdir = os.path.realpath(args.workdir)
    backend = args.backend or config.LLM_BACKEND
    model = args.model or config.MODEL_NAME
    print(f" Working directory: {workdir}")
    print(f" Backend: {backend} | Model: {model}")
    print()

    # Initialize components
    logger = Logger(workdir)
    sandbox = Sandbox(workdir)
    deps = DependencyTracker(workdir)
    llm = LLM(backend=args.backend, model=args.model)

    # Start web dashboard if requested
    web_server = None
    if args.web:
        from .web import start_web_server
        web_server = start_web_server(args.web, workdir)
        print(f" \033[36m🌐 Web dashboard: http://localhost:{args.web}\033[0m")

    try:
        _run_session(args, workdir, logger, sandbox, deps, llm)
    finally:
        # Shut down web server on every exit path (success, early return,
        # sys.exit, or an exception), not only after a fully successful run.
        if web_server:
            _stop_web_server(web_server)


def main():
    """Command-line entry point: parse arguments, run, and report failures.

    Each handled failure prints a message and then exits with a non-zero
    status so shells and scripts can detect it: 130 for a user interrupt
    (the conventional 128 + SIGINT), 1 for a sandbox violation, an LLM error
    or any unexpected exception (whose traceback is also printed).
    ``SystemExit`` raised inside ``run`` propagates unchanged.
    """
    args = parse_args()
    try:
        run(args)
    except KeyboardInterrupt:
        print("\n\n Interrupted by user. State saved — restart to resume.")
        sys.exit(130)
    except SandboxViolation as e:
        print(f"\n \033[31m✗ SANDBOX VIOLATION: {e}\033[0m")
        sys.exit(1)
    except LLMError as e:
        print(f"\n \033[31m✗ LLM ERROR: {e}\033[0m")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report anything unforeseen with its traceback.
        print(f"\n \033[31m✗ UNEXPECTED ERROR: {e}\033[0m")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()