commit c6dee66c4bd924bbd3979bbe94554ed331cc194d Author: Anders Holck Date: Thu Apr 9 08:20:32 2026 +0200 First commit, working quite ok! diff --git a/README.md b/README.md new file mode 100644 index 0000000..eb8eaaf --- /dev/null +++ b/README.md @@ -0,0 +1,232 @@ +# AutoDev — Autonomous CLI Development Studio + +AutoDev reads a project description and reference manuals, then autonomously plans, implements, compiles, tests, and debugs complete software projects using a local LLM. + +No cloud APIs. No subscriptions. Runs entirely on your machine with [Ollama](https://ollama.com) or [vLLM](https://github.com/vllm-project/vllm). + +## How It Works + +``` +description.txt + manuals/ → LLM plans the project → writes code → compiles → tests → debugs → delivers +``` + +1. You write a `description.txt` explaining what you want built +2. You put reference documentation in a `manuals/` folder +3. You run `autodev` +4. AutoDev reads everything, creates a development plan, and executes it step by step +5. If something fails to compile or run, it debugs itself — analyzing errors, generating fixes, and retrying +6. When done, you have a working project + +You don't interact with it. You watch it work. + +## Quick Start + +```bash +# 1. Make sure Ollama is running with a model loaded +ollama run qwen2.5-coder:14b + +# 2. Set up your project folder +mkdir my-project && cd my-project +mkdir manuals + +# 3. Write what you want +cat > description.txt << 'EOF' +Language: Python +Build a CLI tool that converts CSV files to JSON. +It should accept an input file and output file as arguments. +Handle errors gracefully if the input file doesn't exist. +EOF + +# 4. Add any reference docs as plain text (API docs, specs, examples) +cp csv-format-spec.md manuals/ + +# 5.
Run AutoDev +autodev +``` + +## Installation + +```bash +# Clone the repository +git clone https://github.com/your-username/autodev.git +cd autodev + +# Symlink to your PATH +ln -s $(pwd)/autodev-cli ~/.local/bin/autodev +# or +ln -s $(pwd)/autodev-cli ~/bin/autodev + +# Alternatively, run directly (from the parent directory of the clone) +python -m autodev --workdir /path/to/project +``` + +### Requirements + +- Python 3.10+ +- [Ollama](https://ollama.com) or [vLLM](https://github.com/vllm-project/vllm) running locally or on your network +- No pip dependencies — uses only the Python standard library + +## Configuration + +Edit `autodev/config.py` to set your LLM backend: + +```python +LLM_BACKEND = "ollama" # "ollama" or "vllm" +OLLAMA_URL = "http://localhost:11434" # your Ollama instance +MODEL_NAME = "qwen2.5-coder:14b" # any model Ollama serves +``` + +You can also override at runtime: + +```bash +autodev --backend ollama --model gemma4:e4b +``` + +### Tested Models + +All models were tested against the same task: plan, implement, compile, test, and debug a C "hello world" project with a Makefile. Tested on Ollama with GPU offload. + +| Model | Size | Result | Speed | Notes | +|-------|------|--------|-------|-------| +| `gemma4:e4b` | ~12B | ✅ Pass | Fast | Clean run, no debug needed. Best balance of speed and quality. **Recommended.** | +| `gemma3:27b` | 27B | ✅ Pass | Slow | Works well but slow. Needed sandbox fixes during early testing. Good for complex projects. | +| `gemma4:e2b` | ~8B | ❌ Fail | Very fast | Plans OK, but setup created a directory that blocked the executable name. Could not self-correct — repeated the same failed approach 10 times. | +| `gemma3:4b` | 4B | ❌ Fail | Very fast | Steps 1–4 passed, but debugger hallucinated a nonexistent `hello.c` file and could not reason about what files actually exist on disk. | +| `qwen2.5-coder:7b` | 7B | ❌ Fail | Fast | Classified "create main.c" as setup instead of implement, so the file was never generated.
Debugger could not write a valid Makefile after 10 attempts. | + +**Takeaway:** Models below ~12B parameters can plan and generate simple code, but they cannot self-correct when things go wrong. They repeat failed approaches, hallucinate files, and produce broken build scripts. **14B+ recommended for autonomous development.** + +## Project Structure + +Your project folder needs: + +``` +my-project/ +├── description.txt # Required — what to build +└── manuals/ # Required — reference docs (use -nomanual to skip) + ├── api-spec.md + └── protocol.txt +``` + +AutoDev creates these files as it works: + +``` +my-project/ +├── description.txt +├── manuals/ +├── plan.json # The development plan (human-readable) +├── worklog.json # Every action logged with timestamps +├── dependency.txt # External dependencies (compilers, libraries) +├── .autodev_state.json +└── ... your project files ... +``` + +## Features + +### Autonomous Development Loop +Reads the description, understands the requirements, creates a structured plan, then executes it: setup → implement → compile → test → debug → finalize. No human input needed during execution. + +### Self-Debugging +When compilation or tests fail, AutoDev enters a debug loop: +- Analyzes the error and source code +- Diagnoses root cause (not just symptoms) +- Generates a fix and applies it +- Verifies the fix works +- Rolls back automatically if the fix makes things worse +- Tracks failed approaches so it never repeats the same fix twice + +### Resumable Sessions +Every action is logged to `worklog.json`. If AutoDev is interrupted or fails: +```bash +# Just run it again — it picks up where it left off +autodev +``` +It reads the worklog, loads the existing plan, and continues from the last incomplete step. + +### Cycle & Hallucination Detection +Detects when the LLM is stuck in a loop (producing similar outputs repeatedly) and automatically clears stale context to break out. 
+ +### Sandboxed Execution +- All file operations are confined to the working directory +- Shell commands are validated against a whitelist of safe tools (compilers, build tools, standard utilities) +- `sudo` and system-level commands are blocked +- Path traversal outside the working directory is prevented + +### Language Agnostic +Works with any programming language the LLM knows. Tested with C, Python, and Makefiles. The LLM determines the appropriate build tools, compilers, and project structure. + +### Dependency Tracking +All external dependencies (compilers, libraries, tools) are recorded in `dependency.txt` so you know exactly what the project needs. + +## CLI Options + +``` +autodev [options] + +Options: + -nomanual Skip reading manuals/ directory (for simple tasks) + -web PORT Start live web dashboard on PORT (e.g. -web 4500) + --backend {ollama,vllm} LLM backend (default: from config) + --model MODEL Model name (default: from config) + --workdir DIR Working directory (default: current directory) +``` + +### Web Dashboard + +Run `autodev -web 4500` and open `http://localhost:4500` in your browser. + +The dashboard shows three panels: +- **Plan Progress** — step-by-step checklist with ✓/✗/▸ status and completion counter +- **Project Files** — clickable file tree with live content viewer +- **LLM Activity** — real-time log of all actions and model thinking (newest first) + +Updates are pushed live via Server-Sent Events — no page refresh needed. + +### Incremental Updates + +If you change `description.txt` and restart AutoDev, it detects the change and re-plans incrementally — telling the LLM what files already exist so it builds on previous work instead of starting over. 
+ +## Architecture + +``` +autodev/ +├── config.py # LLM backend settings, timeouts, expert system prompt +├── llm.py # Ollama + vLLM communication with streaming and retry +├── context.py # Token-aware context window with relevance scoring +├── planner.py # Reads description + manuals, creates development plan +├── executor.py # Code generation, file writing, compilation +├── debugger.py # Error analysis, fix generation, rollback +├── sandbox.py # Whitelist-based command validation, path confinement +├── logger.py # Action logging to console and persistent worklog +├── dependency.py # Dependency tracking +├── resume.py # State persistence and session resumption +├── main.py # CLI orchestrator +└── autodev-cli # Symlink-friendly entry point +``` + +## How the Description Should Be Written + +Be specific. Every sentence is treated as a requirement. + +**Good:** +``` +Language: C +Build a TCP echo server that listens on port 8080. +It should handle multiple clients using fork(). +Include proper signal handling for SIGCHLD to avoid zombies. +Include a Makefile with 'all' and 'clean' targets. +The server should log connections to stderr. +``` + +**Too vague:** +``` +Make a server program. 
+``` + +## Limitations + +- Quality depends entirely on the LLM model — larger models produce better results +- No interactive mode — you can't guide it mid-run (by design) +- Manual parsing is plain text only (no PDF extraction) +- Token counting is estimated, not exact +- The LLM may occasionally produce code that compiles but doesn't meet all requirements diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..709c516 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +"""AutoDev - Autonomous Development Studio""" diff --git a/__main__.py b/__main__.py new file mode 100644 index 0000000..40e2b01 --- /dev/null +++ b/__main__.py @@ -0,0 +1,4 @@ +from .main import main + +if __name__ == "__main__": + main() diff --git a/__pycache__/__init__.cpython-312.pyc b/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..1545ff4 Binary files /dev/null and b/__pycache__/__init__.cpython-312.pyc differ diff --git a/__pycache__/__main__.cpython-312.pyc b/__pycache__/__main__.cpython-312.pyc new file mode 100644 index 0000000..4449530 Binary files /dev/null and b/__pycache__/__main__.cpython-312.pyc differ diff --git a/__pycache__/config.cpython-312.pyc b/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..518b741 Binary files /dev/null and b/__pycache__/config.cpython-312.pyc differ diff --git a/__pycache__/context.cpython-312.pyc b/__pycache__/context.cpython-312.pyc new file mode 100644 index 0000000..09169a5 Binary files /dev/null and b/__pycache__/context.cpython-312.pyc differ diff --git a/__pycache__/debugger.cpython-312.pyc b/__pycache__/debugger.cpython-312.pyc new file mode 100644 index 0000000..e387e94 Binary files /dev/null and b/__pycache__/debugger.cpython-312.pyc differ diff --git a/__pycache__/dependency.cpython-312.pyc b/__pycache__/dependency.cpython-312.pyc new file mode 100644 index 0000000..474a043 Binary files /dev/null and b/__pycache__/dependency.cpython-312.pyc differ diff --git 
a/__pycache__/executor.cpython-312.pyc b/__pycache__/executor.cpython-312.pyc new file mode 100644 index 0000000..23cf4d5 Binary files /dev/null and b/__pycache__/executor.cpython-312.pyc differ diff --git a/__pycache__/llm.cpython-312.pyc b/__pycache__/llm.cpython-312.pyc new file mode 100644 index 0000000..e1d180d Binary files /dev/null and b/__pycache__/llm.cpython-312.pyc differ diff --git a/__pycache__/logger.cpython-312.pyc b/__pycache__/logger.cpython-312.pyc new file mode 100644 index 0000000..3e59330 Binary files /dev/null and b/__pycache__/logger.cpython-312.pyc differ diff --git a/__pycache__/main.cpython-312.pyc b/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..74acf55 Binary files /dev/null and b/__pycache__/main.cpython-312.pyc differ diff --git a/__pycache__/planner.cpython-312.pyc b/__pycache__/planner.cpython-312.pyc new file mode 100644 index 0000000..b88c4ee Binary files /dev/null and b/__pycache__/planner.cpython-312.pyc differ diff --git a/__pycache__/resume.cpython-312.pyc b/__pycache__/resume.cpython-312.pyc new file mode 100644 index 0000000..b569b73 Binary files /dev/null and b/__pycache__/resume.cpython-312.pyc differ diff --git a/__pycache__/sandbox.cpython-312.pyc b/__pycache__/sandbox.cpython-312.pyc new file mode 100644 index 0000000..d796706 Binary files /dev/null and b/__pycache__/sandbox.cpython-312.pyc differ diff --git a/__pycache__/web.cpython-312.pyc b/__pycache__/web.cpython-312.pyc new file mode 100644 index 0000000..276b71a Binary files /dev/null and b/__pycache__/web.cpython-312.pyc differ diff --git a/autodev-cli b/autodev-cli new file mode 100755 index 0000000..20a2eca --- /dev/null +++ b/autodev-cli @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +""" +AutoDev - Autonomous CLI Development Studio +Can be symlinked from anywhere. Works in the directory where you call it. 
"""
AutoDev - Configuration
LLM backend settings and application constants.

Every value here is a plain module-level constant; other modules read them
as ``config.NAME`` at call time.
"""

# ============================================================
# LLM BACKEND CONFIGURATION — Edit these to match your setup
# ============================================================
LLM_BACKEND = "ollama"  # "ollama" or "vllm"
# NOTE(review): the README's configuration example uses
# http://localhost:11434; this default points at a specific network host.
# Confirm that is intended before sharing the package.
OLLAMA_URL = "http://turd.hem.holck.se:11434"
VLLM_URL = "http://localhost:8000"
MODEL_NAME = "gemma4:e4b"

# ============================================================
# Timeouts and limits
# ============================================================
LLM_TIMEOUT = 300  # seconds per LLM call
COMPILE_TIMEOUT = 120  # seconds per compile/build
EXEC_TIMEOUT = 60  # seconds per test execution
MAX_DEBUG_CYCLES = 10  # max consecutive fix attempts before halting
# Budget is measured in *estimated tokens* (character length divided by
# TOKEN_CHAR_RATIO), not in characters.
MAX_CONTEXT_TOKENS = 12000  # approximate local context window size (estimated tokens)
CYCLE_DETECTION_WINDOW = 6  # number of recent actions to check for loops
TOKEN_CHAR_RATIO = 3.5  # average chars per token (for estimation)

# ============================================================
# File names
# ============================================================
DESCRIPTION_FILE = "description.txt"
MANUALS_DIR = "manuals"
WORKLOG_FILE = "worklog.json"
DEPENDENCY_FILE = "dependency.txt"
PLAN_FILE = "plan.json"

# ============================================================
# Expert system prompt — injected into every LLM interaction
# ============================================================
# Mode-specific system prompts are built by appending to this string, so any
# edit here changes every prompt derived from it.
EXPERT_IDENTITY = """You are a senior software engineer with 20+ years of experience across all major languages and platforms. You approach every task like a principal engineer:

- You think before you code. You consider edge cases, error handling, and maintainability.
- You write production-quality code, never prototypes or stubs.
- You understand build systems, compilers, linkers, and runtime environments deeply.
- When you see an error, you reason about root cause, not just symptoms.
- You never repeat a failed approach. If something didn't work, you try a fundamentally different strategy.
- You are aware that you are an AI and may hallucinate. When uncertain, you keep code simple and conservative rather than guessing at APIs or syntax.
- You always consider: does this compile? Does this link? Are all imports/includes correct? Are all dependencies declared?
- You write complete files, never partial snippets or placeholders like "// ... rest of code".
"""
import config + + +def estimate_tokens(text: str) -> int: + """Estimate token count from character length.""" + return max(1, int(len(text) / config.TOKEN_CHAR_RATIO)) + + +class ContextManager: + def __init__(self, max_tokens: int = None): + self.max_tokens = max_tokens or config.MAX_CONTEXT_TOKENS + self.entries: list[dict] = [] # {role, content, priority, hash, tokens} + self._recent_contents: list[str] = [] + self._recent_hashes: list[str] = [] + + def add(self, role: str, content: str, priority: int = 5): + h = hashlib.md5(content.encode()).hexdigest()[:16] + tokens = estimate_tokens(content) + self.entries.append({ + "role": role, + "content": content, + "priority": priority, + "hash": h, + "tokens": tokens, + }) + self._recent_hashes.append(h) + self._recent_contents.append(content[:500]) + self._prune() + + def _prune(self): + total = sum(e["tokens"] for e in self.entries) + while total > self.max_tokens and len(self.entries) > 2: + # Never prune the last entry or system-level entries (priority >= 9) + candidates = [(i, e) for i, e in enumerate(self.entries[:-1]) if e["priority"] < 9] + if not candidates: + break + # Remove lowest priority, oldest first + candidates.sort(key=lambda x: (x[1]["priority"], -x[0])) + idx = candidates[0][0] + total -= self.entries[idx]["tokens"] + self.entries.pop(idx) + + def detect_cycle(self) -> bool: + """Detect both exact repetition and semantic similarity loops.""" + window = config.CYCLE_DETECTION_WINDOW + if len(self._recent_hashes) < 3: + return False + + recent = self._recent_hashes[-window:] + # Exact hash repetition + unique = set(recent) + if len(unique) <= max(1, len(recent) // 3): + return True + + # Semantic similarity: check if recent LLM outputs are too similar + contents = self._recent_contents[-window:] + if len(contents) >= 3: + similarities = [] + for i in range(len(contents) - 1): + ratio = difflib.SequenceMatcher(None, contents[i], contents[i + 1]).ratio() + similarities.append(ratio) + # If average 
similarity > 0.8, we're likely in a loop + if similarities and sum(similarities) / len(similarities) > 0.8: + return True + + return False + + def clear_stale(self): + """Aggressively clear low-value entries when cycles detected.""" + keep = [e for e in self.entries if e["priority"] >= 8] + if not keep: + keep = self.entries[-2:] + self.entries = keep + self._recent_hashes = self._recent_hashes[-2:] + self._recent_contents = self._recent_contents[-2:] + + def get_relevant_context(self, query: str, max_entries: int = 5) -> list[dict]: + """Select entries most relevant to the current query using keyword overlap.""" + query_words = set(query.lower().split()) + scored = [] + for e in self.entries: + content_words = set(e["content"].lower().split()[:200]) + overlap = len(query_words & content_words) + scored.append((overlap + e["priority"], e)) + scored.sort(key=lambda x: x[0], reverse=True) + return [e for _, e in scored[:max_entries]] + + def build_messages(self, system_prompt: str = "") -> list[dict]: + msgs = [] + if system_prompt: + msgs.append({"role": "system", "content": system_prompt}) + for e in self.entries: + msgs.append({"role": e["role"], "content": e["content"]}) + return msgs + + def build_focused_messages(self, system_prompt: str, query: str, + max_context_tokens: int = None) -> list[dict]: + """Build messages with only the most relevant context entries.""" + budget = max_context_tokens or (self.max_tokens // 2) + msgs = [] + if system_prompt: + msgs.append({"role": "system", "content": system_prompt}) + budget -= estimate_tokens(system_prompt) + + relevant = self.get_relevant_context(query) + for e in relevant: + if budget - e["tokens"] < 0: + break + msgs.append({"role": e["role"], "content": e["content"]}) + budget -= e["tokens"] + return msgs + + def token_usage(self) -> dict: + total = sum(e["tokens"] for e in self.entries) + return { + "entries": len(self.entries), + "tokens_used": total, + "tokens_max": self.max_tokens, + "utilization": f"{total 
/ self.max_tokens * 100:.0f}%", + } diff --git a/debugger.py b/debugger.py new file mode 100644 index 0000000..8a21707 --- /dev/null +++ b/debugger.py @@ -0,0 +1,272 @@ +""" +AutoDev - Debugger +Analyzes errors, generates fixes, detects cycles, manages rollback. +Approaches debugging like a senior engineer — reason about root cause, +never repeat a failed approach, escalate when stuck. +""" + +import json +import os +import shutil +import subprocess +from .llm import LLM +from .logger import Logger +from .context import ContextManager +from .sandbox import Sandbox +from . import config + +DEBUG_SYSTEM = config.EXPERT_IDENTITY + """ + +You are now in DEBUG mode. An error occurred during development. Your job is to: + +1. DIAGNOSE: What is the root cause? Not the symptom — the actual cause. +2. REASON: Why did the previous code produce this error? What assumption was wrong? +3. FIX: Provide a concrete fix that addresses the root cause. + +CRITICAL RULES: +- If you already tried a fix and it didn't work, you MUST try a DIFFERENT approach. +- Never repeat the same fix. If the same error keeps occurring, the problem is architectural. +- Consider: wrong imports, missing dependencies, wrong API usage, type mismatches, missing files. +- For compilation errors: check every include/import, every function signature, every type. +- For runtime errors: check null/nil handling, array bounds, file paths, permissions. + +Output ONLY valid JSON: +{ + "diagnosis": "root cause analysis — what exactly went wrong and why", + "previous_approach_wrong_because": "why the previous attempt failed (if applicable)", + "strategy": "what different approach we're taking this time", + "fixes": [ + { + "file": "path/to/file", + "action": "replace", + "description": "what changed and why", + "full_content": "COMPLETE new file content" + } + ], + "commands": ["verification commands to confirm the fix works"] +} + +The 'full_content' field must contain the ENTIRE file, not just the changed lines. 
class Debugger:
    """Drive the LLM-backed fix loop for a failed plan step.

    One call to :meth:`debug_errors` is one attempt: back up the affected
    files, ask the LLM for a JSON fix, apply it, run the verification
    commands it proposes, and roll the files back if verification fails.
    Failed approaches are remembered and fed into the next prompt.
    """

    def __init__(self, llm: "LLM", logger: "Logger", ctx: "ContextManager",
                 sandbox: "Sandbox", workdir: str):
        self.llm = llm
        self.logger = logger
        self.ctx = ctx
        self.sandbox = sandbox
        self.workdir = workdir
        self.attempt_count = 0
        self.backup_dir = os.path.join(workdir, ".autodev_backups")
        self.failed_approaches: list[str] = []  # Track what we already tried

    def debug_errors(self, errors: list[str], step: dict, plan: dict) -> dict:
        """Attempt to fix errors. Returns {fixed, output, errors}.

        Args:
            errors: Error texts from the failed step (may be empty).
            step: Plan step; its ``description`` and ``files`` keys are read.
            plan: Whole plan; its ``language`` and ``summary`` keys are read.

        Returns:
            ``fixed`` is True only when every verification command exited
            with status 0. ``output`` collects their stdout and ``errors``
            lists what went wrong during this attempt.
        """
        self.attempt_count += 1
        result = {"fixed": False, "output": "", "errors": []}

        if self.attempt_count > config.MAX_DEBUG_CYCLES:
            self.logger.log("debug_halt",
                            f"Max debug cycles ({config.MAX_DEBUG_CYCLES}) reached. "
                            "This likely requires manual intervention or better documentation.",
                            "error")
            result["errors"].append(
                "Maximum debug attempts exceeded. The issue may be architectural. "
                "Please review the worklog and provide additional documentation."
            )
            return result

        # Check for cycles
        if self.ctx.detect_cycle():
            self.logger.log("cycle_detected",
                            "Detected repetitive pattern. Clearing context and trying fresh approach.",
                            "warn")
            self.ctx.clear_stale()
            self.failed_approaches.append("CYCLE: All recent approaches were too similar")

        # Backup current state. `touched` grows to include every file the
        # fix targets so that a rollback restores all of them.
        touched = list(step.get("files") or [])
        self._backup_files(touched)

        prompt = self._build_debug_prompt(errors, step, plan)
        self.ctx.add("user", prompt, priority=8)

        self.logger.log("debug_attempt",
                        f"Attempt {self.attempt_count}/{config.MAX_DEBUG_CYCLES}")

        try:
            response = self.llm.query(prompt, system=DEBUG_SYSTEM, temperature=0.3)
            self.ctx.add("assistant", response[:1500], priority=6)
            fix = self._parse_fix(response)

            if not fix.get("fixes"):
                self.logger.log("debug_no_fix", "LLM provided no actionable fixes", "warn")
                result["errors"].append("No fix suggested by LLM")
                first_error = errors[0][:100] if errors else "unknown error"
                self.failed_approaches.append(f"No fix for: {first_error}")
                return result

            diagnosis = fix.get("diagnosis", "unknown")
            strategy = fix.get("strategy", "")
            self.logger.log("diagnosis", diagnosis)
            if strategy:
                self.logger.log("strategy", strategy)

            # The fix may rewrite files the step never listed; back those up
            # too, otherwise a failed verification leaves them modified.
            extra = self._fix_targets(fix["fixes"], touched)
            self._backup_files(extra)
            touched.extend(extra)

            # Apply fixes
            for f in fix["fixes"]:
                self._apply_fix(f)

            # Verify
            verify_ok = True
            for cmd in fix.get("commands") or []:
                try:
                    self.sandbox.validate_command(cmd)
                    # Commands are shell strings produced by the LLM; they
                    # are only run after passing the sandbox check above.
                    proc = subprocess.run(
                        cmd, shell=True, capture_output=True, text=True,
                        timeout=config.COMPILE_TIMEOUT, cwd=self.workdir,
                    )
                    result["output"] += proc.stdout
                    if proc.returncode != 0:
                        result["errors"].append(
                            f"Verification '{cmd}' failed:\n{proc.stderr}"
                        )
                        verify_ok = False
                        self.logger.log("fix_verify_fail", proc.stderr[:300], "error")
                except Exception as e:
                    # Rejected command, timeout, or spawn failure: record it
                    # and treat the fix as unverified.
                    result["errors"].append(str(e))
                    verify_ok = False

            if not verify_ok:
                self.failed_approaches.append(
                    f"Attempt {self.attempt_count}: {diagnosis[:100]} — verification failed"
                )
                self._rollback_files(touched)
                self.logger.log("rollback", "Fix didn't pass verification, rolled back", "warn")
                return result

            result["fixed"] = True
            self.attempt_count = 0
            self.failed_approaches.clear()
            self.logger.log("debug_fixed", diagnosis)

        except Exception as e:
            # Top-level boundary of one attempt: report, undo, remember.
            result["errors"].append(str(e))
            self.logger.log("debug_error", str(e), "error")
            self._rollback_files(touched)
            self.failed_approaches.append(f"Exception: {str(e)[:100]}")

        return result

    def _fix_targets(self, fixes: list, already: list[str]) -> list[str]:
        """Return paths named by *fixes* that are inside the workdir and not in *already*."""
        targets: list[str] = []
        for entry in fixes:
            fpath = entry.get("file", "") if isinstance(entry, dict) else ""
            if not isinstance(fpath, str) or not fpath:
                continue
            if fpath in already or fpath in targets:
                continue
            # Never copy files from outside the project into the backup dir.
            if self._is_within_workdir(fpath):
                targets.append(fpath)
        return targets

    def _is_within_workdir(self, fpath: str) -> bool:
        """Return True when *fpath*, resolved against the workdir, stays inside it."""
        root = os.path.realpath(self.workdir)
        target = os.path.realpath(os.path.join(root, fpath))
        return target == root or target.startswith(root + os.sep)

    def _build_debug_prompt(self, errors: list[str], step: dict, plan: dict) -> str:
        """Assemble the debugging prompt.

        Contains up to five errors, the step description, the project
        language and summary, the last five failed approaches, up to 4000
        characters of each of the first five step files, and up to 2000
        characters of any other existing file whose path appears in the
        first three error messages.
        """
        step_files = step.get("files") or []
        parts = [
            f"## Error(s)\n" + "\n---\n".join(errors[:5]),
            f"\n## Failed step\n{step.get('description', '')}",
            f"\n## Project: {plan.get('language', 'unknown')} — {plan.get('summary', '')}",
        ]

        # Include what we already tried so the LLM doesn't repeat
        if self.failed_approaches:
            parts.append(
                "\n## PREVIOUS FAILED APPROACHES (DO NOT REPEAT THESE):\n"
                + "\n".join(f"- {a}" for a in self.failed_approaches[-5:])
            )

        # Include relevant source files
        for fpath in step_files[:5]:
            full = os.path.join(self.workdir, fpath)
            if os.path.isfile(full):
                try:
                    with open(full, "r") as f:
                        content = f.read()
                    parts.append(f"\n## Current content of {fpath}\n{content[:4000]}")
                except (IOError, UnicodeDecodeError):
                    pass  # unreadable or binary file: leave it out of the prompt

        # Also check for related files that might be causing the issue
        # (e.g., header files referenced in error messages)
        seen: set[str] = set()
        for err in errors[:3]:
            for word in err.split():
                if "." not in word or "/" not in word:
                    continue
                # Looks like a file path in the error
                candidate = word.strip("':\"(),")
                if candidate in seen or candidate in step_files:
                    continue
                seen.add(candidate)  # include each related file only once
                full = os.path.join(self.workdir, candidate)
                if os.path.isfile(full):
                    try:
                        with open(full, "r") as f:
                            content = f.read()
                        parts.append(f"\n## Related file {candidate}\n{content[:2000]}")
                    except (IOError, UnicodeDecodeError):
                        pass  # unreadable or binary file: leave it out

        return "\n".join(parts)

    def _apply_fix(self, fix: dict):
        """Apply one fix entry (``replace``, ``create`` or ``delete``) through the sandbox.

        Entries without a ``file`` are ignored, as are replace/create entries
        without ``full_content``. Deleting an AutoDev state file is refused.
        """
        fpath = fix.get("file", "")
        action = fix.get("action", "replace")
        if not fpath:
            return
        # Never delete AutoDev state files
        protected = {"worklog.json", "plan.json", "dependency.txt",
                     ".autodev_state.json", "description.txt"}
        basename = os.path.basename(fpath)
        if action == "delete" and basename in protected:
            self.logger.log("fix_skip", f"Refusing to delete protected file: {fpath}", "warn")
            return
        if action == "replace" and fix.get("full_content"):
            self.sandbox.safe_write(fpath, fix["full_content"])
            self.logger.log("fix_applied", f"Replaced {fpath}: {fix.get('description', '')[:100]}")
        elif action == "delete":
            full = self.sandbox.validate_path(fpath)
            if os.path.exists(full):
                os.remove(full)
                self.logger.log("fix_applied", f"Deleted {fpath}")
        elif action == "create" and fix.get("full_content"):
            self.sandbox.safe_write(fpath, fix["full_content"])
            self.logger.log("fix_applied", f"Created {fpath}")

    def _backup_files(self, files: list[str]):
        """Copy each existing file into the backup directory.

        The backup name is the relative path with ``/`` replaced by ``__``.
        Files that do not exist yet get no backup.
        """
        os.makedirs(self.backup_dir, exist_ok=True)
        for fpath in files:
            full = os.path.join(self.workdir, fpath)
            if os.path.isfile(full):
                backup = os.path.join(self.backup_dir, fpath.replace("/", "__"))
                shutil.copy2(full, backup)

    def _rollback_files(self, files: list[str]):
        """Restore every file in *files* that has a backup.

        Files without a backup (they did not exist when the backup was
        taken) are left as they are.
        """
        for fpath in files:
            backup = os.path.join(self.backup_dir, fpath.replace("/", "__"))
            if os.path.isfile(backup):
                full = os.path.join(self.workdir, fpath)
                os.makedirs(os.path.dirname(full), exist_ok=True)
                shutil.copy2(backup, full)
                self.logger.log("rollback", f"Restored {fpath}", "warn")

    def _parse_fix(self, response: str) -> dict:
        """Return the first candidate in *response* that parses to a JSON object.

        Falls back to a placeholder diagnosis with an empty ``fixes`` list
        when nothing parses.
        """
        text = response.strip()
        for candidate in self._extract_json_candidates(text):
            try:
                result = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(result, dict):
                return result
        return {"diagnosis": "Could not parse fix response", "fixes": []}

    def _extract_json_candidates(self, text: str) -> list[str]:
        """List substrings of *text* that may hold the JSON fix, best guess first.

        Order: the body of a ```json fence, the whole text, the first
        balanced ``{...}`` object found while skipping braces inside JSON
        strings, and finally the naive balanced span that counts every
        brace (kept so nothing that parsed before is lost).
        """
        candidates = []
        if "```json" in text:
            candidates.append(text.split("```json", 1)[1].split("```", 1)[0].strip())
        candidates.append(text)
        start = text.find("{")
        if start >= 0:
            # File contents embedded in "full_content" routinely contain
            # braces, so string literals must be skipped when balancing.
            aware = self._balanced_object(text, start, respect_strings=True)
            naive = self._balanced_object(text, start, respect_strings=False)
            if aware is not None:
                candidates.append(aware)
            if naive is not None and naive != aware:
                candidates.append(naive)
        return candidates

    @staticmethod
    def _balanced_object(text: str, start: int, respect_strings: bool):
        """Return the balanced ``{...}`` span starting at *start*, or None if it never closes."""
        depth = 0
        in_string = False
        escaped = False
        for i in range(start, len(text)):
            ch = text[i]
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"' and respect_strings:
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return text[start:i + 1]
        return None
class DependencyTracker:
    """Persist the project's external dependencies to a text file.

    The file lives at ``config.DEPENDENCY_FILE`` inside the working
    directory and holds one dependency per line. Blank lines and lines
    starting with ``#`` are ignored on load. The file is always read and
    written as UTF-8 so its content does not depend on the platform's
    locale encoding.
    """

    def __init__(self, workdir: str):
        self.path = os.path.join(workdir, config.DEPENDENCY_FILE)
        self.deps: set[str] = set()
        self._load()

    def _load(self):
        """Populate ``self.deps`` from the dependency file, if it exists."""
        if not os.path.exists(self.path):
            return
        with open(self.path, "r", encoding="utf-8") as f:
            for raw in f:
                entry = raw.strip()
                if entry and not entry.startswith("#"):
                    self.deps.add(entry)

    def _save(self):
        """Rewrite the dependency file with a header and the sorted entries."""
        with open(self.path, "w", encoding="utf-8") as f:
            f.write("# AutoDev - Project Dependencies\n")
            f.write("# Auto-generated, do not edit manually\n\n")
            f.writelines(d + "\n" for d in sorted(self.deps))

    def add(self, dep: str):
        """Record ``dep``; the file is rewritten only when it is new."""
        if dep not in self.deps:
            self.deps.add(dep)
            self._save()

    def add_many(self, deps: list[str]):
        """Record several dependencies with at most one file rewrite."""
        new = [d for d in deps if d not in self.deps]
        if new:
            self.deps.update(new)
            self._save()

    def get_all(self) -> list[str]:
        """Return all known dependencies, sorted."""
        return sorted(self.deps)
+- If this is a build file (Makefile, CMakeLists.txt, etc.), make it complete and correct. +""" + +MULTI_FILE_SYSTEM = config.EXPERT_IDENTITY + """ + +You are now in MULTI-FILE GENERATION mode. Generate multiple complete source files. + +Output ONLY valid JSON with this structure: +{ + "files": [ + {"path": "relative/path/to/file", "content": "complete file content"} + ], + "commands": ["optional shell commands to run after writing files"] +} + +Rules: +- Every file must be COMPLETE. No placeholders, no stubs. +- All imports/includes must reference files that exist or will be created. +- Output ONLY the JSON object. Start with { and end with }. +""" + + +class Executor: + def __init__(self, llm: LLM, logger: Logger, ctx: ContextManager, + sandbox: Sandbox, deps: DependencyTracker, workdir: str): + self.llm = llm + self.logger = logger + self.ctx = ctx + self.sandbox = sandbox + self.deps = deps + self.workdir = workdir + + def execute_step(self, step: dict, plan: dict) -> dict: + """Execute a single plan step. 
Returns {success, output, errors}.""" + phase = step.get("phase", "implement") + desc = step.get("description", "") + commands = step.get("commands", []) + + self.logger.log("step_start", f"[{phase}] {desc}") + result = {"success": True, "output": "", "errors": []} + + try: + if phase == "setup": + result = self._do_setup(step, plan) + elif phase in ("implement", "finalize"): + result = self._do_implement(step, plan) + elif phase == "test": + result = self._do_test(step, plan) + elif phase == "debug": + result = self._do_debug(step, plan) + else: + result = self._do_implement(step, plan) + + # Run any explicit commands from the plan + # Skip for phases that handle their own commands or generate files via LLM + if phase not in ("setup", "test", "implement"): + for cmd in commands: + cmd_result = self._run_command(cmd) + if cmd_result["returncode"] != 0: + result["errors"].append( + f"Command '{cmd}' failed (exit {cmd_result['returncode']}):\n" + f"{cmd_result['stderr']}" + ) + result["success"] = False + result["output"] += cmd_result["stdout"] + + # Verify acceptance criteria if defined + acceptance = step.get("acceptance", "") + if acceptance and result["success"]: + self.logger.log("acceptance_check", acceptance) + + except SandboxViolation as e: + result["success"] = False + result["errors"].append(f"Sandbox violation: {e}") + self.logger.log("sandbox_violation", str(e), "error") + except Exception as e: + result["success"] = False + result["errors"].append(str(e)) + self.logger.log("step_error", str(e), "error") + + status = "ok" if result["success"] else "error" + self.logger.log("step_done", f"[{phase}] success={result['success']}", status) + return result + + def _do_setup(self, step: dict, plan: dict) -> dict: + result = {"success": True, "output": "", "errors": []} + for path in plan.get("structure", []): + # If it looks like a file (has extension), ensure parent dir exists + # If it looks like a directory (no extension), create it + # But never mkdir over 
an existing file + full = os.path.join(self.workdir, path) + if "." in os.path.basename(path): + parent = os.path.dirname(path) + if parent: + self.sandbox.safe_mkdir(parent) + else: + if os.path.isfile(full): + self.logger.log("setup_skip", f"{path} exists as file, not creating dir", "warn") + else: + self.sandbox.safe_mkdir(path) + self.logger.log("mkdir", path) + + for dep in plan.get("dependencies", []): + self.deps.add(dep) + + # Setup commands are best-effort — non-zero exit is a warning, not failure + for cmd in step.get("commands", []): + # Auto-fix common issues: add -p to mkdir, add -f to touch + cmd = self._fixup_setup_command(cmd) + r = self._run_command(cmd) + result["output"] += r["stdout"] + if r["returncode"] != 0: + self.logger.log("setup_warn", r["stderr"][:200], "warn") + # Only fail setup if it's a real error, not "already exists" + if not self._is_benign_error(r["stderr"]): + result["errors"].append(r["stderr"]) + + return result + + @staticmethod + def _fixup_setup_command(cmd: str) -> str: + """Auto-fix common setup command issues.""" + stripped = cmd.strip() + # Any mkdir without -p → add -p + if "mkdir " in stripped and " -p" not in stripped: + return stripped.replace("mkdir ", "mkdir -p ") + return cmd + + @staticmethod + def _is_benign_error(stderr: str) -> bool: + """Check if an error is harmless (e.g., 'already exists').""" + benign = ["File exists", "already exists", "No such file or directory"] + return any(b in stderr for b in benign) + + def _do_implement(self, step: dict, plan: dict) -> dict: + files = step.get("files", []) + if not files: + return self._implement_freeform(step, plan) + if len(files) == 1: + return self._implement_single(files[0], step, plan) + return self._implement_multi(files, step, plan) + + def _implement_single(self, filepath: str, step: dict, plan: dict) -> dict: + result = {"success": True, "output": "", "errors": []} + prompt = self._build_code_prompt(step, plan, filepath) + + # Use focused context to 
def _implement_multi(self, files: list, step: dict, plan: dict) -> dict:
    """Generate several files with one LLM call and write them to disk.

    The model is asked for a JSON object with ``files`` (path/content
    pairs) and optional ``commands``. If the first response yields no
    files, the request is retried once with a stricter instruction. Every
    usable file is written through the sandbox, then the returned commands
    are run; a failing command marks the step as failed.

    When ``files`` names files to generate but nothing usable came back
    even after the retry, the step is reported as failed rather than as an
    empty success, so the caller can react instead of continuing with
    missing sources. Free-form steps (``files`` empty) keep the lenient
    behaviour.

    Returns a dict with ``success``, ``output`` and ``errors``.
    """
    result = {"success": True, "output": "", "errors": []}
    prompt = self._build_code_prompt(step, plan)
    prompt += f"\n\nGenerate these files: {json.dumps(files)}"
    self.ctx.add("user", prompt, priority=7)

    response = self.llm.query(prompt, system=MULTI_FILE_SYSTEM)
    self.ctx.add("assistant", f"Generated {len(files)} files", priority=5)

    parsed = self._parse_multi_response(response)
    if not parsed.get("files"):
        # Retry
        self.logger.log("regen", "Multi-file response had no files, retrying", "warn")
        retry_prompt = (
            prompt + "\n\nYour response could not be parsed. "
            "Output ONLY a JSON object starting with { and ending with }. "
            "The 'files' array must contain objects with 'path' and 'content' keys."
        )
        response = self.llm.query(retry_prompt, system=MULTI_FILE_SYSTEM, temperature=0.1)
        parsed = self._parse_multi_response(response)

    written = 0
    # "or []" tolerates an explicit JSON null for either list.
    for finfo in parsed.get("files") or []:
        if not isinstance(finfo, dict):
            continue  # malformed entry from the model
        path = finfo.get("path", "")
        content = finfo.get("content", "")
        if path and content:
            self.sandbox.safe_write(path, content)
            self.logger.log("file_written", f"{path} ({len(content)} chars)")
            result["output"] += f"Created {path}\n"
            written += 1

    for cmd in parsed.get("commands") or []:
        r = self._run_command(cmd)
        result["output"] += r["stdout"]
        if r["returncode"] != 0:
            result["errors"].append(r["stderr"])
            result["success"] = False

    if files and not written:
        message = "LLM produced no usable files for: " + ", ".join(map(str, files))
        self.logger.log("gen_fail", message, "error")
        result["errors"].append(message)
        result["success"] = False

    return result
