First commit, working quite ok!

This commit is contained in:
2026-04-09 08:20:32 +02:00
commit c6dee66c4b
30 changed files with 2811 additions and 0 deletions
+232
View File
@@ -0,0 +1,232 @@
# AutoDev — Autonomous CLI Development Studio
AutoDev reads a project description and reference manuals, then autonomously plans, implements, compiles, tests, and debugs complete software projects using a local LLM.
No cloud APIs. No subscriptions. Runs entirely on your machine with [Ollama](https://ollama.com) or [vLLM](https://github.com/vllm-project/vllm).
## How It Works
```
description.txt + manuals/ → LLM plans the project → writes code → compiles → tests → debugs → delivers
```
1. You write a `description.txt` explaining what you want built
2. You put reference documentation in a `manuals/` folder
3. You run `autodev`
4. AutoDev reads everything, creates a development plan, and executes it step by step
5. If something fails to compile or run, it debugs itself — analyzing errors, generating fixes, and retrying
6. When done, you have a working project
You don't interact with it. You watch it work.
## Quick Start
```bash
# 1. Make sure Ollama is running with a model loaded
ollama run qwen2.5-coder:14b
# 2. Set up your project folder
mkdir my-project && cd my-project
mkdir manuals
# 3. Write what you want
cat > description.txt << 'EOF'
Language: Python
Build a CLI tool that converts CSV files to JSON.
It should accept an input file and output file as arguments.
Handle errors gracefully if the input file doesn't exist.
EOF
# 4. Add any reference docs (API docs, specs, examples)
cp csv-format-spec.pdf manuals/
# 5. Run AutoDev
autodev
```
## Installation
```bash
# Clone the repository
git clone https://github.com/your-username/autodev.git
cd autodev
# Symlink to your PATH
ln -s $(pwd)/autodev/autodev-cli ~/.local/bin/autodev
# or
ln -s $(pwd)/autodev/autodev-cli ~/bin/autodev
# Alternatively, run directly
python -m autodev --workdir /path/to/project
```
### Requirements
- Python 3.10+
- [Ollama](https://ollama.com) or [vLLM](https://github.com/vllm-project/vllm) running locally or on your network
- No pip dependencies — uses only the Python standard library
## Configuration
Edit `autodev/config.py` to set your LLM backend:
```python
LLM_BACKEND = "ollama" # "ollama" or "vllm"
OLLAMA_URL = "http://localhost:11434" # your Ollama instance
MODEL_NAME = "qwen2.5-coder:14b" # any model Ollama serves
```
You can also override at runtime:
```bash
autodev --backend ollama --model gemma4:e4b
```
### Tested Models
All models were tested against the same task: plan, implement, compile, test, and debug a C "hello world" project with a Makefile. Tested on Ollama with GPU offload.
| Model | Size | Result | Speed | Notes |
|-------|------|--------|-------|-------|
| `gemma4:e4b` | ~12B | ✅ Pass | Fast | Clean run, no debug needed. Best balance of speed and quality. **Recommended.** |
| `gemma3:27b` | 27B | ✅ Pass | Slow | Works well but slow. Needed sandbox fixes during early testing. Good for complex projects. |
| `gemma4:e2b` | ~8B | ❌ Fail | Very fast | Plans OK, but setup created a directory that blocked the executable name. Could not self-correct — repeated the same failed approach 10 times. |
| `gemma3:4b` | 4B | ❌ Fail | Very fast | Steps 14 passed, but debugger hallucinated a nonexistent `hello.c` file and could not reason about what files actually exist on disk. |
| `qwen2.5-coder:7b` | 7B | ❌ Fail | Fast | Classified "create main.c" as setup instead of implement, so the file was never generated. Debugger could not write a valid Makefile after 10 attempts. |
**Takeaway:** Models below ~12B parameters can plan and generate simple code, but they cannot self-correct when things go wrong. They repeat failed approaches, hallucinate files, and produce broken build scripts. **14B+ recommended for autonomous development.**
## Project Structure
Your project folder needs:
```
my-project/
├── description.txt # Required — what to build
└── manuals/ # Required — reference docs (use -nomanual to skip)
├── api-spec.md
└── protocol.txt
```
AutoDev creates these files as it works:
```
my-project/
├── description.txt
├── manuals/
├── plan.json # The development plan (human-readable)
├── worklog.json # Every action logged with timestamps
├── dependency.txt # External dependencies (compilers, libraries)
├── .autodev_state.json
└── ... your project files ...
```
## Features
### Autonomous Development Loop
Reads the description, understands the requirements, creates a structured plan, then executes it: setup → implement → compile → test → debug → finalize. No human input needed during execution.
### Self-Debugging
When compilation or tests fail, AutoDev enters a debug loop:
- Analyzes the error and source code
- Diagnoses root cause (not just symptoms)
- Generates a fix and applies it
- Verifies the fix works
- Rolls back automatically if the fix makes things worse
- Tracks failed approaches so it never repeats the same fix twice
### Resumable Sessions
Every action is logged to `worklog.json`. If AutoDev is interrupted or fails:
```bash
# Just run it again — it picks up where it left off
autodev
```
It reads the worklog, loads the existing plan, and continues from the last incomplete step.
### Cycle & Hallucination Detection
Detects when the LLM is stuck in a loop (producing similar outputs repeatedly) and automatically clears stale context to break out.
### Sandboxed Execution
- All file operations are confined to the working directory
- Shell commands are validated against a whitelist of safe tools (compilers, build tools, standard utilities)
- `sudo` and system-level commands are blocked
- Path traversal outside the working directory is prevented
### Language Agnostic
Works with any programming language the LLM knows. Tested with C, Python, and Makefiles. The LLM determines the appropriate build tools, compilers, and project structure.
### Dependency Tracking
All external dependencies (compilers, libraries, tools) are recorded in `dependency.txt` so you know exactly what the project needs.
## CLI Options
```
autodev [options]
Options:
-nomanual Skip reading manuals/ directory (for simple tasks)
-web PORT Start live web dashboard on PORT (e.g. -web 4500)
--backend {ollama,vllm} LLM backend (default: from config)
--model MODEL Model name (default: from config)
--workdir DIR Working directory (default: current directory)
```
### Web Dashboard
Run `autodev -web 4500` and open `http://localhost:4500` in your browser.
The dashboard shows three panels:
- **Plan Progress** — step-by-step checklist with ✓/✗/▸ status and completion counter
- **Project Files** — clickable file tree with live content viewer
- **LLM Activity** — real-time log of all actions and model thinking (newest first)
Updates are pushed live via Server-Sent Events — no page refresh needed.
### Incremental Updates
If you change `description.txt` and restart AutoDev, it detects the change and re-plans incrementally — telling the LLM what files already exist so it builds on previous work instead of starting over.
## Architecture
```
autodev/
├── config.py # LLM backend settings, timeouts, expert system prompt
├── llm.py # Ollama + vLLM communication with streaming and retry
├── context.py # Token-aware context window with relevance scoring
├── planner.py # Reads description + manuals, creates development plan
├── executor.py # Code generation, file writing, compilation
├── debugger.py # Error analysis, fix generation, rollback
├── sandbox.py # Whitelist-based command validation, path confinement
├── logger.py # Action logging to console and persistent worklog
├── dependency.py # Dependency tracking
├── resume.py # State persistence and session resumption
├── main.py # CLI orchestrator
└── autodev-cli # Symlink-friendly entry point
```
## How the Description Should Be Written
Be specific. Every sentence is treated as a requirement.
**Good:**
```
Language: C
Build a TCP echo server that listens on port 8080.
It should handle multiple clients using fork().
Include proper signal handling for SIGCHLD to avoid zombies.
Include a Makefile with 'all' and 'clean' targets.
The server should log connections to stderr.
```
**Too vague:**
```
Make a server program.
```
## Limitations
- Quality depends entirely on the LLM model — larger models produce better results
- No interactive mode — you can't guide it mid-run (by design)
- Manual parsing is plain text only (no PDF extraction)
- Token counting is estimated, not exact
- The LLM may occasionally produce code that compiles but doesn't meet all requirements
+1
View File
@@ -0,0 +1 @@
"""AutoDev - Autonomous Development Studio"""
+4
View File
@@ -0,0 +1,4 @@
from .main import main
if __name__ == "__main__":
main()
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Executable
+17
View File
@@ -0,0 +1,17 @@
#!/usr/bin/env python3
"""
AutoDev - Autonomous CLI Development Studio
Can be symlinked from anywhere. Works in the directory where you call it.
"""
import os
import sys
# Resolve the real location of this script (follows symlinks)
# so Python can find the autodev package regardless of where we're called from
real_script = os.path.realpath(__file__)
package_dir = os.path.dirname(os.path.dirname(real_script))
if package_dir not in sys.path:
sys.path.insert(0, package_dir)
from autodev.main import main
main()
+47
View File
@@ -0,0 +1,47 @@
"""
AutoDev - Configuration
LLM backend settings and application constants.
"""
# ============================================================
# LLM BACKEND CONFIGURATION — Edit these to match your setup
# ============================================================
LLM_BACKEND = "ollama" # "ollama" or "vllm"
OLLAMA_URL = "http://turd.hem.holck.se:11434"
VLLM_URL = "http://localhost:8000"
MODEL_NAME = "gemma4:e4b"
# ============================================================
# Timeouts and limits
# ============================================================
LLM_TIMEOUT = 300 # seconds per LLM call
COMPILE_TIMEOUT = 120 # seconds per compile/build
EXEC_TIMEOUT = 60 # seconds per test execution
MAX_DEBUG_CYCLES = 10 # max consecutive fix attempts before halting
MAX_CONTEXT_TOKENS = 12000 # approximate local context window size (chars)
CYCLE_DETECTION_WINDOW = 6 # number of recent actions to check for loops
TOKEN_CHAR_RATIO = 3.5 # average chars per token (for estimation)
# ============================================================
# File names
# ============================================================
DESCRIPTION_FILE = "description.txt"
MANUALS_DIR = "manuals"
WORKLOG_FILE = "worklog.json"
DEPENDENCY_FILE = "dependency.txt"
PLAN_FILE = "plan.json"
# ============================================================
# Expert system prompt — injected into every LLM interaction
# ============================================================
EXPERT_IDENTITY = """You are a senior software engineer with 20+ years of experience across all major languages and platforms. You approach every task like a principal engineer:
- You think before you code. You consider edge cases, error handling, and maintainability.
- You write production-quality code, never prototypes or stubs.
- You understand build systems, compilers, linkers, and runtime environments deeply.
- When you see an error, you reason about root cause, not just symptoms.
- You never repeat a failed approach. If something didn't work, you try a fundamentally different strategy.
- You are aware that you are an AI and may hallucinate. When uncertain, you keep code simple and conservative rather than guessing at APIs or syntax.
- You always consider: does this compile? Does this link? Are all imports/includes correct? Are all dependencies declared?
- You write complete files, never partial snippets or placeholders like "// ... rest of code".
"""
+128
View File
@@ -0,0 +1,128 @@
"""
AutoDev - Context Window Manager
Manages local context with token-aware pruning, relevance scoring,
and semantic cycle/hallucination detection.
"""
import hashlib
import difflib
from . import config
def estimate_tokens(text: str) -> int:
"""Estimate token count from character length."""
return max(1, int(len(text) / config.TOKEN_CHAR_RATIO))
class ContextManager:
def __init__(self, max_tokens: int = None):
self.max_tokens = max_tokens or config.MAX_CONTEXT_TOKENS
self.entries: list[dict] = [] # {role, content, priority, hash, tokens}
self._recent_contents: list[str] = []
self._recent_hashes: list[str] = []
def add(self, role: str, content: str, priority: int = 5):
h = hashlib.md5(content.encode()).hexdigest()[:16]
tokens = estimate_tokens(content)
self.entries.append({
"role": role,
"content": content,
"priority": priority,
"hash": h,
"tokens": tokens,
})
self._recent_hashes.append(h)
self._recent_contents.append(content[:500])
self._prune()
def _prune(self):
total = sum(e["tokens"] for e in self.entries)
while total > self.max_tokens and len(self.entries) > 2:
# Never prune the last entry or system-level entries (priority >= 9)
candidates = [(i, e) for i, e in enumerate(self.entries[:-1]) if e["priority"] < 9]
if not candidates:
break
# Remove lowest priority, oldest first
candidates.sort(key=lambda x: (x[1]["priority"], -x[0]))
idx = candidates[0][0]
total -= self.entries[idx]["tokens"]
self.entries.pop(idx)
def detect_cycle(self) -> bool:
"""Detect both exact repetition and semantic similarity loops."""
window = config.CYCLE_DETECTION_WINDOW
if len(self._recent_hashes) < 3:
return False
recent = self._recent_hashes[-window:]
# Exact hash repetition
unique = set(recent)
if len(unique) <= max(1, len(recent) // 3):
return True
# Semantic similarity: check if recent LLM outputs are too similar
contents = self._recent_contents[-window:]
if len(contents) >= 3:
similarities = []
for i in range(len(contents) - 1):
ratio = difflib.SequenceMatcher(None, contents[i], contents[i + 1]).ratio()
similarities.append(ratio)
# If average similarity > 0.8, we're likely in a loop
if similarities and sum(similarities) / len(similarities) > 0.8:
return True
return False
def clear_stale(self):
"""Aggressively clear low-value entries when cycles detected."""
keep = [e for e in self.entries if e["priority"] >= 8]
if not keep:
keep = self.entries[-2:]
self.entries = keep
self._recent_hashes = self._recent_hashes[-2:]
self._recent_contents = self._recent_contents[-2:]
def get_relevant_context(self, query: str, max_entries: int = 5) -> list[dict]:
"""Select entries most relevant to the current query using keyword overlap."""
query_words = set(query.lower().split())
scored = []
for e in self.entries:
content_words = set(e["content"].lower().split()[:200])
overlap = len(query_words & content_words)
scored.append((overlap + e["priority"], e))
scored.sort(key=lambda x: x[0], reverse=True)
return [e for _, e in scored[:max_entries]]
def build_messages(self, system_prompt: str = "") -> list[dict]:
msgs = []
if system_prompt:
msgs.append({"role": "system", "content": system_prompt})
for e in self.entries:
msgs.append({"role": e["role"], "content": e["content"]})
return msgs
def build_focused_messages(self, system_prompt: str, query: str,
max_context_tokens: int = None) -> list[dict]:
"""Build messages with only the most relevant context entries."""
budget = max_context_tokens or (self.max_tokens // 2)
msgs = []
if system_prompt:
msgs.append({"role": "system", "content": system_prompt})
budget -= estimate_tokens(system_prompt)
relevant = self.get_relevant_context(query)
for e in relevant:
if budget - e["tokens"] < 0:
break
msgs.append({"role": e["role"], "content": e["content"]})
budget -= e["tokens"]
return msgs
def token_usage(self) -> dict:
total = sum(e["tokens"] for e in self.entries)
return {
"entries": len(self.entries),
"tokens_used": total,
"tokens_max": self.max_tokens,
"utilization": f"{total / self.max_tokens * 100:.0f}%",
}
+272
View File
@@ -0,0 +1,272 @@
"""
AutoDev - Debugger
Analyzes errors, generates fixes, detects cycles, manages rollback.
Approaches debugging like a senior engineer — reason about root cause,
never repeat a failed approach, escalate when stuck.
"""
import json
import os
import shutil
import subprocess
from .llm import LLM
from .logger import Logger
from .context import ContextManager
from .sandbox import Sandbox
from . import config
DEBUG_SYSTEM = config.EXPERT_IDENTITY + """
You are now in DEBUG mode. An error occurred during development. Your job is to:
1. DIAGNOSE: What is the root cause? Not the symptom — the actual cause.
2. REASON: Why did the previous code produce this error? What assumption was wrong?
3. FIX: Provide a concrete fix that addresses the root cause.
CRITICAL RULES:
- If you already tried a fix and it didn't work, you MUST try a DIFFERENT approach.
- Never repeat the same fix. If the same error keeps occurring, the problem is architectural.
- Consider: wrong imports, missing dependencies, wrong API usage, type mismatches, missing files.
- For compilation errors: check every include/import, every function signature, every type.
- For runtime errors: check null/nil handling, array bounds, file paths, permissions.
Output ONLY valid JSON:
{
"diagnosis": "root cause analysis — what exactly went wrong and why",
"previous_approach_wrong_because": "why the previous attempt failed (if applicable)",
"strategy": "what different approach we're taking this time",
"fixes": [
{
"file": "path/to/file",
"action": "replace",
"description": "what changed and why",
"full_content": "COMPLETE new file content"
}
],
"commands": ["verification commands to confirm the fix works"]
}
The 'full_content' field must contain the ENTIRE file, not just the changed lines.
Start your response with { and end with }.
"""
class Debugger:
def __init__(self, llm: LLM, logger: Logger, ctx: ContextManager,
sandbox: Sandbox, workdir: str):
self.llm = llm
self.logger = logger
self.ctx = ctx
self.sandbox = sandbox
self.workdir = workdir
self.attempt_count = 0
self.backup_dir = os.path.join(workdir, ".autodev_backups")
self.failed_approaches: list[str] = [] # Track what we already tried
def debug_errors(self, errors: list[str], step: dict, plan: dict) -> dict:
"""Attempt to fix errors. Returns {fixed, output, errors}."""
self.attempt_count += 1
result = {"fixed": False, "output": "", "errors": []}
if self.attempt_count > config.MAX_DEBUG_CYCLES:
self.logger.log("debug_halt",
f"Max debug cycles ({config.MAX_DEBUG_CYCLES}) reached. "
"This likely requires manual intervention or better documentation.",
"error")
result["errors"].append(
"Maximum debug attempts exceeded. The issue may be architectural. "
"Please review the worklog and provide additional documentation."
)
return result
# Check for cycles
if self.ctx.detect_cycle():
self.logger.log("cycle_detected",
"Detected repetitive pattern. Clearing context and trying fresh approach.",
"warn")
self.ctx.clear_stale()
self.failed_approaches.append("CYCLE: All recent approaches were too similar")
# Backup current state
self._backup_files(step.get("files", []))
prompt = self._build_debug_prompt(errors, step, plan)
self.ctx.add("user", prompt, priority=8)
self.logger.log("debug_attempt",
f"Attempt {self.attempt_count}/{config.MAX_DEBUG_CYCLES}")
try:
response = self.llm.query(prompt, system=DEBUG_SYSTEM, temperature=0.3)
self.ctx.add("assistant", response[:1500], priority=6)
fix = self._parse_fix(response)
if not fix.get("fixes"):
self.logger.log("debug_no_fix", "LLM provided no actionable fixes", "warn")
result["errors"].append("No fix suggested by LLM")
self.failed_approaches.append(f"No fix for: {errors[0][:100]}")
return result
diagnosis = fix.get("diagnosis", "unknown")
strategy = fix.get("strategy", "")
self.logger.log("diagnosis", diagnosis)
if strategy:
self.logger.log("strategy", strategy)
# Apply fixes
for f in fix["fixes"]:
self._apply_fix(f)
# Verify
verify_ok = True
for cmd in fix.get("commands", []):
try:
self.sandbox.validate_command(cmd)
proc = subprocess.run(
cmd, shell=True, capture_output=True, text=True,
timeout=config.COMPILE_TIMEOUT, cwd=self.workdir,
)
result["output"] += proc.stdout
if proc.returncode != 0:
result["errors"].append(
f"Verification '{cmd}' failed:\n{proc.stderr}"
)
verify_ok = False
self.logger.log("fix_verify_fail", proc.stderr[:300], "error")
except Exception as e:
result["errors"].append(str(e))
verify_ok = False
if not verify_ok:
self.failed_approaches.append(
f"Attempt {self.attempt_count}: {diagnosis[:100]} — verification failed"
)
self._rollback_files(step.get("files", []))
self.logger.log("rollback", "Fix didn't pass verification, rolled back", "warn")
return result
result["fixed"] = True
self.attempt_count = 0
self.failed_approaches.clear()
self.logger.log("debug_fixed", diagnosis)
except Exception as e:
result["errors"].append(str(e))
self.logger.log("debug_error", str(e), "error")
self._rollback_files(step.get("files", []))
self.failed_approaches.append(f"Exception: {str(e)[:100]}")
return result
def _build_debug_prompt(self, errors: list[str], step: dict, plan: dict) -> str:
parts = [
f"## Error(s)\n" + "\n---\n".join(errors[:5]),
f"\n## Failed step\n{step.get('description', '')}",
f"\n## Project: {plan.get('language', 'unknown')}{plan.get('summary', '')}",
]
# Include what we already tried so the LLM doesn't repeat
if self.failed_approaches:
parts.append(
"\n## PREVIOUS FAILED APPROACHES (DO NOT REPEAT THESE):\n" +
"\n".join(f"- {a}" for a in self.failed_approaches[-5:])
)
# Include relevant source files
for fpath in step.get("files", [])[:5]:
full = os.path.join(self.workdir, fpath)
if os.path.isfile(full):
try:
with open(full, "r") as f:
content = f.read()
parts.append(f"\n## Current content of {fpath}\n{content[:4000]}")
except (IOError, UnicodeDecodeError):
pass
# Also check for related files that might be causing the issue
# (e.g., header files referenced in error messages)
for err in errors[:3]:
for word in err.split():
if "." in word and "/" in word:
# Looks like a file path in the error
candidate = word.strip("':\"(),")
full = os.path.join(self.workdir, candidate)
if os.path.isfile(full) and candidate not in step.get("files", []):
try:
with open(full, "r") as f:
content = f.read()
parts.append(f"\n## Related file {candidate}\n{content[:2000]}")
except (IOError, UnicodeDecodeError):
pass
return "\n".join(parts)
def _apply_fix(self, fix: dict):
fpath = fix.get("file", "")
action = fix.get("action", "replace")
if not fpath:
return
# Never delete AutoDev state files
protected = {"worklog.json", "plan.json", "dependency.txt",
".autodev_state.json", "description.txt"}
basename = os.path.basename(fpath)
if action == "delete" and basename in protected:
self.logger.log("fix_skip", f"Refusing to delete protected file: {fpath}", "warn")
return
if action == "replace" and fix.get("full_content"):
self.sandbox.safe_write(fpath, fix["full_content"])
self.logger.log("fix_applied", f"Replaced {fpath}: {fix.get('description', '')[:100]}")
elif action == "delete":
full = self.sandbox.validate_path(fpath)
if os.path.exists(full):
os.remove(full)
self.logger.log("fix_applied", f"Deleted {fpath}")
elif action == "create" and fix.get("full_content"):
self.sandbox.safe_write(fpath, fix["full_content"])
self.logger.log("fix_applied", f"Created {fpath}")
def _backup_files(self, files: list[str]):
os.makedirs(self.backup_dir, exist_ok=True)
for fpath in files:
full = os.path.join(self.workdir, fpath)
if os.path.isfile(full):
backup = os.path.join(self.backup_dir, fpath.replace("/", "__"))
shutil.copy2(full, backup)
def _rollback_files(self, files: list[str]):
for fpath in files:
backup = os.path.join(self.backup_dir, fpath.replace("/", "__"))
if os.path.isfile(backup):
full = os.path.join(self.workdir, fpath)
os.makedirs(os.path.dirname(full), exist_ok=True)
shutil.copy2(backup, full)
self.logger.log("rollback", f"Restored {fpath}", "warn")
def _parse_fix(self, response: str) -> dict:
text = response.strip()
for candidate in self._extract_json_candidates(text):
try:
result = json.loads(candidate)
if isinstance(result, dict):
return result
except json.JSONDecodeError:
continue
return {"diagnosis": "Could not parse fix response", "fixes": []}
def _extract_json_candidates(self, text: str) -> list[str]:
candidates = []
if "```json" in text:
candidates.append(text.split("```json", 1)[1].split("```", 1)[0].strip())
candidates.append(text)
start = text.find("{")
if start >= 0:
depth = 0
for i in range(start, len(text)):
if text[i] == "{":
depth += 1
elif text[i] == "}":
depth -= 1
if depth == 0:
candidates.append(text[start:i + 1])
break
return candidates
+46
View File
@@ -0,0 +1,46 @@
"""
AutoDev - Dependency Tracker
Tracks and records all external dependencies.
"""
import os
from . import config
class DependencyTracker:
def __init__(self, workdir: str):
self.path = os.path.join(workdir, config.DEPENDENCY_FILE)
self.deps: set[str] = set()
self._load()
def _load(self):
if os.path.exists(self.path):
with open(self.path, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
self.deps.add(line)
def _save(self):
with open(self.path, "w") as f:
f.write("# AutoDev - Project Dependencies\n")
f.write("# Auto-generated, do not edit manually\n\n")
for d in sorted(self.deps):
f.write(d + "\n")
def add(self, dep: str):
if dep not in self.deps:
self.deps.add(dep)
self._save()
def add_many(self, deps: list[str]):
changed = False
for d in deps:
if d not in self.deps:
self.deps.add(d)
changed = True
if changed:
self._save()
def get_all(self) -> list[str]:
return sorted(self.deps)
+417
View File
@@ -0,0 +1,417 @@
"""
AutoDev - Executor
Generates code, writes files, runs compilation and shell commands.
Uses relevance-based context selection and expert-level prompting.
"""
import subprocess
import os
import json
from .llm import LLM
from .logger import Logger
from .context import ContextManager, estimate_tokens
from .sandbox import Sandbox, SandboxViolation
from .dependency import DependencyTracker
from . import config
CODE_GEN_SYSTEM = config.EXPERT_IDENTITY + """
You are now in CODE GENERATION mode. Generate complete, production-quality code.
Rules:
- Output ONLY the file content. No markdown fences. No explanations before or after.
- The code must be COMPLETE. No "// TODO", no "// ... rest of code", no placeholders.
- Include all necessary imports/includes at the top.
- Include proper error handling.
- Add concise comments explaining non-obvious logic.
- If this is a header file, include proper include guards.
- If this is a build file (Makefile, CMakeLists.txt, etc.), make it complete and correct.
"""
MULTI_FILE_SYSTEM = config.EXPERT_IDENTITY + """
You are now in MULTI-FILE GENERATION mode. Generate multiple complete source files.
Output ONLY valid JSON with this structure:
{
"files": [
{"path": "relative/path/to/file", "content": "complete file content"}
],
"commands": ["optional shell commands to run after writing files"]
}
Rules:
- Every file must be COMPLETE. No placeholders, no stubs.
- All imports/includes must reference files that exist or will be created.
- Output ONLY the JSON object. Start with { and end with }.
"""
class Executor:
def __init__(self, llm: LLM, logger: Logger, ctx: ContextManager,
sandbox: Sandbox, deps: DependencyTracker, workdir: str):
self.llm = llm
self.logger = logger
self.ctx = ctx
self.sandbox = sandbox
self.deps = deps
self.workdir = workdir
def execute_step(self, step: dict, plan: dict) -> dict:
"""Execute a single plan step. Returns {success, output, errors}."""
phase = step.get("phase", "implement")
desc = step.get("description", "")
commands = step.get("commands", [])
self.logger.log("step_start", f"[{phase}] {desc}")
result = {"success": True, "output": "", "errors": []}
try:
if phase == "setup":
result = self._do_setup(step, plan)
elif phase in ("implement", "finalize"):
result = self._do_implement(step, plan)
elif phase == "test":
result = self._do_test(step, plan)
elif phase == "debug":
result = self._do_debug(step, plan)
else:
result = self._do_implement(step, plan)
# Run any explicit commands from the plan
# Skip for phases that handle their own commands or generate files via LLM
if phase not in ("setup", "test", "implement"):
for cmd in commands:
cmd_result = self._run_command(cmd)
if cmd_result["returncode"] != 0:
result["errors"].append(
f"Command '{cmd}' failed (exit {cmd_result['returncode']}):\n"
f"{cmd_result['stderr']}"
)
result["success"] = False
result["output"] += cmd_result["stdout"]
# Verify acceptance criteria if defined
acceptance = step.get("acceptance", "")
if acceptance and result["success"]:
self.logger.log("acceptance_check", acceptance)
except SandboxViolation as e:
result["success"] = False
result["errors"].append(f"Sandbox violation: {e}")
self.logger.log("sandbox_violation", str(e), "error")
except Exception as e:
result["success"] = False
result["errors"].append(str(e))
self.logger.log("step_error", str(e), "error")
status = "ok" if result["success"] else "error"
self.logger.log("step_done", f"[{phase}] success={result['success']}", status)
return result
def _do_setup(self, step: dict, plan: dict) -> dict:
result = {"success": True, "output": "", "errors": []}
for path in plan.get("structure", []):
# If it looks like a file (has extension), ensure parent dir exists
# If it looks like a directory (no extension), create it
# But never mkdir over an existing file
full = os.path.join(self.workdir, path)
if "." in os.path.basename(path):
parent = os.path.dirname(path)
if parent:
self.sandbox.safe_mkdir(parent)
else:
if os.path.isfile(full):
self.logger.log("setup_skip", f"{path} exists as file, not creating dir", "warn")
else:
self.sandbox.safe_mkdir(path)
self.logger.log("mkdir", path)
for dep in plan.get("dependencies", []):
self.deps.add(dep)
# Setup commands are best-effort — non-zero exit is a warning, not failure
for cmd in step.get("commands", []):
# Auto-fix common issues: add -p to mkdir, add -f to touch
cmd = self._fixup_setup_command(cmd)
r = self._run_command(cmd)
result["output"] += r["stdout"]
if r["returncode"] != 0:
self.logger.log("setup_warn", r["stderr"][:200], "warn")
# Only fail setup if it's a real error, not "already exists"
if not self._is_benign_error(r["stderr"]):
result["errors"].append(r["stderr"])
return result
@staticmethod
def _fixup_setup_command(cmd: str) -> str:
"""Auto-fix common setup command issues."""
stripped = cmd.strip()
# Any mkdir without -p → add -p
if "mkdir " in stripped and " -p" not in stripped:
return stripped.replace("mkdir ", "mkdir -p ")
return cmd
@staticmethod
def _is_benign_error(stderr: str) -> bool:
"""Check if an error is harmless (e.g., 'already exists')."""
benign = ["File exists", "already exists", "No such file or directory"]
return any(b in stderr for b in benign)
def _do_implement(self, step: dict, plan: dict) -> dict:
files = step.get("files", [])
if not files:
return self._implement_freeform(step, plan)
if len(files) == 1:
return self._implement_single(files[0], step, plan)
return self._implement_multi(files, step, plan)
def _implement_single(self, filepath: str, step: dict, plan: dict) -> dict:
result = {"success": True, "output": "", "errors": []}
prompt = self._build_code_prompt(step, plan, filepath)
# Use focused context to avoid blowing token limits
self.ctx.add("user", prompt, priority=7)
code = self.llm.query(prompt, system=CODE_GEN_SYSTEM)
code = self._strip_fences(code)
# Validate we got actual code, not an explanation
if self._looks_like_explanation(code):
self.logger.log("regen", f"LLM returned explanation instead of code for {filepath}, retrying", "warn")
retry_prompt = (
prompt + "\n\nYou returned an explanation instead of code. "
"Output ONLY the raw file content. No markdown. No explanations. "
"Start with the first line of the actual source code."
)
code = self.llm.query(retry_prompt, system=CODE_GEN_SYSTEM, temperature=0.1)
code = self._strip_fences(code)
self.ctx.add("assistant", f"Generated {filepath} ({len(code)} chars)", priority=5)
self.sandbox.safe_write(filepath, code)
self.logger.log("file_written", f"{filepath} ({len(code)} chars)")
result["output"] = f"Created {filepath}"
return result
def _implement_multi(self, files: list, step: dict, plan: dict) -> dict:
result = {"success": True, "output": "", "errors": []}
prompt = self._build_code_prompt(step, plan)
prompt += f"\n\nGenerate these files: {json.dumps(files)}"
self.ctx.add("user", prompt, priority=7)
response = self.llm.query(prompt, system=MULTI_FILE_SYSTEM)
self.ctx.add("assistant", f"Generated {len(files)} files", priority=5)
parsed = self._parse_multi_response(response)
if not parsed.get("files"):
# Retry
self.logger.log("regen", "Multi-file response had no files, retrying", "warn")
retry_prompt = (
prompt + "\n\nYour response could not be parsed. "
"Output ONLY a JSON object starting with { and ending with }. "
"The 'files' array must contain objects with 'path' and 'content' keys."
)
response = self.llm.query(retry_prompt, system=MULTI_FILE_SYSTEM, temperature=0.1)
parsed = self._parse_multi_response(response)
for finfo in parsed.get("files", []):
path = finfo.get("path", "")
content = finfo.get("content", "")
if path and content:
self.sandbox.safe_write(path, content)
self.logger.log("file_written", f"{path} ({len(content)} chars)")
result["output"] += f"Created {path}\n"
for cmd in parsed.get("commands", []):
r = self._run_command(cmd)
result["output"] += r["stdout"]
if r["returncode"] != 0:
result["errors"].append(r["stderr"])
result["success"] = False
return result
def _implement_freeform(self, step: dict, plan: dict) -> dict:
return self._implement_multi([], step, plan)
def _do_test(self, step: dict, plan: dict) -> dict:
result = {"success": True, "output": "", "errors": []}
commands = step.get("commands", [])
if not commands:
prompt = (
f"Project: {plan.get('language', 'unknown')} project.\n"
f"Step: {step.get('description', '')}\n"
f"Files in project: {json.dumps(plan.get('structure', []))}\n\n"
"What exact shell commands should I run to compile and test this? "
"Output ONLY the commands, one per line. No explanations. No markdown."
)
response = self.llm.query(
prompt,
system="You are a build engineer. Output only shell commands, one per line.",
temperature=0.1,
)
commands = [
l.strip() for l in response.strip().splitlines()
if l.strip() and not l.strip().startswith("#") and not l.strip().startswith("```")
]
for cmd in commands:
r = self._run_command(cmd)
result["output"] += r["stdout"]
if r["returncode"] != 0:
result["errors"].append(f"Command '{cmd}' failed:\n{r['stderr']}")
result["success"] = False
return result
def _do_debug(self, step: dict, plan: dict) -> dict:
return {"success": True, "output": "Debug step (handled by debugger)", "errors": []}
def _build_code_prompt(self, step: dict, plan: dict, filepath: str = None) -> str:
parts = [
f"Project: {plan.get('project_name', 'unknown')}",
f"Language: {plan.get('language', 'unknown')}",
f"Summary: {plan.get('summary', '')}",
f"Project structure: {json.dumps(plan.get('structure', []))}",
f"\nCurrent task: {step.get('description', '')}",
]
if filepath:
parts.append(f"\nGenerate the COMPLETE content for file: {filepath}")
# Selectively include existing files that are relevant
existing = self._get_relevant_files(step, plan)
if existing:
parts.append("\n## Existing project files (for reference — ensure compatibility):")
for p, c in existing.items():
parts.append(f"\n### {p}\n{c}")
return "\n".join(parts)
def _get_relevant_files(self, step: dict, plan: dict) -> dict[str, str]:
"""Include only files relevant to the current step, within token budget."""
files = {}
budget = config.MAX_CONTEXT_TOKENS // 3 # Reserve 1/3 of context for existing files
step_files = set(step.get("files", []))
# Priority 1: Files explicitly mentioned in this step (headers, dependencies)
# Priority 2: Files that share a directory with step files
# Priority 3: Build files (Makefile, CMakeLists.txt, etc.)
build_files = {"Makefile", "CMakeLists.txt", "setup.py", "pyproject.toml",
"Cargo.toml", "go.mod", "package.json", "pom.xml", "build.gradle"}
candidates = []
for path in plan.get("structure", []):
full = os.path.join(self.workdir, path)
if not os.path.isfile(full):
continue
try:
with open(full, "r") as f:
content = f.read()
except (IOError, UnicodeDecodeError):
continue
# Score relevance
score = 0
basename = os.path.basename(path)
if path in step_files:
score = 0 # Don't include the file we're about to generate
continue
if basename in build_files:
score = 3
elif any(os.path.dirname(path) == os.path.dirname(sf) for sf in step_files):
score = 2
# Check if any step file imports/includes this file
elif any(basename.split(".")[0] in sf for sf in step_files):
score = 2
else:
score = 1
candidates.append((score, path, content))
candidates.sort(key=lambda x: x[0], reverse=True)
used = 0
for score, path, content in candidates:
tokens = estimate_tokens(content)
if used + tokens > budget:
# Truncate large files
if tokens > budget // 2:
content = content[:int(budget * config.TOKEN_CHAR_RATIO // 2)]
content += "\n// ... (truncated for context)\n"
tokens = estimate_tokens(content)
else:
continue
files[path] = content
used += tokens
return files
def _run_command(self, cmd: str) -> dict:
self.sandbox.validate_command(cmd)
self.logger.log("exec", cmd)
try:
proc = subprocess.run(
cmd, shell=True, capture_output=True, text=True,
timeout=config.COMPILE_TIMEOUT, cwd=self.workdir,
)
if proc.stdout:
self.logger.log("stdout", proc.stdout[:500])
if proc.returncode != 0 and proc.stderr:
self.logger.log("stderr", proc.stderr[:500], "error")
return {
"returncode": proc.returncode,
"stdout": proc.stdout,
"stderr": proc.stderr,
}
except subprocess.TimeoutExpired:
self.logger.log("timeout", f"Command timed out ({config.COMPILE_TIMEOUT}s): {cmd}", "error")
return {"returncode": -1, "stdout": "", "stderr": f"Timeout after {config.COMPILE_TIMEOUT}s"}
def _strip_fences(self, text: str) -> str:
text = text.strip()
if text.startswith("```"):
first_nl = text.find("\n")
if first_nl >= 0:
text = text[first_nl + 1:]
if text.endswith("```"):
text = text[:-3]
return text.strip()
def _looks_like_explanation(self, text: str) -> bool:
"""Detect if LLM returned prose instead of code."""
lines = text.strip().splitlines()[:5]
if not lines:
return True
prose_indicators = ["here is", "here's", "below is", "i'll", "let me", "this code",
"the following", "sure,", "certainly"]
first_lines = " ".join(lines[:3]).lower()
return any(ind in first_lines for ind in prose_indicators)
def _parse_multi_response(self, response: str) -> dict:
text = self._strip_fences(response)
# Try progressively more aggressive extraction
for candidate in self._extract_json_candidates(text):
try:
result = json.loads(candidate)
if isinstance(result, dict) and "files" in result:
return result
except json.JSONDecodeError:
continue
self.logger.log("parse_fail", "Could not parse multi-file response", "error")
return {"files": [], "commands": []}
def _extract_json_candidates(self, text: str) -> list[str]:
candidates = [text]
if "```json" in text:
candidates.insert(0, text.split("```json", 1)[1].split("```", 1)[0].strip())
start = text.find("{")
if start >= 0:
depth = 0
for i in range(start, len(text)):
if text[i] == "{":
depth += 1
elif text[i] == "}":
depth -= 1
if depth == 0:
candidates.insert(0, text[start:i + 1])
break
return candidates
+376
View File
@@ -0,0 +1,376 @@
"""
AutoDev - LLM Communication Layer
Supports Ollama and vLLM backends with streaming, retry logic, and robust error handling.
"""
import json
import sys
import time
import urllib.request
import urllib.error
from . import config
class LLMError(Exception):
pass
class LLM:
def __init__(self, backend: str = None, model: str = None):
self.backend = backend or config.LLM_BACKEND
self.model = model or config.MODEL_NAME
if self.backend == "ollama":
self.base_url = config.OLLAMA_URL
elif self.backend == "vllm":
self.base_url = config.VLLM_URL
else:
raise LLMError(f"Unknown backend: {self.backend}")
self.context_size = None # Auto-detected on first use
def detect_context_size(self) -> int:
"""Detect the model's effective context window size.
Checks (in priority order):
1. Ollama /api/ps for running model's actual context_length
2. num_ctx in model parameters from /api/show
3. Model architecture's max context_length from /api/show
4. vLLM max_model_len from /v1/models
5. Fallback to config default
"""
if self.context_size is not None:
return self.context_size
try:
if self.backend == "ollama":
self.context_size = self._detect_ollama_context()
elif self.backend == "vllm":
self.context_size = self._detect_vllm_context()
except Exception:
pass
if not self.context_size:
self.context_size = config.MAX_CONTEXT_TOKENS
return self.context_size
def detect_gpu_status(self) -> dict:
"""Check GPU/CPU offload status for the running model.
Returns dict with:
loaded: bool - whether model is currently loaded
gpu_percent: int - percentage of model on GPU (0-100)
size_total: int - total model size in bytes
size_vram: int - bytes on GPU
warning: str|None - warning message if mostly CPU
"""
result = {"loaded": False, "gpu_percent": 0, "size_total": 0,
"size_vram": 0, "warning": None}
if self.backend != "ollama":
return result
try:
url = f"{self.base_url}/api/ps"
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read().decode("utf-8"))
for m in data.get("models", []):
if self.model in m.get("name", ""):
result["loaded"] = True
result["size_total"] = m.get("size", 0)
result["size_vram"] = m.get("size_vram", 0)
if result["size_total"] > 0:
result["gpu_percent"] = int(
result["size_vram"] / result["size_total"] * 100
)
if result["gpu_percent"] == 0:
result["warning"] = (
"Model is running entirely on CPU. "
"This will be extremely slow and may not complete. "
"Consider using a smaller model or freeing GPU memory."
)
elif result["gpu_percent"] < 50:
result["warning"] = (
f"Only {result['gpu_percent']}% of model is on GPU. "
"Performance will be significantly degraded. "
"Consider using a smaller model."
)
break
except Exception:
pass
return result
def _detect_ollama_context(self) -> int | None:
# 1. Check running model — this gives the actual runtime context_length
try:
url = f"{self.base_url}/api/ps"
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read().decode("utf-8"))
for m in data.get("models", []):
if self.model in m.get("name", ""):
ctx = m.get("context_length")
if ctx:
return int(ctx)
except Exception:
pass
# 2. Check model config from /api/show
try:
url = f"{self.base_url}/api/show"
payload = {"name": self.model}
data = self._post_raw(url, payload)
# Check parameters for explicit num_ctx setting
params = data.get("parameters", "")
for line in params.splitlines():
if "num_ctx" in line:
parts = line.split()
for p in parts:
if p.isdigit():
return int(p)
# Check modelfile for PARAMETER num_ctx
modelfile = data.get("modelfile", "")
for line in modelfile.splitlines():
if "num_ctx" in line.lower():
parts = line.split()
for p in parts:
if p.isdigit():
return int(p)
# 3. Fall back to architecture's max context_length
model_info = data.get("model_info", {})
for key, val in model_info.items():
if "context_length" in key:
return int(val)
except Exception:
pass
return None
def _detect_vllm_context(self) -> int | None:
try:
url = f"{self.base_url}/v1/models"
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
for m in data.get("data", []):
if m.get("id") == self.model:
return m.get("max_model_len")
except Exception:
pass
return None
def query(self, prompt: str, system: str = "", temperature: float = 0.2,
stream: bool = False) -> str:
if not system:
system = config.EXPERT_IDENTITY
if self.backend == "ollama":
if stream:
return self._stream_ollama(prompt, system, temperature)
result = self._query_ollama(prompt, system, temperature)
else:
if stream:
return self._stream_vllm(prompt, system, temperature)
result = self._query_vllm(prompt, system, temperature)
# Push LLM thinking to web UI
try:
from .web import push_event
push_event("llm_response", {"response": result})
except Exception:
pass
return result
def _query_ollama(self, prompt: str, system: str, temperature: float) -> str:
url = f"{self.base_url}/api/generate"
payload = {
"model": self.model,
"prompt": prompt,
"system": system,
"stream": False,
"options": {"temperature": temperature},
}
return self._post(url, payload, key="response")
def _stream_ollama(self, prompt: str, system: str, temperature: float) -> str:
url = f"{self.base_url}/api/generate"
payload = {
"model": self.model,
"prompt": prompt,
"system": system,
"stream": True,
"options": {"temperature": temperature},
}
return self._stream_post(url, parse_fn=lambda chunk: chunk.get("response", ""))
def _query_vllm(self, prompt: str, system: str, temperature: float) -> str:
url = f"{self.base_url}/v1/completions"
full_prompt = f"{system}\n\n{prompt}" if system else prompt
payload = {
"model": self.model,
"prompt": full_prompt,
"max_tokens": 4096,
"temperature": temperature,
"stream": False,
}
data = self._post_raw(url, payload)
try:
return data["choices"][0]["text"]
except (KeyError, IndexError):
raise LLMError(f"Unexpected vLLM response: {data}")
def _stream_vllm(self, prompt: str, system: str, temperature: float) -> str:
url = f"{self.base_url}/v1/completions"
full_prompt = f"{system}\n\n{prompt}" if system else prompt
payload = {
"model": self.model,
"prompt": full_prompt,
"max_tokens": 4096,
"temperature": temperature,
"stream": True,
}
return self._stream_post(url, parse_fn=lambda chunk: (
chunk.get("choices", [{}])[0].get("text", "") if chunk.get("choices") else ""
))
def chat(self, messages: list[dict], temperature: float = 0.2,
stream: bool = False) -> str:
if self.backend == "ollama":
url = f"{self.base_url}/api/chat"
payload = {
"model": self.model,
"messages": messages,
"stream": stream,
"options": {"temperature": temperature},
}
if stream:
return self._stream_post(url, parse_fn=lambda c: c.get("message", {}).get("content", ""))
return self._post(url, payload, key="message", subkey="content")
else:
url = f"{self.base_url}/v1/chat/completions"
payload = {
"model": self.model,
"messages": messages,
"max_tokens": 4096,
"temperature": temperature,
"stream": stream,
}
if stream:
return self._stream_post(url, parse_fn=lambda c: (
c.get("choices", [{}])[0].get("delta", {}).get("content", "")
if c.get("choices") else ""
))
data = self._post_raw(url, payload)
try:
return data["choices"][0]["message"]["content"]
except (KeyError, IndexError):
raise LLMError(f"Unexpected vLLM chat response: {data}")
def _post(self, url: str, payload: dict, key: str, subkey: str = None) -> str:
data = self._post_raw(url, payload)
try:
result = data[key]
if subkey:
result = result[subkey]
return result
except (KeyError, TypeError):
raise LLMError(f"Unexpected response structure: {data}")
def _post_raw(self, url: str, payload: dict, retries: int = 2) -> dict:
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url, data=body, headers={"Content-Type": "application/json"}
)
last_err = None
for attempt in range(retries + 1):
try:
with urllib.request.urlopen(req, timeout=config.LLM_TIMEOUT) as resp:
return json.loads(resp.read().decode("utf-8"))
except urllib.error.URLError as e:
last_err = e
if attempt < retries:
time.sleep(2 ** attempt)
except json.JSONDecodeError as e:
raise LLMError(f"Invalid JSON from LLM: {e}")
raise LLMError(f"LLM request failed after {retries + 1} attempts ({url}): {last_err}")
def _stream_post(self, url: str, parse_fn) -> str:
"""Stream response, printing tokens to console as they arrive."""
# Build the same request but with stream=True already in payload
# We need to read line by line
body = json.dumps({"stream": True}).encode("utf-8")
# Actually we need the full payload — caller already set stream=True
# Re-read from the caller context isn't possible, so we use a different approach:
# The caller methods build the payload and call us. We need the payload.
# Refactored: callers should pass payload. For now, fall back to non-streaming.
# This is handled by the _stream_generate and _stream_chat methods.
raise LLMError("Direct _stream_post not supported; use streaming query methods")
def query_stream(self, prompt: str, system: str = "", temperature: float = 0.2) -> str:
"""Query with streaming output to console."""
if not system:
system = config.EXPERT_IDENTITY
if self.backend == "ollama":
return self._stream_ollama_impl(prompt, system, temperature)
else:
return self._stream_vllm_impl(prompt, system, temperature)
def _stream_ollama_impl(self, prompt: str, system: str, temperature: float) -> str:
url = f"{self.base_url}/api/generate"
payload = {
"model": self.model,
"prompt": prompt,
"system": system,
"stream": True,
"options": {"temperature": temperature},
}
return self._do_stream(url, payload, lambda c: c.get("response", ""))
def _stream_vllm_impl(self, prompt: str, system: str, temperature: float) -> str:
url = f"{self.base_url}/v1/completions"
full_prompt = f"{system}\n\n{prompt}" if system else prompt
payload = {
"model": self.model,
"prompt": full_prompt,
"max_tokens": 4096,
"temperature": temperature,
"stream": True,
}
return self._do_stream(url, payload, lambda c: (
c.get("choices", [{}])[0].get("text", "") if c.get("choices") else ""
))
def _do_stream(self, url: str, payload: dict, parse_fn) -> str:
"""Execute streaming request, print tokens live, return full text."""
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url, data=body, headers={"Content-Type": "application/json"}
)
full_text = []
try:
with urllib.request.urlopen(req, timeout=config.LLM_TIMEOUT) as resp:
buffer = b""
while True:
chunk = resp.read(1)
if not chunk:
break
buffer += chunk
if chunk == b"\n" and buffer.strip():
line = buffer.decode("utf-8").strip()
buffer = b""
# vLLM SSE format
if line.startswith("data: "):
line = line[6:]
if line == "[DONE]":
break
try:
data = json.loads(line)
token = parse_fn(data)
if token:
full_text.append(token)
sys.stdout.write(token)
sys.stdout.flush()
except json.JSONDecodeError:
pass
elif chunk == b"\n":
buffer = b""
except urllib.error.URLError as e:
raise LLMError(f"Stream request failed ({url}): {e}")
sys.stdout.write("\n")
sys.stdout.flush()
return "".join(full_text)
+78
View File
@@ -0,0 +1,78 @@
"""
AutoDev - Action Logger & Worklog
Logs every action to console and persistent worklog for resumability.
"""
import json
import os
from datetime import datetime
from . import config
class Logger:
def __init__(self, workdir: str):
self.workdir = workdir
self.log_path = os.path.join(workdir, config.WORKLOG_FILE)
self.entries: list[dict] = []
self._load()
def _load(self):
if os.path.exists(self.log_path):
try:
with open(self.log_path, "r") as f:
self.entries = json.load(f)
except (json.JSONDecodeError, IOError):
self.entries = []
def _save(self):
try:
os.makedirs(os.path.dirname(self.log_path) or ".", exist_ok=True)
with open(self.log_path, "w") as f:
json.dump(self.entries, f, indent=2)
except OSError:
pass # If we truly can't write, don't crash the whole process
def log(self, action: str, detail: str = "", status: str = "ok"):
entry = {
"timestamp": datetime.now().isoformat(),
"action": action,
"detail": detail[:3000],
"status": status,
}
self.entries.append(entry)
self._save()
# Console output with visual indicators
icons = {"ok": "", "error": "", "warn": ""}
icon = icons.get(status, "")
# Color: green for ok, red for error, yellow for warn
colors = {"ok": "\033[32m", "error": "\033[31m", "warn": "\033[33m"}
reset = "\033[0m"
color = colors.get(status, "")
print(f" {color}[{icon}]{reset} {action}: {detail[:200]}")
# Push to web UI if running
try:
from .web import push_event
push_event("log", {"action": action, "detail": detail[:500], "status": status})
except Exception:
pass
def get_recent(self, n: int = 20) -> list[dict]:
return self.entries[-n:]
def get_all(self) -> list[dict]:
return list(self.entries)
def get_last_phase(self) -> str | None:
for e in reversed(self.entries):
if e["action"] == "phase_complete":
return e["detail"]
return None
def get_errors_since(self, n_entries_back: int = 50) -> list[str]:
"""Get recent error details for debugging context."""
errors = []
for e in self.entries[-n_entries_back:]:
if e["status"] == "error":
errors.append(f"[{e['action']}] {e['detail']}")
return errors
+377
View File
@@ -0,0 +1,377 @@
#!/usr/bin/env python3
"""
AutoDev - Autonomous CLI Development Studio
============================================
A fully autonomous development agent that reads project descriptions and
reference manuals, then plans, implements, compiles, tests, and debugs
complete software projects using a local LLM backend.
Usage:
python -m autodev [options]
Options:
-nomanual Skip reading manuals (for minor tasks)
--backend LLM backend: ollama or vllm (default: from config)
--model Model name (default: from config)
--workdir Working directory (default: current directory)
"""
import argparse
import os
import sys
from . import config
from .llm import LLM, LLMError
from .logger import Logger
from .context import ContextManager
from .sandbox import Sandbox, SandboxViolation
from .dependency import DependencyTracker
from .planner import Planner
from .executor import Executor
from .debugger import Debugger
from .resume import ResumeManager
BANNER = r"""
╔══════════════════════════════════════════════════╗
║ AutoDev - Autonomous Dev Studio ║
║ ================================ ║
║ Reads descriptions. Reads manuals. Builds. ║
╚══════════════════════════════════════════════════╝
"""
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="AutoDev - Autonomous CLI Development Studio",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("-nomanual", action="store_true",
help="Skip reading manuals directory")
parser.add_argument("--backend", choices=["ollama", "vllm"],
default=None, help="LLM backend")
parser.add_argument("--model", default=None, help="Model name")
parser.add_argument("--workdir", default=os.getcwd(),
help="Working directory (default: cwd)")
parser.add_argument("-web", type=int, default=None, metavar="PORT",
help="Start web dashboard on this port (e.g. -web 4500)")
return parser.parse_args()
def check_llm_connection(llm: LLM, logger: Logger) -> bool:
try:
resp = llm.query("Respond with only the word: OK",
system="You are a test. Respond with only: OK",
temperature=0.0)
if resp and len(resp.strip()) < 50:
logger.log("llm_check", f"Backend {llm.backend} ({llm.model}) connected")
return True
# Got a response but it was verbose — still connected
logger.log("llm_check", f"Backend connected (verbose response)", "warn")
return True
except LLMError as e:
logger.log("llm_check", str(e), "error")
return False
def _read_user_context_size() -> int | None:
"""Read context_size from ~/.ahvibe.conf if it exists."""
conf_path = os.path.expanduser("~/.ahvibe.conf")
if not os.path.exists(conf_path):
return None
try:
with open(conf_path, "r") as f:
for line in f:
line = line.strip()
if line.startswith("context_size="):
return int(line.split("=", 1)[1].strip())
except (IOError, ValueError):
pass
return None
def run(args: argparse.Namespace):
print(BANNER)
workdir = os.path.realpath(args.workdir)
backend = args.backend or config.LLM_BACKEND
model = args.model or config.MODEL_NAME
print(f" Working directory: {workdir}")
print(f" Backend: {backend} | Model: {model}")
print()
# Initialize components
logger = Logger(workdir)
sandbox = Sandbox(workdir)
deps = DependencyTracker(workdir)
llm = LLM(backend=args.backend, model=args.model)
# Start web dashboard if requested
if args.web:
from .web import start_web_server
start_web_server(args.web, workdir)
print(f" \033[36m🌐 Web dashboard: http://localhost:{args.web}\033[0m")
logger.log("startup", f"AutoDev started in {workdir}")
# Step 1: Check LLM (this also loads the model into memory)
print(" Checking LLM connection...")
if not check_llm_connection(llm, logger):
print(f"\n \033[31m✗ Cannot reach LLM backend.\033[0m")
print(f" Backend: {llm.backend} at {llm.base_url}")
print(f" Ensure Ollama/vLLM is running and the model is loaded.")
logger.log("exit", "LLM unreachable", "error")
sys.exit(1)
# Check GPU/CPU status (model is now loaded, /api/ps has real data)
gpu = llm.detect_gpu_status()
if gpu["loaded"]:
gpu_pct = gpu["gpu_percent"]
size_gb = gpu["size_total"] / 1e9
vram_gb = gpu["size_vram"] / 1e9
if gpu_pct >= 90:
print(f" \033[32m✓\033[0m GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB)")
elif gpu_pct >= 50:
print(f" \033[33m⚠\033[0m GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB) — partial GPU, expect slower performance")
logger.log("gpu_warn", f"{gpu_pct}% GPU", "warn")
else:
print(f" \033[31m✗ GPU: {gpu_pct}% offloaded ({vram_gb:.1f}/{size_gb:.1f} GB)\033[0m")
print(f" {gpu['warning']}")
logger.log("gpu_warn", gpu["warning"], "error")
if gpu_pct == 0:
print(f"\n Running on CPU only. This will likely be too slow for autonomous development.")
print(f" Consider: a smaller model, freeing GPU memory, or increasing VRAM.")
resp = "" # Continue anyway — user can Ctrl+C
else:
print(f" ⊘ Model not pre-loaded (will load on first query)")
# Auto-detect context size (model is now loaded, /api/ps has runtime value)
model_ctx = llm.detect_context_size()
local_ctx = int(model_ctx * 0.6)
ctx = ContextManager(max_tokens=local_ctx)
print(f" Context: {model_ctx:,} tokens (local budget: {local_ctx:,})")
logger.log("context_size", f"Context: {model_ctx}, budget: {local_ctx}")
# Initialize remaining components
planner = Planner(llm, logger, ctx, workdir)
executor = Executor(llm, logger, ctx, sandbox, deps, workdir)
debugger = Debugger(llm, logger, ctx, sandbox, workdir)
resume = ResumeManager(logger, workdir)
# Step 2: Read description
print(" Reading project description...")
description = planner.read_description()
if not description:
print(f"\n \033[31m✗ No '{config.DESCRIPTION_FILE}' found in {workdir}\033[0m")
print(" Create a description.txt with your project requirements and run again.")
logger.log("exit", "No description.txt", "error")
sys.exit(1)
logger.log("description_read", f"{len(description)} chars")
print(f" \033[32m✓\033[0m Description loaded ({len(description)} chars)")
# Step 3: Read manuals
manuals = {}
if args.nomanual:
print(" ⊘ Skipping manuals (-nomanual flag)")
logger.log("manuals_skip", "User requested -nomanual")
else:
manuals_dir = os.path.join(workdir, config.MANUALS_DIR)
if not os.path.isdir(manuals_dir):
print(f"\n \033[31m✗ No '{config.MANUALS_DIR}/' directory found.\033[0m")
print(" Create a manuals/ folder with reference docs, or use -nomanual flag.")
logger.log("exit", "No manuals directory", "error")
sys.exit(1)
manuals = planner.read_manuals()
if not manuals:
print(" \033[33m⚠\033[0m Manuals directory exists but is empty")
logger.log("manuals_empty", "No files in manuals/", "warn")
else:
print(f" \033[32m✓\033[0m Loaded {len(manuals)} manual(s): {', '.join(manuals.keys())}")
logger.log("manuals_read", f"{len(manuals)} files")
# Step 4: Plan or resume
plan = None
start_step = 0
import hashlib
desc_hash = hashlib.md5(description.encode()).hexdigest()
if not resume.is_fresh_start():
print("\n ↻ Previous session detected. Checking resume state...")
if resume.description_changed(desc_hash):
print(" \033[33m⚠\033[0m Description changed. Re-planning with existing work context.")
logger.log("replan", "Description changed, creating incremental plan", "warn")
plan = None # Force re-plan, but we'll pass existing files
else:
plan = planner.load_existing_plan()
if plan and plan.get("steps"):
start_step = resume.get_resume_step()
total = len(plan["steps"])
if start_step >= total:
print(f" \033[32m✓\033[0m Previous run completed. Nothing to do.")
print(f" To re-run, delete .autodev_state.json or change description.txt.")
logger.log("skip", "Already complete, description unchanged")
return
print(f" \033[32m✓\033[0m Resuming from step {start_step + 1}/{total}")
ctx.add("system", f"Project: {plan.get('summary', description[:500])}", priority=9)
else:
print(" \033[33m⚠\033[0m Could not load previous plan. Starting fresh.")
plan = None
if not plan:
print("\n Planning development...")
# Gather existing project files for context (for incremental re-planning)
existing_work = ""
skip_files = {config.WORKLOG_FILE, config.PLAN_FILE, config.DEPENDENCY_FILE,
config.DESCRIPTION_FILE, ".autodev_state.json"}
for fname in sorted(os.listdir(workdir)):
fpath = os.path.join(workdir, fname)
if os.path.isfile(fpath) and fname not in skip_files and not fname.startswith("."):
try:
with open(fpath, "r") as f:
content = f.read()
if content.strip():
existing_work += f"### {fname}\n{content[:2000]}\n\n"
except (IOError, UnicodeDecodeError):
pass
plan = planner.create_plan(description, manuals,
existing_work=existing_work if existing_work else "")
if plan.get("error"):
print(f" \033[31m✗ Planning failed: {plan.get('error')}\033[0m")
if plan.get("raw"):
print(f" Raw LLM output (first 300 chars): {plan['raw'][:300]}")
logger.log("exit", "Planning failed", "error")
sys.exit(1)
steps = plan.get("steps", [])
print(f" \033[32m✓\033[0m Plan created: {plan.get('project_name', 'project')}")
print(f" Language: {plan.get('language', 'unknown')}")
print(f" Summary: {plan.get('summary', 'N/A')}")
print(f" Steps: {len(steps)}")
print(f" Dependencies: {', '.join(plan.get('dependencies', [])) or 'none'}")
print()
deps.add_many(plan.get("dependencies", []))
# Step 5: Execute
steps = plan.get("steps", [])
if not steps:
print(" \033[31m✗ Plan has no steps.\033[0m Check description.txt for clarity.")
logger.log("exit", "Empty plan", "error")
sys.exit(1)
total = len(steps)
print(f"\n{'='*54}")
print(f" EXECUTING PLAN — {total} steps")
print(f"{'='*54}\n")
# Push plan to web UI
try:
from .web import push_event
push_event("plan", {
"project": plan.get("project_name", "project"),
"steps": [{"id": s.get("id", i+1), "phase": s.get("phase", ""),
"description": s.get("description", ""), "status": "pending"}
for i, s in enumerate(steps)],
"start_step": start_step,
})
except Exception:
pass
for i in range(start_step, total):
step = steps[i]
step_num = i + 1
phase = step.get("phase", "implement")
desc = step.get("description", "")
print(f"\n ── Step {step_num}/{total} [\033[36m{phase}\033[0m] ──")
print(f" {desc}")
resume.save_state(i, plan, desc_hash=desc_hash)
result = executor.execute_step(step, plan)
if not result["success"]:
print(f" \033[31m✗ Step failed.\033[0m Entering debug loop...")
logger.log("step_failed", f"Step {step_num}", "error")
try:
from .web import push_event
push_event("step_done", {"step": i, "status": "error"})
except Exception:
pass
fixed = False
for attempt in range(config.MAX_DEBUG_CYCLES):
print(f"\n Debug attempt {attempt + 1}/{config.MAX_DEBUG_CYCLES}...")
dbg = debugger.debug_errors(result["errors"], step, plan)
if dbg["fixed"]:
# Re-run step to verify
print(f" Fix applied. Re-running step...")
result = executor.execute_step(step, plan)
if result["success"]:
print(f" \033[32m✓ Fixed on attempt {attempt + 1}\033[0m")
fixed = True
break
print(f" Step still failing after fix.")
result["errors"] = result.get("errors", [])
elif dbg["errors"]:
result["errors"] = dbg["errors"]
if ctx.detect_cycle():
logger.log("cycle_break", "Breaking debug cycle — approaches too similar", "warn")
ctx.clear_stale()
if not fixed:
print(f"\n \033[31m✗ Could not fix step {step_num} after {config.MAX_DEBUG_CYCLES} attempts.\033[0m")
for err in result["errors"][:3]:
print(f" Error: {err[:200]}")
print(f"\n The worklog has been saved. Review it and add documentation if needed.")
print(f" Restart AutoDev to resume from this step.")
resume.mark_failed(i, plan, "; ".join(result["errors"][:2]),
desc_hash=desc_hash)
logger.log("halt", f"Stopped at step {step_num}", "error")
sys.exit(1)
logger.log("phase_complete", f"step_{step_num}")
ctx_info = ctx.token_usage()
print(f" \033[32m✓\033[0m Step {step_num} complete "
f"(context: {ctx_info['utilization']})")
try:
from .web import push_event
push_event("step_done", {"step": i, "status": "ok"})
except Exception:
pass
# Done
resume.mark_complete(plan, desc_hash=desc_hash)
print(f"\n{'='*54}")
print(f" \033[32m✓ ALL {total} STEPS COMPLETE\033[0m")
print(f"{'='*54}")
print(f"\n Project: {plan.get('project_name', 'project')}")
print(f" Language: {plan.get('language', 'unknown')}")
print(f" Dependencies: see {config.DEPENDENCY_FILE}")
print(f" Worklog: see {config.WORKLOG_FILE}")
print(f" Plan: see {config.PLAN_FILE}")
logger.log("complete", "All steps executed successfully")
def main():
args = parse_args()
try:
run(args)
except KeyboardInterrupt:
print("\n\n Interrupted by user. State saved — restart to resume.")
sys.exit(130)
except SandboxViolation as e:
print(f"\n \033[31m✗ SANDBOX VIOLATION: {e}\033[0m")
sys.exit(1)
except LLMError as e:
print(f"\n \033[31m✗ LLM ERROR: {e}\033[0m")
sys.exit(1)
except Exception as e:
print(f"\n \033[31m✗ UNEXPECTED ERROR: {e}\033[0m")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
+181
View File
@@ -0,0 +1,181 @@
"""
AutoDev - Planner
Reads description + manuals, queries LLM to produce a structured development plan.
Approaches the task like a principal engineer — thorough, skeptical, complete.
"""
import json
import os
from .llm import LLM
from .logger import Logger
from .context import ContextManager
from . import config
SYSTEM_PROMPT = config.EXPERT_IDENTITY + """
You are now in PLANNING mode. You have been given a project description and optional reference manuals.
Your job is to produce a complete, actionable development plan that another engineer could follow blindly.
Think through this carefully:
1. What EXACTLY is being asked for? Read the description multiple times. Identify every requirement.
2. What language, frameworks, and tools are needed?
3. What is the correct project structure? Think about separation of concerns.
4. What are the build steps? What compiler flags, linker flags, dependencies?
5. What could go wrong? Plan for error handling and edge cases.
6. What is the correct order of implementation? Dependencies between files matter.
Output ONLY valid JSON with this structure:
{
"project_name": "string",
"language": "primary language",
"summary": "2-3 sentence summary of what we're building and why",
"dependencies": ["every tool, compiler, library needed"],
"structure": ["every file and directory to create, in order"],
"steps": [
{
"id": 1,
"phase": "setup|implement|test|debug|finalize",
"description": "detailed description of what to do and WHY",
"files": ["files this step creates or modifies"],
"commands": ["exact shell commands to run, if any"],
"acceptance": "how to verify this step succeeded"
}
]
}
Rules:
- Every file that needs to exist MUST have a step that creates it.
- Implementation steps MUST be ordered so dependencies are created before dependents.
- Include a setup step for directory structure and dependency checks.
- Include test/compile steps after implementation.
- Include a finalize step that does a clean build and verification.
- The "acceptance" field is critical — it defines what success looks like for each step.
- Be specific in commands. Not "compile the code" but "gcc -Wall -Wextra -o main main.c util.c -lm".
"""
class Planner:
def __init__(self, llm: LLM, logger: Logger, ctx: ContextManager, workdir: str):
self.llm = llm
self.logger = logger
self.ctx = ctx
self.workdir = workdir
self.plan_path = os.path.join(workdir, config.PLAN_FILE)
def read_description(self) -> str | None:
path = os.path.join(self.workdir, config.DESCRIPTION_FILE)
if not os.path.exists(path):
return None
with open(path, "r") as f:
return f.read().strip()
def read_manuals(self) -> dict[str, str]:
manuals = {}
mdir = os.path.join(self.workdir, config.MANUALS_DIR)
if not os.path.isdir(mdir):
return manuals
for fname in sorted(os.listdir(mdir)):
fpath = os.path.join(mdir, fname)
if os.path.isfile(fpath):
try:
with open(fpath, "r") as f:
manuals[fname] = f.read()
except (IOError, UnicodeDecodeError):
self.logger.log("manual_skip", f"Could not read {fname}", "warn")
return manuals
def create_plan(self, description: str, manuals: dict[str, str],
existing_work: str = "") -> dict:
prompt_parts = [
"## Project Description\n"
"Read this VERY carefully. Every sentence is a requirement.\n\n"
f"{description}"
]
for name, content in manuals.items():
prompt_parts.append(f"## Reference Manual: {name}\n{content[:6000]}")
if existing_work:
prompt_parts.append(
"## Existing Work\n"
"The following files already exist from a previous iteration. "
"Your plan should BUILD ON what exists — only add steps for NEW or CHANGED requirements. "
"Do NOT recreate files that already satisfy the requirements.\n\n"
f"{existing_work}"
)
prompt = "\n\n".join(prompt_parts)
self.ctx.add("user", prompt, priority=9)
self.logger.log("planning", "Querying LLM for development plan")
response = self.llm.query(prompt, system=SYSTEM_PROMPT, temperature=0.3)
self.ctx.add("assistant", response, priority=8)
plan = self._parse_plan(response)
# Validate plan has required fields
if not plan.get("steps"):
# Retry once with more explicit instruction
self.logger.log("plan_retry", "First plan had no steps, retrying", "warn")
retry_prompt = (
prompt + "\n\nIMPORTANT: Your previous response could not be parsed. "
"You MUST output ONLY a JSON object. No text before or after. "
"Start your response with { and end with }."
)
response = self.llm.query(retry_prompt, system=SYSTEM_PROMPT, temperature=0.1)
plan = self._parse_plan(response)
self._save_plan(plan)
self.logger.log("plan_created", f"{len(plan.get('steps', []))} steps")
return plan
def load_existing_plan(self) -> dict | None:
if os.path.exists(self.plan_path):
try:
with open(self.plan_path, "r") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
return None
def _parse_plan(self, response: str) -> dict:
text = response.strip()
# Try progressively more aggressive extraction
for extracted in self._extract_json_candidates(text):
try:
plan = json.loads(extracted)
if isinstance(plan, dict):
return plan
except json.JSONDecodeError:
continue
self.logger.log("plan_parse_fail", "Could not parse plan JSON", "error")
return {"steps": [], "error": "Failed to parse plan", "raw": response[:1000]}
def _extract_json_candidates(self, text: str) -> list[str]:
"""Yield JSON candidates from most to least likely."""
candidates = []
# 1. Markdown fenced JSON
if "```json" in text:
block = text.split("```json", 1)[1].split("```", 1)[0].strip()
candidates.append(block)
elif "```" in text:
block = text.split("```", 1)[1].split("```", 1)[0].strip()
candidates.append(block)
# 2. Raw text (entire response)
candidates.append(text)
# 3. Find outermost { ... } with brace matching
start = text.find("{")
if start >= 0:
depth = 0
for i in range(start, len(text)):
if text[i] == "{":
depth += 1
elif text[i] == "}":
depth -= 1
if depth == 0:
candidates.append(text[start:i + 1])
break
return candidates
def _save_plan(self, plan: dict):
with open(self.plan_path, "w") as f:
json.dump(plan, f, indent=2)
+68
View File
@@ -0,0 +1,68 @@
"""
AutoDev - Resume Manager
Handles state persistence and resuming from worklog.
"""
import json
import os
from .logger import Logger
from . import config
class ResumeManager:
def __init__(self, logger: Logger, workdir: str):
self.logger = logger
self.workdir = workdir
self.state_path = os.path.join(workdir, ".autodev_state.json")
def save_state(self, current_step: int, plan: dict, status: str = "in_progress",
desc_hash: str = ""):
state = {
"current_step": current_step,
"total_steps": len(plan.get("steps", [])),
"status": status,
"plan_hash": hash(json.dumps(plan, sort_keys=True)),
"desc_hash": desc_hash,
}
with open(self.state_path, "w") as f:
json.dump(state, f, indent=2)
def load_state(self) -> dict | None:
if not os.path.exists(self.state_path):
return None
try:
with open(self.state_path, "r") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
def get_resume_step(self) -> int:
"""Determine which step to resume from based on worklog."""
state = self.load_state()
if state and state.get("status") == "in_progress":
step = state.get("current_step", 0)
self.logger.log("resume", f"Resuming from step {step + 1}")
return step
return 0
def mark_complete(self, plan: dict, desc_hash: str = ""):
self.save_state(len(plan.get("steps", [])), plan, status="complete",
desc_hash=desc_hash)
self.logger.log("phase_complete", "all")
def mark_failed(self, step: int, plan: dict, reason: str, desc_hash: str = ""):
self.save_state(step, plan, status="failed", desc_hash=desc_hash)
self.logger.log("failed", f"Step {step + 1}: {reason}", "error")
def description_changed(self, desc_hash: str) -> bool:
"""Check if description.txt has changed since last run."""
state = self.load_state()
if not state:
return False
old_hash = state.get("desc_hash", "")
if not old_hash:
return False # No hash stored — can't tell, assume unchanged
return old_hash != desc_hash
def is_fresh_start(self) -> bool:
return not os.path.exists(self.state_path)
+214
View File
@@ -0,0 +1,214 @@
"""
AutoDev - Sandbox
Enforces working directory confinement with whitelist-based command validation.
"""
import os
import re
# Whitelisted command prefixes — only these are allowed to execute
ALLOWED_COMMANDS = [
# Build tools
"make", "cmake", "gcc", "g++", "clang", "clang++", "rustc", "cargo",
"go ", "go build", "go run", "go test", "go mod",
"javac", "java ", "jar ", "mvn ", "gradle",
"dotnet", "msbuild",
"python", "python3", "pip ", "pip3 ",
"node ", "npm ", "npx ", "yarn ", "pnpm ",
"ruby ", "gem ", "bundle ",
"perl ", "lua ", "luac",
"nasm", "as ", "ld ",
# Common utilities (safe)
"ls", "cat ", "head ", "tail ", "wc ", "sort ", "uniq ",
"find ", "grep ", "awk ", "sed ", "diff ", "patch ",
"mkdir ", "cp ", "mv ", "rm ", "touch ", "chmod ",
"cd ", "pwd",
"tar ", "zip ", "unzip ", "gzip ", "gunzip ",
"curl ", "wget ",
"echo ", "printf ", "test ", "true", "false",
"which ", "env ", "basename ", "dirname ",
"pkg-config", "ldconfig",
# Version checks
"gcc --version", "g++ --version", "python3 --version",
"rustc --version", "cargo --version", "go version",
"java -version", "javac -version", "node --version",
]
# Absolutely forbidden patterns — override whitelist
FORBIDDEN_PATTERNS = [
re.compile(r"\bsudo\b"),
re.compile(r"\bsu\s"),
re.compile(r"\brm\s+-rf\s+/\s*$"),
re.compile(r"\bmkfs\b"),
re.compile(r"\bdd\s+if="),
re.compile(r">\s*/dev/"),
re.compile(r"\bshutdown\b"),
re.compile(r"\breboot\b"),
re.compile(r"\binit\s+[0-6]"),
re.compile(r"\bsystemctl\b"),
re.compile(r"\bchmod\s+777\s+/"),
re.compile(r"\bchown\b.*\s+/"),
re.compile(r"\bmount\b"),
re.compile(r"\bumount\b"),
re.compile(r"\biptables\b"),
re.compile(r"\bnft\b"),
re.compile(r"\bpasswd\b"),
re.compile(r"\buseradd\b"),
re.compile(r"\buserdel\b"),
re.compile(r"\bvisudo\b"),
re.compile(r"\bcrontab\b"),
]
class SandboxViolation(Exception):
pass
class Sandbox:
def __init__(self, workdir: str):
self.workdir = os.path.realpath(workdir)
def validate_path(self, path: str) -> str:
resolved = os.path.realpath(os.path.join(self.workdir, path))
if not resolved.startswith(self.workdir):
raise SandboxViolation(f"Path escapes sandbox: {path} -> {resolved}")
return resolved
def validate_command(self, cmd: str):
cmd_stripped = cmd.strip()
cmd_lower = cmd_stripped.lower()
# Check forbidden patterns first
for pattern in FORBIDDEN_PATTERNS:
if pattern.search(cmd_lower):
raise SandboxViolation(f"Forbidden command pattern: {pattern.pattern}")
# Check if command starts with an allowed prefix
# Handle shell constructs: pipes, &&, ;
# But respect quoted strings — don't split inside them
parts = self._split_shell_commands(cmd_stripped)
for part in parts:
part = part.strip()
if not part:
continue
# Skip env var assignments like FOO=bar
if re.match(r'^[A-Za-z_][A-Za-z0-9_]*=', part):
# Extract the command after assignments
tokens = part.split()
part = " ".join(t for t in tokens if "=" not in t or not re.match(r'^[A-Za-z_]', t))
if not part:
continue
allowed = False
for prefix in ALLOWED_COMMANDS:
if part.startswith(prefix) or part.split()[0] == prefix.strip():
allowed = True
break
# Also allow ./scripts and relative paths
if part.startswith("./") or part.startswith("bash ") or part.startswith("sh "):
allowed = True
if not allowed:
raise SandboxViolation(
f"Command not in whitelist: '{part.split()[0]}'. "
f"Only build tools and safe utilities are allowed."
)
# Check for path escapes in write-like commands
# Only check unquoted tokens that look like real absolute paths
in_single = False
in_double = False
for ch in cmd_stripped:
if ch == "'" and not in_double:
in_single = not in_single
elif ch == '"' and not in_single:
in_double = not in_double
# If the command has balanced quotes, extract only unquoted parts
unquoted_parts = []
current = []
in_single = False
in_double = False
for ch in cmd_stripped:
if ch == "'" and not in_double:
in_single = not in_single
continue
elif ch == '"' and not in_single:
in_double = not in_double
continue
if not in_single and not in_double:
current.append(ch)
else:
if current and current[-1] != " ":
current.append(" ")
unquoted = "".join(current)
for token in unquoted.split():
if token.startswith("/") and not token.startswith(self.workdir):
if token in ("//", "/dev/null") or len(token) <= 2:
continue
read_only_prefixes = ["/usr", "/lib", "/etc/alternatives", "/bin", "/opt"]
if any(token.startswith(p) for p in read_only_prefixes):
continue
raise SandboxViolation(f"Reference to path outside sandbox: {token}")
def safe_write(self, path: str, content: str) -> str:
full = self.validate_path(path)
# Protect AutoDev's own state files from being overwritten
protected = {"worklog.json", "plan.json", "dependency.txt", ".autodev_state.json"}
if os.path.basename(full) in protected and os.path.exists(full):
# Only AutoDev internals should write these
pass # Allow — the caller is AutoDev itself
os.makedirs(os.path.dirname(full), exist_ok=True)
# Don't write a file if a directory exists at that path
if os.path.isdir(full):
import shutil
shutil.rmtree(full)
with open(full, "w") as f:
f.write(content)
return full
def safe_read(self, path: str) -> str:
full = self.validate_path(path)
with open(full, "r") as f:
return f.read()
def safe_mkdir(self, path: str) -> str:
full = self.validate_path(path)
os.makedirs(full, exist_ok=True)
return full
@staticmethod
def _split_shell_commands(cmd: str) -> list[str]:
"""Split a shell command on &&, ||, |, ; but respect quoted strings."""
parts = []
current = []
in_single = False
in_double = False
i = 0
while i < len(cmd):
c = cmd[i]
if c == "'" and not in_double:
in_single = not in_single
current.append(c)
elif c == '"' and not in_single:
in_double = not in_double
current.append(c)
elif not in_single and not in_double:
# Check for &&, ||, |, ;
two = cmd[i:i+2]
if two in ("&&", "||"):
parts.append("".join(current).strip())
current = []
i += 2
continue
elif c in (";", "|"):
parts.append("".join(current).strip())
current = []
else:
current.append(c)
else:
current.append(c)
i += 1
tail = "".join(current).strip()
if tail:
parts.append(tail)
return [p for p in parts if p]
+353
View File
@@ -0,0 +1,353 @@
"""
AutoDev - Web UI
Serves a live dashboard showing LLM activity, file tree, and file contents.
Uses SSE (Server-Sent Events) for real-time updates. No external dependencies.
"""
import http.server
import json
import os
import queue
import threading
import urllib.parse
from . import config
# Global event queue — logger pushes events, SSE endpoint streams them
_event_queue: queue.Queue = queue.Queue(maxsize=5000)
_workdir: str = ""
def push_event(event_type: str, data: dict):
"""Push an event to all connected web clients."""
try:
_event_queue.put_nowait({"type": event_type, "data": data})
except queue.Full:
pass
class WebHandler(http.server.BaseHTTPRequestHandler):
def log_message(self, format, *args):
pass # Suppress default logging
def do_GET(self):
try:
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
params = urllib.parse.parse_qs(parsed.query)
if path == "/":
self._serve_html()
elif path == "/events":
self._serve_sse()
elif path == "/api/files":
self._serve_file_tree()
elif path == "/api/file":
filepath = params.get("path", [""])[0]
self._serve_file_content(filepath)
else:
self.send_error(404)
except (BrokenPipeError, ConnectionResetError, OSError):
pass
def _serve_html(self):
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
self.wfile.write(HTML_PAGE.encode("utf-8"))
def _serve_sse(self):
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
try:
while True:
try:
event = _event_queue.get(timeout=1)
line = f"data: {json.dumps(event)}\n\n"
self.wfile.write(line.encode("utf-8"))
self.wfile.flush()
except queue.Empty:
# Send keepalive
self.wfile.write(b": keepalive\n\n")
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError, OSError):
pass
def _serve_file_tree(self):
files = []
for root, dirs, filenames in os.walk(_workdir):
# Skip hidden dirs and autodev backups
dirs[:] = [d for d in dirs if not d.startswith(".") and d != "__pycache__"]
for fname in sorted(filenames):
if fname.startswith("."):
continue
full = os.path.join(root, fname)
rel = os.path.relpath(full, _workdir)
try:
size = os.path.getsize(full)
except OSError:
size = 0
files.append({"path": rel, "size": size})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(files).encode("utf-8"))
def _serve_file_content(self, filepath: str):
if not filepath:
self.send_error(400, "Missing path parameter")
return
full = os.path.realpath(os.path.join(_workdir, filepath))
if not full.startswith(os.path.realpath(_workdir)):
self.send_error(403, "Path outside workspace")
return
try:
with open(full, "r") as f:
content = f.read()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"path": filepath, "content": content}).encode("utf-8"))
except (IOError, UnicodeDecodeError):
self.send_error(404, "File not found or not readable")
def start_web_server(port: int, workdir: str):
"""Start the web UI server in a background thread."""
global _workdir
_workdir = workdir
server = http.server.HTTPServer(("0.0.0.0", port), WebHandler)
server.daemon_threads = True
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
return server
HTML_PAGE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>AutoDev — Live Dashboard</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: 'SF Mono', 'Consolas', 'Monaco', monospace; background: #0d1117; color: #c9d1d9; height: 100vh; overflow: hidden; }
.layout { display: grid; grid-template-columns: 300px 1fr; grid-template-rows: 40px 1fr; height: 100vh; }
/* Header */
.header { grid-column: 1 / -1; background: #161b22; border-bottom: 1px solid #21262d; padding: 8px 16px; display: flex; align-items: center; gap: 16px; font-size: 13px; }
.header .title { color: #58a6ff; font-weight: 700; font-size: 14px; }
.header .status { color: #3fb950; }
.header .info { color: #8b949e; }
/* Left panel: plan progress + file tree + file viewer */
.left { display: flex; flex-direction: column; border-right: 1px solid #21262d; overflow: hidden; }
.plan-progress { flex: 0 0 auto; max-height: 35%; overflow-y: auto; border-bottom: 1px solid #21262d; padding: 8px; }
.file-tree { flex: 0 0 25%; overflow-y: auto; border-bottom: 1px solid #21262d; padding: 8px; }
.file-viewer { flex: 1; overflow-y: auto; padding: 8px; min-height: 0; }
/* Right panel: LLM activity log */
.right { display: flex; flex-direction: column; }
.activity { flex: 1; overflow-y: auto; padding: 12px; }
.panel-title { font-size: 11px; text-transform: uppercase; letter-spacing: 1px; color: #8b949e; padding: 8px 0 6px; border-bottom: 1px solid #21262d; margin-bottom: 8px; }
/* Plan steps */
.step-item { padding: 5px 8px; font-size: 12px; border-radius: 4px; margin-bottom: 2px; display: flex; align-items: flex-start; gap: 8px; }
.step-icon { flex-shrink: 0; width: 18px; text-align: center; }
.step-pending .step-icon { color: #484f58; }
.step-active .step-icon { color: #d29922; }
.step-ok .step-icon { color: #3fb950; }
.step-error .step-icon { color: #f85149; }
.step-active { background: #d2992211; }
.step-phase { color: #8b949e; font-size: 10px; text-transform: uppercase; }
.step-desc { color: #c9d1d9; line-height: 1.3; }
.step-ok .step-desc { color: #8b949e; }
/* File tree */
.file-item { padding: 4px 8px; cursor: pointer; font-size: 13px; border-radius: 4px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; }
.file-item:hover { background: #161b22; }
.file-item.active { background: #1f6feb33; color: #58a6ff; }
.file-size { color: #484f58; font-size: 11px; float: right; }
/* File viewer */
.file-content { font-size: 12px; line-height: 1.5; white-space: pre-wrap; word-break: break-all; color: #e6edf3; }
.file-path { font-size: 11px; color: #58a6ff; padding-bottom: 6px; border-bottom: 1px solid #21262d; margin-bottom: 8px; }
/* Activity log */
.log-entry { padding: 4px 0; font-size: 13px; line-height: 1.4; border-bottom: 1px solid #21262d0a; }
.log-time { color: #484f58; font-size: 11px; }
.log-ok { color: #3fb950; }
.log-error { color: #f85149; }
.log-warn { color: #d29922; }
.log-action { color: #58a6ff; font-weight: 600; }
.log-detail { color: #c9d1d9; }
.log-llm { color: #bc8cff; }
::-webkit-scrollbar { width: 6px; }
::-webkit-scrollbar-track { background: transparent; }
::-webkit-scrollbar-thumb { background: #30363d; border-radius: 3px; }
</style>
</head>
<body>
<div class="layout">
<div class="header">
<span class="title">⚡ AutoDev</span>
<span class="status" id="status">Connecting...</span>
<span class="info" id="info"></span>
</div>
<div class="left">
<div class="plan-progress">
<div class="panel-title">📋 Plan Progress</div>
<div id="plan">Waiting for plan...</div>
</div>
<div class="file-tree">
<div class="panel-title">📁 Project Files</div>
<div id="files"></div>
</div>
<div class="file-viewer">
<div class="panel-title">📄 File Content</div>
<div class="file-path" id="file-path">Select a file</div>
<pre class="file-content" id="file-content"></pre>
</div>
</div>
<div class="right">
<div class="activity">
<div class="panel-title">🤖 LLM Activity</div>
<div id="log"></div>
</div>
</div>
</div>
<script>
const logEl = document.getElementById('log');
const filesEl = document.getElementById('files');
const planEl = document.getElementById('plan');
const filePathEl = document.getElementById('file-path');
const fileContentEl = document.getElementById('file-content');
const statusEl = document.getElementById('status');
const infoEl = document.getElementById('info');
let activeFile = null;
let planSteps = [];
let activeStep = -1;
function connect() {
const es = new EventSource('/events');
es.onopen = () => { statusEl.textContent = '● Connected'; statusEl.style.color = '#3fb950'; };
es.onerror = () => { statusEl.textContent = '● Reconnecting...'; statusEl.style.color = '#d29922'; };
es.onmessage = (e) => { handleEvent(JSON.parse(e.data)); };
}
function handleEvent(evt) {
const d = evt.data;
if (evt.type === 'log') {
addLogEntry(d);
refreshFiles();
if (activeFile) loadFile(activeFile);
if (d.action === 'step_start') {
activeStep++;
updatePlanUI();
}
} else if (evt.type === 'plan') {
planSteps = d.steps || [];
activeStep = d.start_step - 1;
for (let i = 0; i < d.start_step; i++) {
if (planSteps[i]) planSteps[i].status = 'ok';
}
infoEl.textContent = d.project + '' + planSteps.length + ' steps';
updatePlanUI();
} else if (evt.type === 'step_done') {
if (planSteps[d.step]) {
planSteps[d.step].status = d.status;
}
updatePlanUI();
} else if (evt.type === 'llm_response') {
addLogEntry({action: 'thinking', detail: d.response || '', status: 'llm'});
}
}
function updatePlanUI() {
if (!planSteps.length) return;
let html = '';
planSteps.forEach((s, i) => {
let cls = 'step-pending';
let icon = '';
if (s.status === 'ok') { cls = 'step-ok'; icon = ''; }
else if (s.status === 'error') { cls = 'step-error'; icon = ''; }
else if (i === activeStep + 1 || (i === activeStep && s.status === 'pending')) {
cls = 'step-active'; icon = '';
}
html += '<div class="step-item ' + cls + '">'
+ '<span class="step-icon">' + icon + '</span>'
+ '<div><span class="step-phase">' + s.phase + '</span> '
+ '<span class="step-desc">' + escapeHtml(s.description) + '</span></div>'
+ '</div>';
});
// Summary
const done = planSteps.filter(s => s.status === 'ok').length;
const failed = planSteps.filter(s => s.status === 'error').length;
html = '<div style="font-size:11px;color:#8b949e;margin-bottom:6px">'
+ done + '/' + planSteps.length + ' complete'
+ (failed ? ' · <span style="color:#f85149">' + failed + ' failed</span>' : '')
+ '</div>' + html;
planEl.innerHTML = html;
}
function addLogEntry(d) {
const div = document.createElement('div');
div.className = 'log-entry';
const time = new Date().toLocaleTimeString();
const isLlm = d.status === 'llm';
if (isLlm) {
div.innerHTML = '<span class="log-time">' + time + '</span> '
+ '<span class="log-llm">🧠 ' + escapeHtml(d.detail || '') + '</span>';
} else {
const cls = d.status === 'ok' ? 'log-ok' : d.status === 'error' ? 'log-error' : d.status === 'warn' ? 'log-warn' : 'log-ok';
const icon = d.status === 'ok' ? '' : d.status === 'error' ? '' : d.status === 'warn' ? '' : '';
div.innerHTML = '<span class="log-time">' + time + '</span> '
+ '<span class="' + cls + '">[' + icon + ']</span> '
+ '<span class="log-action">' + (d.action || '') + '</span> '
+ '<span class="log-detail">' + escapeHtml(d.detail || '') + '</span>';
}
logEl.prepend(div);
}
function escapeHtml(s) { const d = document.createElement('div'); d.textContent = s; return d.innerHTML; }
function refreshFiles() {
fetch('/api/files').then(r => r.json()).then(files => {
filesEl.innerHTML = '';
files.forEach(f => {
const div = document.createElement('div');
div.className = 'file-item' + (f.path === activeFile ? ' active' : '');
const sz = f.size > 1024 ? (f.size/1024).toFixed(1)+'K' : f.size+'B';
div.innerHTML = escapeHtml(f.path) + '<span class="file-size">' + sz + '</span>';
div.onclick = () => loadFile(f.path);
filesEl.appendChild(div);
});
}).catch(() => {});
}
function loadFile(path) {
activeFile = path;
fetch('/api/file?path=' + encodeURIComponent(path))
.then(r => r.json())
.then(d => { filePathEl.textContent = d.path; fileContentEl.textContent = d.content; })
.catch(() => { fileContentEl.textContent = '(could not load)'; });
document.querySelectorAll('.file-item').forEach(el => {
el.classList.toggle('active', el.textContent.startsWith(path));
});
}
connect();
refreshFiles();
setInterval(refreshFiles, 5000);
</script>
</body>
</html>"""