418 lines
17 KiB
Python
418 lines
17 KiB
Python
"""
|
|
AutoDev - Executor
|
|
Generates code, writes files, runs compilation and shell commands.
|
|
Uses relevance-based context selection and expert-level prompting.
|
|
"""
|
|
|
|
import json
import os
import re
import subprocess

from .llm import LLM
from .logger import Logger
from .context import ContextManager, estimate_tokens
from .sandbox import Sandbox, SandboxViolation
from .dependency import DependencyTracker
from . import config
|
|
|
|
# System prompt for single-file generation: the shared expert identity from
# config followed by the rules for emitting one raw, complete file.
CODE_GEN_SYSTEM = config.EXPERT_IDENTITY + """

You are now in CODE GENERATION mode. Generate complete, production-quality code.

Rules:
- Output ONLY the file content. No markdown fences. No explanations before or after.
- The code must be COMPLETE. No "// TODO", no "// ... rest of code", no placeholders.
- Include all necessary imports/includes at the top.
- Include proper error handling.
- Add concise comments explaining non-obvious logic.
- If this is a header file, include proper include guards.
- If this is a build file (Makefile, CMakeLists.txt, etc.), make it complete and correct.
"""
|
|
|
|
# System prompt for multi-file generation: the shared expert identity from
# config followed by the JSON envelope ("files" plus optional "commands")
# that the multi-file response parser looks for.
MULTI_FILE_SYSTEM = config.EXPERT_IDENTITY + """

You are now in MULTI-FILE GENERATION mode. Generate multiple complete source files.

Output ONLY valid JSON with this structure:
{
  "files": [
    {"path": "relative/path/to/file", "content": "complete file content"}
  ],
  "commands": ["optional shell commands to run after writing files"]
}

Rules:
- Every file must be COMPLETE. No placeholders, no stubs.
- All imports/includes must reference files that exist or will be created.
- Output ONLY the JSON object. Start with { and end with }.
"""
|
|
|
|
|
|
class Executor:
    """Carries out plan steps: generates code with the LLM, writes files, runs commands.

    Directory creation, file writes and command validation are delegated to
    the sandbox. Step-level methods report their outcome as a dict with the
    keys ``success`` (bool), ``output`` (str) and ``errors`` (list of str).
    """

    # Build-system files; ranked highest when choosing context files.
    _BUILD_FILES = frozenset({
        "Makefile", "CMakeLists.txt", "setup.py", "pyproject.toml",
        "Cargo.toml", "go.mod", "package.json", "pom.xml", "build.gradle",
    })

    # stderr fragments that mark a failed setup command as harmless.
    _BENIGN_ERRORS = ("File exists", "already exists", "No such file or directory")

    # Lower-case phrases suggesting the LLM answered with prose instead of code.
    _PROSE_INDICATORS = (
        "here is", "here's", "below is", "i'll", "let me", "this code",
        "the following", "sure,", "certainly",
    )

    # Appended to file contents that were cut to fit the context budget.
    _TRUNCATION_MARKER = "\n// ... (truncated for context)\n"

    # A "mkdir" command word whose first argument is not already -p/--parents.
    # The "\s" alternative stops the whitespace group from backtracking into a
    # spurious match when several spaces precede an existing -p.
    _MKDIR_WITHOUT_P = re.compile(r"\bmkdir(\s+)(?!\s|-[A-Za-z]*p|--parents\b)")

    # Project types are quoted so the annotations are not evaluated when the
    # class body is executed.
    def __init__(self, llm: "LLM", logger: "Logger", ctx: "ContextManager",
                 sandbox: "Sandbox", deps: "DependencyTracker", workdir: str):
        """Store the collaborators and the working directory used by every step."""
        self.llm = llm
        self.logger = logger
        self.ctx = ctx
        self.sandbox = sandbox
        self.deps = deps
        self.workdir = workdir

    def execute_step(self, step: dict, plan: dict) -> dict:
        """Execute a single plan step. Returns {success, output, errors}.

        Dispatches on ``step["phase"]`` (default ``"implement"``): ``setup``,
        ``test`` and ``debug`` have dedicated handlers; ``implement``,
        ``finalize`` and any unrecognised phase go through code generation.
        Exceptions raised while handling the step are caught, logged and
        reported through the returned dict instead of propagating.
        """
        phase = step.get("phase", "implement")
        desc = step.get("description", "")
        commands = step.get("commands", [])

        self.logger.log("step_start", f"[{phase}] {desc}")
        result = {"success": True, "output": "", "errors": []}

        try:
            if phase == "setup":
                result = self._do_setup(step, plan)
            elif phase == "test":
                result = self._do_test(step, plan)
            elif phase == "debug":
                result = self._do_debug(step, plan)
            else:
                result = self._do_implement(step, plan)

            # Run any explicit commands from the plan.
            # Skip for phases that handle their own commands or generate files via LLM.
            if phase not in ("setup", "test", "implement"):
                for cmd in commands:
                    cmd_result = self._run_command(cmd)
                    if cmd_result["returncode"] != 0:
                        result["errors"].append(
                            f"Command '{cmd}' failed (exit {cmd_result['returncode']}):\n"
                            f"{cmd_result['stderr']}"
                        )
                        result["success"] = False
                    result["output"] += cmd_result["stdout"]

            # Acceptance criteria are only logged here, not evaluated.
            acceptance = step.get("acceptance", "")
            if acceptance and result["success"]:
                self.logger.log("acceptance_check", acceptance)

        except SandboxViolation as e:
            result["success"] = False
            result["errors"].append(f"Sandbox violation: {e}")
            self.logger.log("sandbox_violation", str(e), "error")
        except Exception as e:
            # Top-level boundary for a step: report the failure, never crash the run.
            result["success"] = False
            result["errors"].append(str(e))
            self.logger.log("step_error", str(e), "error")

        status = "ok" if result["success"] else "error"
        self.logger.log("step_done", f"[{phase}] success={result['success']}", status)
        return result

    def _do_setup(self, step: dict, plan: dict) -> dict:
        """Create the planned directories, register dependencies, run setup commands.

        Entries of ``plan["structure"]`` whose basename contains a dot are
        treated as files (only their parent directory is created); all other
        entries are treated as directories. A setup command that exits
        non-zero is tolerated when its stderr looks benign; otherwise the
        error is recorded and the step is marked as failed.
        """
        result = {"success": True, "output": "", "errors": []}

        for path in plan.get("structure", []):
            if "." in os.path.basename(path):
                # Looks like a file: just make sure its parent directory exists.
                parent = os.path.dirname(path)
                if parent:
                    self.sandbox.safe_mkdir(parent)
            elif os.path.isfile(os.path.join(self.workdir, path)):
                # Never mkdir over an existing file.
                self.logger.log("setup_skip", f"{path} exists as file, not creating dir", "warn")
            else:
                self.sandbox.safe_mkdir(path)
                self.logger.log("mkdir", path)

        for dep in plan.get("dependencies", []):
            self.deps.add(dep)

        # Setup commands are best-effort — a benign non-zero exit is only a warning.
        for cmd in step.get("commands", []):
            cmd = self._fixup_setup_command(cmd)
            r = self._run_command(cmd)
            result["output"] += r["stdout"]
            if r["returncode"] == 0:
                continue
            self.logger.log("setup_warn", r["stderr"][:200], "warn")
            # Only fail setup if it's a real error, not "already exists".
            if not self._is_benign_error(r["stderr"]):
                result["errors"].append(r["stderr"])
                result["success"] = False

        return result

    @staticmethod
    def _fixup_setup_command(cmd: str) -> str:
        """Auto-fix common setup command issues.

        Currently one fix: every ``mkdir`` invocation that does not already
        carry ``-p``/``--parents`` gets ``-p`` inserted, so re-running setup
        or creating nested directories does not fail. Each ``mkdir`` in a
        compound command is checked on its own, so an unrelated ``-p...`` flag
        elsewhere in the line does not suppress the fix. The (stripped) fixed
        command is returned when a change was made; otherwise ``cmd`` is
        returned untouched.
        """
        stripped = cmd.strip()
        fixed = Executor._MKDIR_WITHOUT_P.sub(r"mkdir -p\1", stripped)
        return fixed if fixed != stripped else cmd

    @staticmethod
    def _is_benign_error(stderr: str) -> bool:
        """Check if an error is harmless (e.g., 'already exists')."""
        return any(fragment in stderr for fragment in Executor._BENIGN_ERRORS)

    def _do_implement(self, step: dict, plan: dict) -> dict:
        """Generate the step's files: freeform (none listed), single, or multi."""
        files = step.get("files", [])
        if not files:
            return self._implement_freeform(step, plan)
        if len(files) == 1:
            return self._implement_single(files[0], step, plan)
        return self._implement_multi(files, step, plan)

    def _implement_single(self, filepath: str, step: dict, plan: dict) -> dict:
        """Ask the LLM for one file's content and write it through the sandbox.

        If the first answer looks like prose (or is empty), the query is
        repeated once with a stricter prompt at low temperature. Whatever the
        final answer is gets written — an empty file can be a legitimate
        result, so emptiness is not treated as a failure here.
        """
        result = {"success": True, "output": "", "errors": []}
        prompt = self._build_code_prompt(step, plan, filepath)

        # Use focused context to avoid blowing token limits
        self.ctx.add("user", prompt, priority=7)
        code = self._strip_fences(self.llm.query(prompt, system=CODE_GEN_SYSTEM))

        # Validate we got actual code, not an explanation
        if self._looks_like_explanation(code):
            self.logger.log("regen", f"LLM returned explanation instead of code for {filepath}, retrying", "warn")
            retry_prompt = (
                prompt + "\n\nYou returned an explanation instead of code. "
                "Output ONLY the raw file content. No markdown. No explanations. "
                "Start with the first line of the actual source code."
            )
            code = self.llm.query(retry_prompt, system=CODE_GEN_SYSTEM, temperature=0.1)
            code = self._strip_fences(code)

        self.ctx.add("assistant", f"Generated {filepath} ({len(code)} chars)", priority=5)
        self.sandbox.safe_write(filepath, code)
        self.logger.log("file_written", f"{filepath} ({len(code)} chars)")
        result["output"] = f"Created {filepath}"
        return result

    def _implement_multi(self, files: list, step: dict, plan: dict) -> dict:
        """Ask the LLM for several files as one JSON document and write them.

        The response is parsed into ``{"files": [...], "commands": [...]}``;
        an unparseable or file-less response is retried once. Entries without
        a path or with empty content are skipped (and logged). Commands from
        the response are run afterwards; a failing command fails the step.
        The step also fails when nothing was written although files were
        requested, or when the response yielded neither files nor commands.
        """
        result = {"success": True, "output": "", "errors": []}
        prompt = self._build_code_prompt(step, plan)
        prompt += f"\n\nGenerate these files: {json.dumps(files)}"
        self.ctx.add("user", prompt, priority=7)

        response = self.llm.query(prompt, system=MULTI_FILE_SYSTEM)
        self.ctx.add("assistant", f"Generated {len(files)} files", priority=5)

        parsed = self._parse_multi_response(response)
        if not parsed.get("files"):
            # Retry
            self.logger.log("regen", "Multi-file response had no files, retrying", "warn")
            retry_prompt = (
                prompt + "\n\nYour response could not be parsed. "
                "Output ONLY a JSON object starting with { and ending with }. "
                "The 'files' array must contain objects with 'path' and 'content' keys."
            )
            response = self.llm.query(retry_prompt, system=MULTI_FILE_SYSTEM, temperature=0.1)
            parsed = self._parse_multi_response(response)

        written = 0
        # "or []" guards against an explicit JSON null for the key.
        for finfo in parsed.get("files") or []:
            if not isinstance(finfo, dict):
                continue
            path = finfo.get("path", "")
            content = finfo.get("content", "")
            if path and content:
                self.sandbox.safe_write(path, content)
                self.logger.log("file_written", f"{path} ({len(content)} chars)")
                result["output"] += f"Created {path}\n"
                written += 1
            else:
                self.logger.log(
                    "file_skipped",
                    f"{path or '<no path>'}: missing path or empty content",
                    "warn",
                )

        commands = parsed.get("commands") or []
        if isinstance(commands, str):
            # A bare string would otherwise be iterated character by character.
            commands = [commands]
        for cmd in commands:
            r = self._run_command(cmd)
            result["output"] += r["stdout"]
            if r["returncode"] != 0:
                result["errors"].append(r["stderr"])
                result["success"] = False

        # Do not report success for a step that produced nothing.
        if not written and (files or not commands):
            result["errors"].append("LLM response contained no files to write")
            result["success"] = False

        return result

    def _implement_freeform(self, step: dict, plan: dict) -> dict:
        """Handle a step that lists no files: multi-file generation with an empty list."""
        return self._implement_multi([], step, plan)

    def _do_test(self, step: dict, plan: dict) -> dict:
        """Run the step's test commands, asking the LLM for them if none are given.

        Every command is run even after a failure; each non-zero exit adds an
        error entry and marks the step as failed.
        """
        result = {"success": True, "output": "", "errors": []}
        commands = step.get("commands", [])
        if not commands:
            prompt = (
                f"Project: {plan.get('language', 'unknown')} project.\n"
                f"Step: {step.get('description', '')}\n"
                f"Files in project: {json.dumps(plan.get('structure', []))}\n\n"
                "What exact shell commands should I run to compile and test this? "
                "Output ONLY the commands, one per line. No explanations. No markdown."
            )
            response = self.llm.query(
                prompt,
                system="You are a build engineer. Output only shell commands, one per line.",
                temperature=0.1,
            )
            commands = self._parse_command_lines(response)

        for cmd in commands:
            r = self._run_command(cmd)
            result["output"] += r["stdout"]
            if r["returncode"] != 0:
                result["errors"].append(f"Command '{cmd}' failed:\n{r['stderr']}")
                result["success"] = False

        return result

    @staticmethod
    def _parse_command_lines(response: str) -> list[str]:
        """Turn an LLM answer into a list of shell commands, one per line.

        Blank lines, ``#`` comment lines and markdown fence lines are dropped;
        a leading ``$ `` prompt marker is removed so the command itself runs.
        """
        commands = []
        for raw in response.strip().splitlines():
            line = raw.strip()
            if not line or line.startswith(("#", "```")):
                continue
            if line.startswith("$ "):
                line = line[2:].lstrip()
            commands.append(line)
        return commands

    def _do_debug(self, step: dict, plan: dict) -> dict:
        """Return a successful placeholder result; this method does no work itself."""
        return {"success": True, "output": "Debug step (handled by debugger)", "errors": []}

    def _build_code_prompt(self, step: dict, plan: dict, filepath: str = None) -> str:
        """Assemble the generation prompt from plan metadata, the task and existing files.

        When ``filepath`` is given, an explicit request for that file's
        complete content is added. Existing project files selected by
        ``_get_relevant_files`` are appended as reference material.
        """
        parts = [
            f"Project: {plan.get('project_name', 'unknown')}",
            f"Language: {plan.get('language', 'unknown')}",
            f"Summary: {plan.get('summary', '')}",
            f"Project structure: {json.dumps(plan.get('structure', []))}",
            f"\nCurrent task: {step.get('description', '')}",
        ]
        if filepath:
            parts.append(f"\nGenerate the COMPLETE content for file: {filepath}")

        # Selectively include existing files that are relevant
        existing = self._get_relevant_files(step, plan)
        if existing:
            parts.append("\n## Existing project files (for reference — ensure compatibility):")
            parts.extend(f"\n### {p}\n{c}" for p, c in existing.items())

        return "\n".join(parts)

    def _get_relevant_files(self, step: dict, plan: dict) -> dict[str, str]:
        """Include only files relevant to the current step, within token budget.

        Candidates are the readable files from ``plan["structure"]`` that are
        not themselves targets of this step. They are ranked (build files
        first, then files sharing a directory or a name stem with a step
        file, then everything else) and added until one third of
        ``config.MAX_CONTEXT_TOKENS`` is used. A large file that does not fit
        is truncated to what is left of the budget (at most half the budget);
        a small file that does not fit is skipped.
        """
        budget = config.MAX_CONTEXT_TOKENS // 3  # Reserve 1/3 of context for existing files
        step_files = set(step.get("files", []))
        step_dirs = {os.path.dirname(sf) for sf in step_files}

        candidates = []
        for path in plan.get("structure", []):
            if path in step_files:
                continue  # Don't include the file we're about to generate
            full = os.path.join(self.workdir, path)
            if not os.path.isfile(full):
                continue
            try:
                with open(full, "r") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError):
                continue  # Unreadable or binary: not useful as context

            basename = os.path.basename(path)
            stem = basename.split(".")[0]
            if basename in self._BUILD_FILES:
                score = 3
            elif os.path.dirname(path) in step_dirs:
                score = 2
            # Name heuristic: the file's stem appears in a step file's path.
            # Dotfiles have an empty stem, which would match everything.
            elif stem and any(stem in sf for sf in step_files):
                score = 2
            else:
                score = 1
            candidates.append((score, path, content))

        # list.sort is stable, so equally-scored files keep plan order.
        candidates.sort(key=lambda item: item[0], reverse=True)

        files = {}
        used = 0
        for _score, path, content in candidates:
            tokens = estimate_tokens(content)
            if used + tokens > budget:
                remaining = budget - used
                if tokens <= budget // 2 or remaining <= 0:
                    continue
                # Truncate large files. TOKEN_CHAR_RATIO is used as characters
                # per token, as in the original slice computation.
                keep_chars = int(min(
                    budget * config.TOKEN_CHAR_RATIO // 2,
                    remaining * config.TOKEN_CHAR_RATIO,
                ))
                if keep_chars <= 0:
                    continue
                content = content[:keep_chars] + self._TRUNCATION_MARKER
                tokens = estimate_tokens(content)
            files[path] = content
            used += tokens

        return files

    def _run_command(self, cmd: str) -> dict:
        """Validate ``cmd`` with the sandbox, run it in ``workdir`` and capture its output.

        Returns ``{"returncode", "stdout", "stderr"}``. A timeout (after
        ``config.COMPILE_TIMEOUT`` seconds) is reported as return code -1.
        Whatever ``sandbox.validate_command`` raises propagates to the caller.
        """
        self.sandbox.validate_command(cmd)
        self.logger.log("exec", cmd)
        try:
            # shell=True is needed because plan commands are shell strings;
            # the sandbox validation above is the safety gate.
            proc = subprocess.run(
                cmd, shell=True, capture_output=True, text=True,
                timeout=config.COMPILE_TIMEOUT, cwd=self.workdir,
            )
        except subprocess.TimeoutExpired:
            self.logger.log("timeout", f"Command timed out ({config.COMPILE_TIMEOUT}s): {cmd}", "error")
            return {"returncode": -1, "stdout": "", "stderr": f"Timeout after {config.COMPILE_TIMEOUT}s"}

        if proc.stdout:
            self.logger.log("stdout", proc.stdout[:500])
        if proc.returncode != 0 and proc.stderr:
            self.logger.log("stderr", proc.stderr[:500], "error")
        return {
            "returncode": proc.returncode,
            "stdout": proc.stdout,
            "stderr": proc.stderr,
        }

    def _strip_fences(self, text: str) -> str:
        """Remove a leading markdown fence line and a trailing fence, then trim."""
        text = text.strip()
        if text.startswith("```"):
            first_nl = text.find("\n")
            if first_nl >= 0:
                # Drop the whole opening line, including any language tag.
                text = text[first_nl + 1:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    def _looks_like_explanation(self, text: str) -> bool:
        """Detect if LLM returned prose instead of code.

        Empty text counts as an explanation. Otherwise the first three lines
        are searched, case-insensitively, for typical chat phrases.
        """
        lines = text.strip().splitlines()[:3]
        if not lines:
            return True
        opening = " ".join(lines).lower()
        return any(phrase in opening for phrase in self._PROSE_INDICATORS)

    def _parse_multi_response(self, response: str) -> dict:
        """Parse a multi-file response into a dict that has a ``files`` key.

        Tries each extraction candidate in turn and returns the first one
        that decodes to a JSON object containing ``files``. If none does, the
        failure is logged and an empty ``files``/``commands`` dict is returned.
        """
        text = self._strip_fences(response)
        # Try progressively more aggressive extraction
        for candidate in self._extract_json_candidates(text):
            try:
                result = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(result, dict) and "files" in result:
                return result
        self.logger.log("parse_fail", "Could not parse multi-file response", "error")
        return {"files": [], "commands": []}

    def _extract_json_candidates(self, text: str) -> list[str]:
        """Return substrings of ``text`` that may be the JSON object, best guess first.

        Order: the brace-balanced span starting at the first ``{`` (if it
        closes), the body of a ```` ```json ```` fence (if present), then the
        whole text.
        """
        candidates = [text]
        if "```json" in text:
            candidates.insert(0, text.split("```json", 1)[1].split("```", 1)[0].strip())
        start = text.find("{")
        if start >= 0:
            end = self._find_matching_brace(text, start)
            if end >= 0:
                candidates.insert(0, text[start:end + 1])
        return candidates

    @staticmethod
    def _find_matching_brace(text: str, start: int) -> int:
        """Return the index of the ``}`` closing the ``{`` at ``start``, or -1.

        Braces inside double-quoted JSON strings are ignored (honouring
        backslash escapes), because generated file contents routinely contain
        braces that must not affect the nesting depth.
        """
        depth = 0
        in_string = False
        escaped = False
        for index, ch in enumerate(text[start:], start):
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"':
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return index
        return -1
|