def _get_relevant_files(self, step: dict, plan: dict) -> dict[str, str]:
    """Include only files relevant to the current step, within token budget.

    Candidates are the readable files listed in ``plan["structure"]``,
    excluding the files this step is about to generate. They are ranked
    (build files first, then files sharing a directory or a base name with
    a step file, then everything else) and added until one third of
    ``config.MAX_CONTEXT_TOKENS`` is used.

    A file that does not fit is skipped when it is small; a large file is
    truncated. The truncation is capped by the budget that is still free,
    and a file is dropped if it still does not fit, so the returned
    content never exceeds the budget as measured by ``estimate_tokens``.

    Returns a mapping of relative path to (possibly truncated) content.
    """
    budget = config.MAX_CONTEXT_TOKENS // 3  # Reserve 1/3 of context for existing files
    step_files = set(step.get("files", []))

    # Priority 1: Build files (Makefile, CMakeLists.txt, etc.)
    # Priority 2: Files that share a directory or a base name with step files
    # Priority 3: Everything else in the plan structure
    build_files = {"Makefile", "CMakeLists.txt", "setup.py", "pyproject.toml",
                   "Cargo.toml", "go.mod", "package.json", "pom.xml", "build.gradle"}

    candidates = []
    for path in plan.get("structure", []):
        if path in step_files:
            continue  # Don't include the file we're about to generate
        full = os.path.join(self.workdir, path)
        if not os.path.isfile(full):
            continue
        try:
            with open(full, "r") as f:
                content = f.read()
        except (IOError, UnicodeDecodeError):
            continue

        # Score relevance
        basename = os.path.basename(path)
        if basename in build_files:
            score = 3
        elif any(os.path.dirname(path) == os.path.dirname(sf) for sf in step_files):
            score = 2
        # Check if any step file name contains this file's base name
        elif any(basename.split(".")[0] in sf for sf in step_files):
            score = 2
        else:
            score = 1
        candidates.append((score, path, content))

    candidates.sort(key=lambda x: x[0], reverse=True)

    marker = "\n// ... (truncated for context)\n"
    files = {}
    used = 0
    for _score, path, content in candidates:
        tokens = estimate_tokens(content)
        if used + tokens > budget:
            if tokens <= budget // 2:
                continue  # small file that does not fit; a later one might
            # Truncate large files, but never beyond what is still free.
            allowed_tokens = min(budget // 2, budget - used)
            # TOKEN_CHAR_RATIO is used as characters per token here, matching
            # the arithmetic of the original truncation.
            keep_chars = int(allowed_tokens * config.TOKEN_CHAR_RATIO) - len(marker)
            if keep_chars <= 0:
                continue
            content = content[:keep_chars] + marker
            tokens = estimate_tokens(content)
            if used + tokens > budget:
                continue  # estimate still too high; leave the file out
        files[path] = content
        used += tokens

    return files
"commands": []} + + def _extract_json_candidates(self, text: str) -> list[str]: + candidates = [text] + if "```json" in text: + candidates.insert(0, text.split("```json", 1)[1].split("```", 1)[0].strip()) + start = text.find("{") + if start >= 0: + depth = 0 + for i in range(start, len(text)): + if text[i] == "{": + depth += 1 + elif text[i] == "}": + depth -= 1 + if depth == 0: + candidates.insert(0, text[start:i + 1]) + break + return candidates diff --git a/llm.py b/llm.py new file mode 100644 index 0000000..15796d4 --- /dev/null +++ b/llm.py @@ -0,0 +1,376 @@ +""" +AutoDev - LLM Communication Layer +Supports Ollama and vLLM backends with streaming, retry logic, and robust error handling. +""" + +import json +import sys +import time +import urllib.request +import urllib.error +from . import config + + +class LLMError(Exception): + pass + + +class LLM: + def __init__(self, backend: str = None, model: str = None): + self.backend = backend or config.LLM_BACKEND + self.model = model or config.MODEL_NAME + if self.backend == "ollama": + self.base_url = config.OLLAMA_URL + elif self.backend == "vllm": + self.base_url = config.VLLM_URL + else: + raise LLMError(f"Unknown backend: {self.backend}") + self.context_size = None # Auto-detected on first use + + def detect_context_size(self) -> int: + """Detect the model's effective context window size. + + Checks (in priority order): + 1. Ollama /api/ps for running model's actual context_length + 2. num_ctx in model parameters from /api/show + 3. Model architecture's max context_length from /api/show + 4. vLLM max_model_len from /v1/models + 5. 
def detect_gpu_status(self) -> dict:
    """Report how much of the running model is offloaded to the GPU.

    Only the Ollama backend is queried (``/api/ps``); for any other backend,
    or when the request fails for any reason, the default values are
    returned unchanged.

    Returns dict with:
        loaded: bool - whether model is currently loaded
        gpu_percent: int - percentage of model on GPU (0-100)
        size_total: int - total model size in bytes
        size_vram: int - bytes on GPU
        warning: str|None - warning message if mostly CPU
    """
    status = {"loaded": False, "gpu_percent": 0, "size_total": 0,
              "size_vram": 0, "warning": None}
    if self.backend != "ollama":
        return status

    try:
        request = urllib.request.Request(f"{self.base_url}/api/ps")
        with urllib.request.urlopen(request, timeout=5) as resp:
            running = json.loads(resp.read().decode("utf-8"))

        # First running model whose name contains the configured model name.
        entry = next(
            (m for m in running.get("models", []) if self.model in m.get("name", "")),
            None,
        )
        if entry is not None:
            status["loaded"] = True
            status["size_total"] = entry.get("size", 0)
            status["size_vram"] = entry.get("size_vram", 0)
            if status["size_total"] > 0:
                status["gpu_percent"] = int(
                    status["size_vram"] / status["size_total"] * 100
                )
            percent = status["gpu_percent"]
            if percent == 0:
                status["warning"] = (
                    "Model is running entirely on CPU. "
                    "This will be extremely slow and may not complete. "
                    "Consider using a smaller model or freeing GPU memory."
                )
            elif percent < 50:
                status["warning"] = (
                    f"Only {percent}% of model is on GPU. "
                    "Performance will be significantly degraded. "
                    "Consider using a smaller model."
                )
    except Exception:
        # Best-effort probe: any failure leaves whatever was gathered so far.
        pass
    return status
Check running model — this gives the actual runtime context_length + try: + url = f"{self.base_url}/api/ps" + req = urllib.request.Request(url) + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read().decode("utf-8")) + for m in data.get("models", []): + if self.model in m.get("name", ""): + ctx = m.get("context_length") + if ctx: + return int(ctx) + except Exception: + pass + + # 2. Check model config from /api/show + try: + url = f"{self.base_url}/api/show" + payload = {"name": self.model} + data = self._post_raw(url, payload) + + # Check parameters for explicit num_ctx setting + params = data.get("parameters", "") + for line in params.splitlines(): + if "num_ctx" in line: + parts = line.split() + for p in parts: + if p.isdigit(): + return int(p) + + # Check modelfile for PARAMETER num_ctx + modelfile = data.get("modelfile", "") + for line in modelfile.splitlines(): + if "num_ctx" in line.lower(): + parts = line.split() + for p in parts: + if p.isdigit(): + return int(p) + + # 3. 
def query(self, prompt: str, system: str = "", temperature: float = 0.2,
          stream: bool = False) -> str:
    """Send a single-prompt completion request and return the generated text.

    Args:
        prompt: The user prompt.
        system: System prompt; ``config.EXPERT_IDENTITY`` is used when empty.
        temperature: Sampling temperature passed to the backend.
        stream: When true, tokens are printed as they arrive and the full
            text is returned at the end.

    Streaming is delegated to ``query_stream``. The older
    ``_stream_ollama`` / ``_stream_vllm`` helpers end in ``_stream_post``,
    which raises ``LLMError`` unconditionally, so ``stream=True`` could
    never succeed through them.

    Non-streamed responses are also pushed to the web dashboard when it is
    available; failures to do so are ignored.
    """
    if not system:
        system = config.EXPERT_IDENTITY

    if stream:
        return self.query_stream(prompt, system, temperature)

    if self.backend == "ollama":
        result = self._query_ollama(prompt, system, temperature)
    else:
        result = self._query_vllm(prompt, system, temperature)

    # Push LLM thinking to web UI
    try:
        from .web import push_event
        push_event("llm_response", {"response": result})
    except Exception:
        # The dashboard is optional; never let it break a query.
        pass
    return result
def chat(self, messages: list[dict], temperature: float = 0.2,
         stream: bool = False) -> str:
    """Send a chat-style request and return the assistant's reply text.

    Uses ``/api/chat`` on Ollama and ``/v1/chat/completions`` otherwise.

    Args:
        messages: Chat history as a list of role/content dicts, passed to
            the backend unchanged.
        temperature: Sampling temperature.
        stream: When true, the request is executed by ``_do_stream``, which
            prints tokens as they arrive and returns the full text.

    Streaming goes through ``_do_stream`` with the complete payload;
    ``_stream_post`` never receives the payload and always raises, so the
    streaming path could not work through it.

    Raises:
        LLMError: If the non-streamed vLLM response lacks the expected
            ``choices[0].message.content`` structure.
    """
    if self.backend == "ollama":
        url = f"{self.base_url}/api/chat"
        payload = {
            "model": self.model,
            "messages": messages,
            "stream": stream,
            "options": {"temperature": temperature},
        }
        if stream:
            return self._do_stream(
                url, payload, lambda c: c.get("message", {}).get("content", "")
            )
        return self._post(url, payload, key="message", subkey="content")

    url = f"{self.base_url}/v1/chat/completions"
    payload = {
        "model": self.model,
        "messages": messages,
        "max_tokens": 4096,
        "temperature": temperature,
        "stream": stream,
    }
    if stream:
        return self._do_stream(url, payload, lambda c: (
            c.get("choices", [{}])[0].get("delta", {}).get("content", "")
            if c.get("choices") else ""
        ))
    data = self._post_raw(url, payload)
    try:
        return data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        # TypeError covers a null "choices" or "message" in the response.
        raise LLMError(f"Unexpected vLLM chat response: {data}")
def _post_raw(self, url: str, payload: dict, retries: int = 2) -> dict:
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON reply.

    Transport failures are retried with exponential backoff (1s, 2s, ...)
    up to ``retries`` additional attempts.

    ``OSError`` is caught rather than only ``urllib.error.URLError``: a
    socket timeout while waiting for or reading the response is raised as
    ``TimeoutError`` (and a dropped connection as ``ConnectionError``)
    without being wrapped in ``URLError``. With long generations those are
    the most likely failures, and they previously escaped as raw exceptions
    instead of ``LLMError``. ``URLError`` is itself an ``OSError`` subclass.

    Raises:
        LLMError: If the reply is not valid JSON, or if every attempt
            failed at the transport level.
    """
    body = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}
    )
    last_err = None
    for attempt in range(retries + 1):
        try:
            with urllib.request.urlopen(req, timeout=config.LLM_TIMEOUT) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except json.JSONDecodeError as e:
            # A malformed body will not improve on retry.
            raise LLMError(f"Invalid JSON from LLM: {e}") from e
        except OSError as e:
            last_err = e
            if attempt < retries:
                time.sleep(2 ** attempt)
    raise LLMError(f"LLM request failed after {retries + 1} attempts ({url}): {last_err}")
def query_stream(self, prompt: str, system: str = "", temperature: float = 0.2) -> str:
    """Run a completion request with live token output on the console.

    An empty ``system`` falls back to ``config.EXPERT_IDENTITY``. The
    Ollama implementation is used for the ``ollama`` backend and the vLLM
    implementation for every other backend. Returns the full generated text.
    """
    effective_system = system or config.EXPERT_IDENTITY
    streamer = (
        self._stream_ollama_impl
        if self.backend == "ollama"
        else self._stream_vllm_impl
    )
    return streamer(prompt, effective_system, temperature)
+ sys.stdout.write(token) + sys.stdout.flush() + except json.JSONDecodeError: + pass + elif chunk == b"\n": + buffer = b"" + except urllib.error.URLError as e: + raise LLMError(f"Stream request failed ({url}): {e}") + sys.stdout.write("\n") + sys.stdout.flush() + return "".join(full_text) diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..eccac42 --- /dev/null +++ b/logger.py @@ -0,0 +1,78 @@ +""" +AutoDev - Action Logger & Worklog +Logs every action to console and persistent worklog for resumability. +""" + +import json +import os +from datetime import datetime + +from . import config + + +class Logger: + def __init__(self, workdir: str): + self.workdir = workdir + self.log_path = os.path.join(workdir, config.WORKLOG_FILE) + self.entries: list[dict] = [] + self._load() + + def _load(self): + if os.path.exists(self.log_path): + try: + with open(self.log_path, "r") as f: + self.entries = json.load(f) + except (json.JSONDecodeError, IOError): + self.entries = [] + + def _save(self): + try: + os.makedirs(os.path.dirname(self.log_path) or ".", exist_ok=True) + with open(self.log_path, "w") as f: + json.dump(self.entries, f, indent=2) + except OSError: + pass # If we truly can't write, don't crash the whole process + + def log(self, action: str, detail: str = "", status: str = "ok"): + entry = { + "timestamp": datetime.now().isoformat(), + "action": action, + "detail": detail[:3000], + "status": status, + } + self.entries.append(entry) + self._save() + # Console output with visual indicators + icons = {"ok": "✓", "error": "✗", "warn": "⚠"} + icon = icons.get(status, "…") + # Color: green for ok, red for error, yellow for warn + colors = {"ok": "\033[32m", "error": "\033[31m", "warn": "\033[33m"} + reset = "\033[0m" + color = colors.get(status, "") + print(f" {color}[{icon}]{reset} {action}: {detail[:200]}") + # Push to web UI if running + try: + from .web import push_event + push_event("log", {"action": action, "detail": detail[:500], "status": 
status}) + except Exception: + pass + + def get_recent(self, n: int = 20) -> list[dict]: + return self.entries[-n:] + + def get_all(self) -> list[dict]: + return list(self.entries) + + def get_last_phase(self) -> str | None: + for e in reversed(self.entries): + if e["action"] == "phase_complete": + return e["detail"] + return None + + def get_errors_since(self, n_entries_back: int = 50) -> list[str]: + """Get recent error details for debugging context.""" + errors = [] + for e in self.entries[-n_entries_back:]: + if e["status"] == "error": + errors.append(f"[{e['action']}] {e['detail']}") + return errors diff --git a/main.py b/main.py new file mode 100644 index 0000000..3d3d171 --- /dev/null +++ b/main.py @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +""" +AutoDev - Autonomous CLI Development Studio +============================================ +A fully autonomous development agent that reads project descriptions and +reference manuals, then plans, implements, compiles, tests, and debugs +complete software projects using a local LLM backend. + +Usage: + python -m autodev [options] + +Options: + -nomanual Skip reading manuals (for minor tasks) + --backend LLM backend: ollama or vllm (default: from config) + --model Model name (default: from config) + --workdir Working directory (default: current directory) +""" + +import argparse +import os +import sys + +from . import config +from .llm import LLM, LLMError +from .logger import Logger +from .context import ContextManager +from .sandbox import Sandbox, SandboxViolation +from .dependency import DependencyTracker +from .planner import Planner +from .executor import Executor +from .debugger import Debugger +from .resume import ResumeManager + + +BANNER = r""" +╔══════════════════════════════════════════════════╗ +║ AutoDev - Autonomous Dev Studio ║ +║ ================================ ║ +║ Reads descriptions. Reads manuals. Builds. 
║ +╚══════════════════════════════════════════════════╝ +""" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="AutoDev - Autonomous CLI Development Studio", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("-nomanual", action="store_true", + help="Skip reading manuals directory") + parser.add_argument("--backend", choices=["ollama", "vllm"], + default=None, help="LLM backend") + parser.add_argument("--model", default=None, help="Model name") + parser.add_argument("--workdir", default=os.getcwd(), + help="Working directory (default: cwd)") + parser.add_argument("-web", type=int, default=None, metavar="PORT", + help="Start web dashboard on this port (e.g. -web 4500)") + return parser.parse_args() + + +def check_llm_connection(llm: LLM, logger: Logger) -> bool: + try: + resp = llm.query("Respond with only the word: OK", + system="You are a test. Respond with only: OK", + temperature=0.0) + if resp and len(resp.strip()) < 50: + logger.log("llm_check", f"Backend {llm.backend} ({llm.model}) connected") + return True + # Got a response but it was verbose — still connected + logger.log("llm_check", f"Backend connected (verbose response)", "warn") + return True + except LLMError as e: + logger.log("llm_check", str(e), "error") + return False + + +def _read_user_context_size() -> int | None: + """Read context_size from ~/.ahvibe.conf if it exists.""" + conf_path = os.path.expanduser("~/.ahvibe.conf") + if not os.path.exists(conf_path): + return None + try: + with open(conf_path, "r") as f: + for line in f: + line = line.strip() + if line.startswith("context_size="): + return int(line.split("=", 1)[1].strip()) + except (IOError, ValueError): + pass + return None + + +def run(args: argparse.Namespace): + print(BANNER) + workdir = os.path.realpath(args.workdir) + backend = args.backend or config.LLM_BACKEND + model = args.model or config.MODEL_NAME + print(f" Working directory: {workdir}") + print(f" 
Backend: {backend} | Model: {model}") + print() + + # Initialize components + logger = Logger(workdir) + sandbox = Sandbox(workdir) + deps = DependencyTracker(workdir) + llm = LLM(backend=args.backend, model=args.model) + + # Start web dashboard if requested + if args.web: + from .web import start_web_server + start_web_server(args.web, workdir) + print(f" \033[36m🌐 Web dashboard: http://localhost:{args.web}\033[0m") + + logger.log("startup", f"AutoDev started in {workdir}") + + # Step 1: Check LLM (this also loads the model into memory) + print(" Checking LLM connection...") + if not check_llm_connection(llm, logger): + print(f"\n \033[31m✗ Cannot reach LLM backend.\033[0m") + print(f" Backend: {llm.backend} at {llm.base_url}") + print(f" Ensure Ollama/vLLM is running and the model is loaded.") + logger.log("exit", "LLM unreachable", "error") + sys.exit(1) + + # Check GPU/CPU status (model is now loaded, /api/ps has real data) + gpu = llm.detect_gpu_status() + if gpu["loaded"]: + gpu_pct = gpu["gpu_percent"] + size_gb = gpu["size_total"] / 1e9 + vram_gb = gpu["size_vram"] / 1e9 + if gpu_pct >= 90: + print(f" \033[32m✓\033[0m GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB)") + elif gpu_pct >= 50: + print(f" \033[33m⚠\033[0m GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB) — partial GPU, expect slower performance") + logger.log("gpu_warn", f"{gpu_pct}% GPU", "warn") + else: + print(f" \033[31m✗ GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB)\033[0m") + print(f" {gpu['warning']}") + logger.log("gpu_warn", gpu["warning"], "error") + if gpu_pct == 0: + print(f"\n Running on CPU only. 
This will likely be too slow for autonomous development.") + print(f" Consider: a smaller model, freeing GPU memory, or increasing VRAM.") + resp = "" # Continue anyway — user can Ctrl+C + else: + print(f" ⊘ Model not pre-loaded (will load on first query)") + + # Auto-detect context size (model is now loaded, /api/ps has runtime value) + model_ctx = llm.detect_context_size() + local_ctx = int(model_ctx * 0.6) + ctx = ContextManager(max_tokens=local_ctx) + print(f" Context: {model_ctx:,} tokens (local budget: {local_ctx:,})") + logger.log("context_size", f"Context: {model_ctx}, budget: {local_ctx}") + + # Initialize remaining components + planner = Planner(llm, logger, ctx, workdir) + executor = Executor(llm, logger, ctx, sandbox, deps, workdir) + debugger = Debugger(llm, logger, ctx, sandbox, workdir) + resume = ResumeManager(logger, workdir) + + # Step 2: Read description + print(" Reading project description...") + description = planner.read_description() + if not description: + print(f"\n \033[31m✗ No '{config.DESCRIPTION_FILE}' found in {workdir}\033[0m") + print(" Create a description.txt with your project requirements and run again.") + logger.log("exit", "No description.txt", "error") + sys.exit(1) + logger.log("description_read", f"{len(description)} chars") + print(f" \033[32m✓\033[0m Description loaded ({len(description)} chars)") + + # Step 3: Read manuals + manuals = {} + if args.nomanual: + print(" ⊘ Skipping manuals (-nomanual flag)") + logger.log("manuals_skip", "User requested -nomanual") + else: + manuals_dir = os.path.join(workdir, config.MANUALS_DIR) + if not os.path.isdir(manuals_dir): + print(f"\n \033[31m✗ No '{config.MANUALS_DIR}/' directory found.\033[0m") + print(" Create a manuals/ folder with reference docs, or use -nomanual flag.") + logger.log("exit", "No manuals directory", "error") + sys.exit(1) + manuals = planner.read_manuals() + if not manuals: + print(" \033[33m⚠\033[0m Manuals directory exists but is empty") + 
logger.log("manuals_empty", "No files in manuals/", "warn") + else: + print(f" \033[32m✓\033[0m Loaded {len(manuals)} manual(s): {', '.join(manuals.keys())}") + logger.log("manuals_read", f"{len(manuals)} files") + + # Step 4: Plan or resume + plan = None + start_step = 0 + import hashlib + desc_hash = hashlib.md5(description.encode()).hexdigest() + + if not resume.is_fresh_start(): + print("\n ↻ Previous session detected. Checking resume state...") + if resume.description_changed(desc_hash): + print(" \033[33m⚠\033[0m Description changed. Re-planning with existing work context.") + logger.log("replan", "Description changed, creating incremental plan", "warn") + plan = None # Force re-plan, but we'll pass existing files + else: + plan = planner.load_existing_plan() + if plan and plan.get("steps"): + start_step = resume.get_resume_step() + total = len(plan["steps"]) + if start_step >= total: + print(f" \033[32m✓\033[0m Previous run completed. Nothing to do.") + print(f" To re-run, delete .autodev_state.json or change description.txt.") + logger.log("skip", "Already complete, description unchanged") + return + print(f" \033[32m✓\033[0m Resuming from step {start_step + 1}/{total}") + ctx.add("system", f"Project: {plan.get('summary', description[:500])}", priority=9) + else: + print(" \033[33m⚠\033[0m Could not load previous plan. 
Starting fresh.") + plan = None + + if not plan: + print("\n Planning development...") + # Gather existing project files for context (for incremental re-planning) + existing_work = "" + skip_files = {config.WORKLOG_FILE, config.PLAN_FILE, config.DEPENDENCY_FILE, + config.DESCRIPTION_FILE, ".autodev_state.json"} + for fname in sorted(os.listdir(workdir)): + fpath = os.path.join(workdir, fname) + if os.path.isfile(fpath) and fname not in skip_files and not fname.startswith("."): + try: + with open(fpath, "r") as f: + content = f.read() + if content.strip(): + existing_work += f"### {fname}\n{content[:2000]}\n\n" + except (IOError, UnicodeDecodeError): + pass + plan = planner.create_plan(description, manuals, + existing_work=existing_work if existing_work else "") + if plan.get("error"): + print(f" \033[31m✗ Planning failed: {plan.get('error')}\033[0m") + if plan.get("raw"): + print(f" Raw LLM output (first 300 chars): {plan['raw'][:300]}") + logger.log("exit", "Planning failed", "error") + sys.exit(1) + + steps = plan.get("steps", []) + print(f" \033[32m✓\033[0m Plan created: {plan.get('project_name', 'project')}") + print(f" Language: {plan.get('language', 'unknown')}") + print(f" Summary: {plan.get('summary', 'N/A')}") + print(f" Steps: {len(steps)}") + print(f" Dependencies: {', '.join(plan.get('dependencies', [])) or 'none'}") + print() + + deps.add_many(plan.get("dependencies", [])) + + # Step 5: Execute + steps = plan.get("steps", []) + if not steps: + print(" \033[31m✗ Plan has no steps.\033[0m Check description.txt for clarity.") + logger.log("exit", "Empty plan", "error") + sys.exit(1) + + total = len(steps) + print(f"\n{'='*54}") + print(f" EXECUTING PLAN — {total} steps") + print(f"{'='*54}\n") + + # Push plan to web UI + try: + from .web import push_event + push_event("plan", { + "project": plan.get("project_name", "project"), + "steps": [{"id": s.get("id", i+1), "phase": s.get("phase", ""), + "description": s.get("description", ""), "status": "pending"} 
+ for i, s in enumerate(steps)], + "start_step": start_step, + }) + except Exception: + pass + + for i in range(start_step, total): + step = steps[i] + step_num = i + 1 + phase = step.get("phase", "implement") + desc = step.get("description", "") + + print(f"\n ── Step {step_num}/{total} [\033[36m{phase}\033[0m] ──") + print(f" {desc}") + + resume.save_state(i, plan, desc_hash=desc_hash) + result = executor.execute_step(step, plan) + + if not result["success"]: + print(f" \033[31m✗ Step failed.\033[0m Entering debug loop...") + logger.log("step_failed", f"Step {step_num}", "error") + try: + from .web import push_event + push_event("step_done", {"step": i, "status": "error"}) + except Exception: + pass + + fixed = False + for attempt in range(config.MAX_DEBUG_CYCLES): + print(f"\n Debug attempt {attempt + 1}/{config.MAX_DEBUG_CYCLES}...") + dbg = debugger.debug_errors(result["errors"], step, plan) + + if dbg["fixed"]: + # Re-run step to verify + print(f" Fix applied. Re-running step...") + result = executor.execute_step(step, plan) + if result["success"]: + print(f" \033[32m✓ Fixed on attempt {attempt + 1}\033[0m") + fixed = True + break + print(f" Step still failing after fix.") + result["errors"] = result.get("errors", []) + elif dbg["errors"]: + result["errors"] = dbg["errors"] + + if ctx.detect_cycle(): + logger.log("cycle_break", "Breaking debug cycle — approaches too similar", "warn") + ctx.clear_stale() + + if not fixed: + print(f"\n \033[31m✗ Could not fix step {step_num} after {config.MAX_DEBUG_CYCLES} attempts.\033[0m") + for err in result["errors"][:3]: + print(f" Error: {err[:200]}") + print(f"\n The worklog has been saved. 
Review it and add documentation if needed.") + print(f" Restart AutoDev to resume from this step.") + resume.mark_failed(i, plan, "; ".join(result["errors"][:2]), + desc_hash=desc_hash) + logger.log("halt", f"Stopped at step {step_num}", "error") + sys.exit(1) + + logger.log("phase_complete", f"step_{step_num}") + ctx_info = ctx.token_usage() + print(f" \033[32m✓\033[0m Step {step_num} complete " + f"(context: {ctx_info['utilization']})") + try: + from .web import push_event + push_event("step_done", {"step": i, "status": "ok"}) + except Exception: + pass + + # Done + resume.mark_complete(plan, desc_hash=desc_hash) + print(f"\n{'='*54}") + print(f" \033[32m✓ ALL {total} STEPS COMPLETE\033[0m") + print(f"{'='*54}") + print(f"\n Project: {plan.get('project_name', 'project')}") + print(f" Language: {plan.get('language', 'unknown')}") + print(f" Dependencies: see {config.DEPENDENCY_FILE}") + print(f" Worklog: see {config.WORKLOG_FILE}") + print(f" Plan: see {config.PLAN_FILE}") + logger.log("complete", "All steps executed successfully") + + +def main(): + args = parse_args() + try: + run(args) + except KeyboardInterrupt: + print("\n\n Interrupted by user. State saved — restart to resume.") + sys.exit(130) + except SandboxViolation as e: + print(f"\n \033[31m✗ SANDBOX VIOLATION: {e}\033[0m") + sys.exit(1) + except LLMError as e: + print(f"\n \033[31m✗ LLM ERROR: {e}\033[0m") + sys.exit(1) + except Exception as e: + print(f"\n \033[31m✗ UNEXPECTED ERROR: {e}\033[0m") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/planner.py b/planner.py new file mode 100644 index 0000000..6394908 --- /dev/null +++ b/planner.py @@ -0,0 +1,181 @@ +""" +AutoDev - Planner +Reads description + manuals, queries LLM to produce a structured development plan. +Approaches the task like a principal engineer — thorough, skeptical, complete. 
+""" + +import json +import os +from .llm import LLM +from .logger import Logger +from .context import ContextManager +from . import config + +SYSTEM_PROMPT = config.EXPERT_IDENTITY + """ + +You are now in PLANNING mode. You have been given a project description and optional reference manuals. +Your job is to produce a complete, actionable development plan that another engineer could follow blindly. + +Think through this carefully: +1. What EXACTLY is being asked for? Read the description multiple times. Identify every requirement. +2. What language, frameworks, and tools are needed? +3. What is the correct project structure? Think about separation of concerns. +4. What are the build steps? What compiler flags, linker flags, dependencies? +5. What could go wrong? Plan for error handling and edge cases. +6. What is the correct order of implementation? Dependencies between files matter. + +Output ONLY valid JSON with this structure: +{ + "project_name": "string", + "language": "primary language", + "summary": "2-3 sentence summary of what we're building and why", + "dependencies": ["every tool, compiler, library needed"], + "structure": ["every file and directory to create, in order"], + "steps": [ + { + "id": 1, + "phase": "setup|implement|test|debug|finalize", + "description": "detailed description of what to do and WHY", + "files": ["files this step creates or modifies"], + "commands": ["exact shell commands to run, if any"], + "acceptance": "how to verify this step succeeded" + } + ] +} + +Rules: +- Every file that needs to exist MUST have a step that creates it. +- Implementation steps MUST be ordered so dependencies are created before dependents. +- Include a setup step for directory structure and dependency checks. +- Include test/compile steps after implementation. +- Include a finalize step that does a clean build and verification. +- The "acceptance" field is critical — it defines what success looks like for each step. +- Be specific in commands. 
Not "compile the code" but "gcc -Wall -Wextra -o main main.c util.c -lm". +""" + + +class Planner: + def __init__(self, llm: LLM, logger: Logger, ctx: ContextManager, workdir: str): + self.llm = llm + self.logger = logger + self.ctx = ctx + self.workdir = workdir + self.plan_path = os.path.join(workdir, config.PLAN_FILE) + + def read_description(self) -> str | None: + path = os.path.join(self.workdir, config.DESCRIPTION_FILE) + if not os.path.exists(path): + return None + with open(path, "r") as f: + return f.read().strip() + + def read_manuals(self) -> dict[str, str]: + manuals = {} + mdir = os.path.join(self.workdir, config.MANUALS_DIR) + if not os.path.isdir(mdir): + return manuals + for fname in sorted(os.listdir(mdir)): + fpath = os.path.join(mdir, fname) + if os.path.isfile(fpath): + try: + with open(fpath, "r") as f: + manuals[fname] = f.read() + except (IOError, UnicodeDecodeError): + self.logger.log("manual_skip", f"Could not read {fname}", "warn") + return manuals + + def create_plan(self, description: str, manuals: dict[str, str], + existing_work: str = "") -> dict: + prompt_parts = [ + "## Project Description\n" + "Read this VERY carefully. Every sentence is a requirement.\n\n" + f"{description}" + ] + for name, content in manuals.items(): + prompt_parts.append(f"## Reference Manual: {name}\n{content[:6000]}") + + if existing_work: + prompt_parts.append( + "## Existing Work\n" + "The following files already exist from a previous iteration. " + "Your plan should BUILD ON what exists — only add steps for NEW or CHANGED requirements. 
" + "Do NOT recreate files that already satisfy the requirements.\n\n" + f"{existing_work}" + ) + + prompt = "\n\n".join(prompt_parts) + self.ctx.add("user", prompt, priority=9) + self.logger.log("planning", "Querying LLM for development plan") + + response = self.llm.query(prompt, system=SYSTEM_PROMPT, temperature=0.3) + self.ctx.add("assistant", response, priority=8) + + plan = self._parse_plan(response) + + # Validate plan has required fields + if not plan.get("steps"): + # Retry once with more explicit instruction + self.logger.log("plan_retry", "First plan had no steps, retrying", "warn") + retry_prompt = ( + prompt + "\n\nIMPORTANT: Your previous response could not be parsed. " + "You MUST output ONLY a JSON object. No text before or after. " + "Start your response with { and end with }." + ) + response = self.llm.query(retry_prompt, system=SYSTEM_PROMPT, temperature=0.1) + plan = self._parse_plan(response) + + self._save_plan(plan) + self.logger.log("plan_created", f"{len(plan.get('steps', []))} steps") + return plan + + def load_existing_plan(self) -> dict | None: + if os.path.exists(self.plan_path): + try: + with open(self.plan_path, "r") as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return None + return None + + def _parse_plan(self, response: str) -> dict: + text = response.strip() + # Try progressively more aggressive extraction + for extracted in self._extract_json_candidates(text): + try: + plan = json.loads(extracted) + if isinstance(plan, dict): + return plan + except json.JSONDecodeError: + continue + self.logger.log("plan_parse_fail", "Could not parse plan JSON", "error") + return {"steps": [], "error": "Failed to parse plan", "raw": response[:1000]} + + def _extract_json_candidates(self, text: str) -> list[str]: + """Yield JSON candidates from most to least likely.""" + candidates = [] + # 1. 
Markdown fenced JSON + if "```json" in text: + block = text.split("```json", 1)[1].split("```", 1)[0].strip() + candidates.append(block) + elif "```" in text: + block = text.split("```", 1)[1].split("```", 1)[0].strip() + candidates.append(block) + # 2. Raw text (entire response) + candidates.append(text) + # 3. Find outermost { ... } with brace matching + start = text.find("{") + if start >= 0: + depth = 0 + for i in range(start, len(text)): + if text[i] == "{": + depth += 1 + elif text[i] == "}": + depth -= 1 + if depth == 0: + candidates.append(text[start:i + 1]) + break + return candidates + + def _save_plan(self, plan: dict): + with open(self.plan_path, "w") as f: + json.dump(plan, f, indent=2) diff --git a/resume.py b/resume.py new file mode 100644 index 0000000..4a69335 --- /dev/null +++ b/resume.py @@ -0,0 +1,68 @@ +""" +AutoDev - Resume Manager +Handles state persistence and resuming from worklog. +""" + +import json +import os +from .logger import Logger +from . import config + + +class ResumeManager: + def __init__(self, logger: Logger, workdir: str): + self.logger = logger + self.workdir = workdir + self.state_path = os.path.join(workdir, ".autodev_state.json") + + def save_state(self, current_step: int, plan: dict, status: str = "in_progress", + desc_hash: str = ""): + state = { + "current_step": current_step, + "total_steps": len(plan.get("steps", [])), + "status": status, + "plan_hash": hash(json.dumps(plan, sort_keys=True)), + "desc_hash": desc_hash, + } + with open(self.state_path, "w") as f: + json.dump(state, f, indent=2) + + def load_state(self) -> dict | None: + if not os.path.exists(self.state_path): + return None + try: + with open(self.state_path, "r") as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return None + + def get_resume_step(self) -> int: + """Determine which step to resume from based on worklog.""" + state = self.load_state() + if state and state.get("status") == "in_progress": + step = 
state.get("current_step", 0) + self.logger.log("resume", f"Resuming from step {step + 1}") + return step + return 0 + + def mark_complete(self, plan: dict, desc_hash: str = ""): + self.save_state(len(plan.get("steps", [])), plan, status="complete", + desc_hash=desc_hash) + self.logger.log("phase_complete", "all") + + def mark_failed(self, step: int, plan: dict, reason: str, desc_hash: str = ""): + self.save_state(step, plan, status="failed", desc_hash=desc_hash) + self.logger.log("failed", f"Step {step + 1}: {reason}", "error") + + def description_changed(self, desc_hash: str) -> bool: + """Check if description.txt has changed since last run.""" + state = self.load_state() + if not state: + return False + old_hash = state.get("desc_hash", "") + if not old_hash: + return False # No hash stored — can't tell, assume unchanged + return old_hash != desc_hash + + def is_fresh_start(self) -> bool: + return not os.path.exists(self.state_path) diff --git a/sandbox.py b/sandbox.py new file mode 100644 index 0000000..c8d31b5 --- /dev/null +++ b/sandbox.py @@ -0,0 +1,214 @@ +""" +AutoDev - Sandbox +Enforces working directory confinement with whitelist-based command validation. 
def validate_path(self, path: str) -> str:
    """Resolve *path* against the sandbox root and ensure it stays inside.

    Args:
        path: A path relative to ``self.workdir`` (an absolute path is
            accepted only if it resolves inside the sandbox).

    Returns:
        The fully resolved absolute path (symlinks followed).

    Raises:
        SandboxViolation: If the resolved path is outside ``self.workdir``.
    """
    resolved = os.path.realpath(os.path.join(self.workdir, path))
    # Compare whole path components. A plain string-prefix test would accept
    # a sibling such as "<workdir>-evil/file".
    try:
        inside = os.path.commonpath([self.workdir, resolved]) == self.workdir
    except ValueError:
        # Raised e.g. for paths on different drives: certainly not inside.
        inside = False
    if not inside:
        raise SandboxViolation(f"Path escapes sandbox: {path} -> {resolved}")
    return resolved
def safe_write(self, path: str, content: str) -> str:
    """Write *content* to *path* inside the sandbox, creating parent dirs.

    If a directory already occupies the target path it is removed first so
    the file can be created in its place. The sandbox root itself is never
    a valid target: a path such as ``""`` or ``"."`` is rejected instead of
    deleting the whole working directory.

    Args:
        path: Target path, validated with ``self.validate_path``.
        content: Text to write (encoded as UTF-8).

    Returns:
        The resolved absolute path that was written.

    Raises:
        SandboxViolation: If the path escapes the sandbox or resolves to the
            sandbox root.
    """
    full = self.validate_path(path)
    if full == self.workdir:
        raise SandboxViolation(f"Refusing to overwrite sandbox root: {path!r}")
    os.makedirs(os.path.dirname(full), exist_ok=True)
    # Don't write a file if a directory exists at that path
    if os.path.isdir(full):
        import shutil
        shutil.rmtree(full)
    with open(full, "w", encoding="utf-8") as f:
        f.write(content)
    return full
@staticmethod
def _split_shell_commands(cmd: str) -> list[str]:
    """Split a shell line into its individual commands.

    Separators recognised outside quotes: ``&&``, ``||``, ``|``, ``;``,
    a newline, and a lone ``&`` (background). An ``&`` that is part of a
    redirection (``2>&1``, ``<&3``, ``&>file``) is kept as ordinary text.
    Text inside single or double quotes is never split.

    Args:
        cmd: The raw command string.

    Returns:
        The commands in order, stripped of surrounding whitespace, with
        empty segments dropped.
    """
    parts: list[str] = []
    current: list[str] = []
    quote = ""  # the quote character we are inside, "" when unquoted
    i, n = 0, len(cmd)
    while i < n:
        c = cmd[i]
        if quote:
            current.append(c)
            if c == quote:
                quote = ""
        elif c in ("'", '"'):
            quote = c
            current.append(c)
        elif cmd.startswith(("&&", "||"), i):
            parts.append("".join(current))
            current = []
            i += 2
            continue
        elif c in (";", "|", "\n"):
            parts.append("".join(current))
            current = []
        elif c == "&":
            prev = cmd[i - 1] if i else ""
            nxt = cmd[i + 1:i + 2]
            if prev in ("<", ">") or nxt == ">":
                # Part of a redirection such as 2>&1 or &>file.
                current.append(c)
            else:
                # Background operator: what follows is a separate command.
                parts.append("".join(current))
                current = []
        else:
            current.append(c)
        i += 1
    parts.append("".join(current))
    return [p.strip() for p in parts if p.strip()]
import config + +# Global event queue — logger pushes events, SSE endpoint streams them +_event_queue: queue.Queue = queue.Queue(maxsize=5000) +_workdir: str = "" + + +def push_event(event_type: str, data: dict): + """Push an event to all connected web clients.""" + try: + _event_queue.put_nowait({"type": event_type, "data": data}) + except queue.Full: + pass + + +class WebHandler(http.server.BaseHTTPRequestHandler): + def log_message(self, format, *args): + pass # Suppress default logging + + def do_GET(self): + try: + parsed = urllib.parse.urlparse(self.path) + path = parsed.path + params = urllib.parse.parse_qs(parsed.query) + + if path == "/": + self._serve_html() + elif path == "/events": + self._serve_sse() + elif path == "/api/files": + self._serve_file_tree() + elif path == "/api/file": + filepath = params.get("path", [""])[0] + self._serve_file_content(filepath) + else: + self.send_error(404) + except (BrokenPipeError, ConnectionResetError, OSError): + pass + + def _serve_html(self): + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + self.wfile.write(HTML_PAGE.encode("utf-8")) + + def _serve_sse(self): + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.send_header("Connection", "keep-alive") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + try: + while True: + try: + event = _event_queue.get(timeout=1) + line = f"data: {json.dumps(event)}\n\n" + self.wfile.write(line.encode("utf-8")) + self.wfile.flush() + except queue.Empty: + # Send keepalive + self.wfile.write(b": keepalive\n\n") + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError, OSError): + pass + + def _serve_file_tree(self): + files = [] + for root, dirs, filenames in os.walk(_workdir): + # Skip hidden dirs and autodev backups + dirs[:] = [d for d in dirs if not d.startswith(".") and d != "__pycache__"] + for 
def _serve_file_content(self, filepath: str):
    """Respond with ``{"path": ..., "content": ...}`` for a workspace file.

    Args:
        filepath: Path relative to the served working directory, taken from
            the ``path`` query parameter.

    Sends 400 when *filepath* is empty, 403 when it resolves outside the
    working directory, 404 when the file cannot be read as text, and 200
    with a JSON body otherwise.
    """
    if not filepath:
        self.send_error(400, "Missing path parameter")
        return
    base = os.path.realpath(_workdir)
    full = os.path.realpath(os.path.join(base, filepath))
    # Compare whole path components. A string-prefix test would let
    # "<workdir>-other/secret" through.
    try:
        inside = os.path.commonpath([base, full]) == base
    except ValueError:
        inside = False
    if not inside:
        self.send_error(403, "Path outside workspace")
        return
    # Only the read is guarded, so a failed socket write after the headers
    # went out does not trigger a second (404) response.
    try:
        with open(full, "r") as f:
            content = f.read()
    except (IOError, UnicodeDecodeError):
        self.send_error(404, "File not found or not readable")
        return
    self.send_response(200)
    self.send_header("Content-Type", "application/json")
    self.end_headers()
    self.wfile.write(json.dumps({"path": filepath, "content": content}).encode("utf-8"))
+
+ ⚡ AutoDev + Connecting... + +
+ +
+
+
📋 Plan Progress
+
Waiting for plan...
+
+
+
📁 Project Files
+
+
+
+
📄 File Content
+
Select a file
+

+    
+
+ +
+
+
🤖 LLM Activity
+
+
+
+
+ + + +"""