Files

418 lines
17 KiB
Python

"""
AutoDev - Executor
Generates code, writes files, runs compilation and shell commands.
Uses relevance-based context selection and expert-level prompting.
"""
import subprocess
import os
import json
from .llm import LLM
from .logger import Logger
from .context import ContextManager, estimate_tokens
from .sandbox import Sandbox, SandboxViolation
from .dependency import DependencyTracker
from . import config
# System prompt for single-file code generation: config.EXPERT_IDENTITY
# followed by the output rules below.  Executor._implement_single passes it
# as `system=` to `llm.query`, then strips markdown fences from the reply and
# retries once if the reply looks like prose rather than file content.
CODE_GEN_SYSTEM = config.EXPERT_IDENTITY + """
You are now in CODE GENERATION mode. Generate complete, production-quality code.
Rules:
- Output ONLY the file content. No markdown fences. No explanations before or after.
- The code must be COMPLETE. No "// TODO", no "// ... rest of code", no placeholders.
- Include all necessary imports/includes at the top.
- Include proper error handling.
- Add concise comments explaining non-obvious logic.
- If this is a header file, include proper include guards.
- If this is a build file (Makefile, CMakeLists.txt, etc.), make it complete and correct.
"""
# System prompt for multi-file generation: config.EXPERT_IDENTITY followed by
# the JSON contract below.  Executor._implement_multi passes it as `system=`
# to `llm.query`; Executor._parse_multi_response only accepts a JSON object
# that has a "files" key, and _implement_multi then reads each entry's
# "path" / "content" and runs the optional "commands".
MULTI_FILE_SYSTEM = config.EXPERT_IDENTITY + """
You are now in MULTI-FILE GENERATION mode. Generate multiple complete source files.
Output ONLY valid JSON with this structure:
{
"files": [
{"path": "relative/path/to/file", "content": "complete file content"}
],
"commands": ["optional shell commands to run after writing files"]
}
Rules:
- Every file must be COMPLETE. No placeholders, no stubs.
- All imports/includes must reference files that exist or will be created.
- Output ONLY the JSON object. Start with { and end with }.
"""
class Executor:
    """Carry out plan steps: generate code with the LLM, write files, run commands.

    A step is dispatched on its ``phase`` ("setup", "implement", "finalize",
    "test", "debug"; anything else is treated as "implement").  Files are
    written through ``sandbox.safe_write`` and every shell command is passed
    to ``sandbox.validate_command`` before it is run in ``workdir``.

    Every ``_do_*`` / ``_implement_*`` method returns the same result shape:
    ``{"success": bool, "output": str, "errors": list[str]}``.
    """

    # Appended to a file's content when it is cut down to fit the context budget.
    _TRUNCATION_MARKER = "\n// ... (truncated for context)\n"

    # Basenames ranked highest when choosing existing files to show the LLM.
    _BUILD_FILES = frozenset({
        "Makefile", "CMakeLists.txt", "setup.py", "pyproject.toml",
        "Cargo.toml", "go.mod", "package.json", "pom.xml", "build.gradle",
    })

    # stderr fragments that mark a failed setup command as harmless.
    _BENIGN_ERRORS = ("File exists", "already exists", "No such file or directory")

    # Lower-case phrases that suggest the LLM answered with prose, not code.
    _PROSE_INDICATORS = (
        "here is", "here's", "below is", "i'll", "let me", "this code",
        "the following", "sure,", "certainly",
    )

    def __init__(self, llm: LLM, logger: Logger, ctx: ContextManager,
                 sandbox: Sandbox, deps: DependencyTracker, workdir: str):
        """Store the collaborators used by the step handlers.

        Args:
            llm: Queried via ``llm.query(prompt, system=..., temperature=...)``.
            logger: Receives ``logger.log(event, message[, level])`` calls.
            ctx: Conversation context; prompts and summaries are ``add``-ed to it.
            sandbox: Used for ``safe_mkdir``, ``safe_write`` and ``validate_command``.
            deps: Plan dependencies are registered with ``deps.add``.
            workdir: Working directory for commands and base for project paths.
        """
        self.llm = llm
        self.logger = logger
        self.ctx = ctx
        self.sandbox = sandbox
        self.deps = deps
        self.workdir = workdir

    def execute_step(self, step: dict, plan: dict) -> dict:
        """Execute a single plan step. Returns {success, output, errors}.

        Exceptions raised by the phase handler (including SandboxViolation)
        are caught, logged and reported in ``errors`` with ``success`` False;
        this method itself does not raise for handler failures.
        """
        phase = step.get("phase", "implement")
        desc = step.get("description", "")
        # A step may carry "commands": null; treat that like an empty list.
        commands = step.get("commands") or []
        self.logger.log("step_start", f"[{phase}] {desc}")
        result = {"success": True, "output": "", "errors": []}
        try:
            if phase == "setup":
                result = self._do_setup(step, plan)
            elif phase == "test":
                result = self._do_test(step, plan)
            elif phase == "debug":
                result = self._do_debug(step, plan)
            else:
                # "implement", "finalize" and any unknown phase generate code.
                result = self._do_implement(step, plan)

            # Run any explicit commands from the plan.
            # Skip for phases that handle their own commands or generate files via LLM.
            if phase not in ("setup", "test", "implement"):
                for cmd in commands:
                    cmd_result = self._run_command(cmd)
                    if cmd_result["returncode"] != 0:
                        result["errors"].append(
                            f"Command '{cmd}' failed (exit {cmd_result['returncode']}):\n"
                            f"{cmd_result['stderr']}"
                        )
                        result["success"] = False
                    result["output"] += cmd_result["stdout"]

            # Acceptance criteria are only logged here, not evaluated.
            acceptance = step.get("acceptance", "")
            if acceptance and result["success"]:
                self.logger.log("acceptance_check", acceptance)
        except SandboxViolation as e:
            result["success"] = False
            result["errors"].append(f"Sandbox violation: {e}")
            self.logger.log("sandbox_violation", str(e), "error")
        except Exception as e:
            # Top-level boundary for a step: report the failure instead of crashing the run.
            result["success"] = False
            result["errors"].append(str(e))
            self.logger.log("step_error", str(e), "error")
        status = "ok" if result["success"] else "error"
        self.logger.log("step_done", f"[{phase}] success={result['success']}", status)
        return result

    def _do_setup(self, step: dict, plan: dict) -> dict:
        """Create the planned directory layout, register dependencies, run setup commands.

        Setup commands are best-effort: a non-benign failure is recorded in
        ``errors`` but ``success`` is left True.
        """
        result = {"success": True, "output": "", "errors": []}
        for path in plan.get("structure") or []:
            # If it looks like a file (has extension), ensure parent dir exists.
            # If it looks like a directory (no extension), create it.
            # But never mkdir over an existing file.
            if "." in os.path.basename(path):
                parent = os.path.dirname(path)
                if parent:
                    self.sandbox.safe_mkdir(parent)
            elif os.path.isfile(os.path.join(self.workdir, path)):
                self.logger.log("setup_skip", f"{path} exists as file, not creating dir", "warn")
            else:
                self.sandbox.safe_mkdir(path)
                self.logger.log("mkdir", path)

        for dep in plan.get("dependencies") or []:
            self.deps.add(dep)

        for cmd in step.get("commands") or []:
            # Auto-fix a common issue: add -p to mkdir.
            r = self._run_command(self._fixup_setup_command(cmd))
            result["output"] += r["stdout"]
            if r["returncode"] != 0:
                self.logger.log("setup_warn", r["stderr"][:200], "warn")
                # Record real errors only, not "already exists"-style noise.
                if not self._is_benign_error(r["stderr"]):
                    result["errors"].append(r["stderr"])
        return result

    @staticmethod
    def _fixup_setup_command(cmd: str) -> str:
        """Auto-fix common setup command issues.

        If the command contains ``mkdir `` and no `` -p`` anywhere, every
        ``mkdir `` is rewritten to ``mkdir -p `` (on the stripped command);
        otherwise the command is returned unchanged.
        """
        stripped = cmd.strip()
        if "mkdir " in stripped and " -p" not in stripped:
            return stripped.replace("mkdir ", "mkdir -p ")
        return cmd

    @staticmethod
    def _is_benign_error(stderr: str) -> bool:
        """Check if an error is harmless (e.g., 'already exists')."""
        return any(fragment in stderr for fragment in Executor._BENIGN_ERRORS)

    def _do_implement(self, step: dict, plan: dict) -> dict:
        """Route to freeform, single-file or multi-file generation by ``step['files']``."""
        files = step.get("files", [])
        if not files:
            return self._implement_freeform(step, plan)
        if len(files) == 1:
            return self._implement_single(files[0], step, plan)
        return self._implement_multi(files, step, plan)

    def _implement_single(self, filepath: str, step: dict, plan: dict) -> dict:
        """Generate one file with the LLM and write it through the sandbox.

        If the first reply looks like prose it is regenerated once at low
        temperature; whatever the final reply is gets written.
        """
        result = {"success": True, "output": "", "errors": []}
        prompt = self._build_code_prompt(step, plan, filepath)
        self.ctx.add("user", prompt, priority=7)
        code = self._strip_fences(self.llm.query(prompt, system=CODE_GEN_SYSTEM))

        # Validate we got actual code, not an explanation.
        if self._looks_like_explanation(code):
            self.logger.log("regen", f"LLM returned explanation instead of code for {filepath}, retrying", "warn")
            retry_prompt = (
                prompt + "\n\nYou returned an explanation instead of code. "
                "Output ONLY the raw file content. No markdown. No explanations. "
                "Start with the first line of the actual source code."
            )
            code = self._strip_fences(
                self.llm.query(retry_prompt, system=CODE_GEN_SYSTEM, temperature=0.1)
            )

        # Only a short summary goes into the context, not the generated code itself.
        self.ctx.add("assistant", f"Generated {filepath} ({len(code)} chars)", priority=5)
        self.sandbox.safe_write(filepath, code)
        self.logger.log("file_written", f"{filepath} ({len(code)} chars)")
        result["output"] = f"Created {filepath}"
        return result

    def _implement_multi(self, files: list, step: dict, plan: dict) -> dict:
        """Generate several files from one JSON reply, write them, run follow-up commands.

        The reply is parsed with ``_parse_multi_response``; if it yields no
        usable file entries the request is retried once.  When ``files`` were
        explicitly requested and the reply still contains none, the step is
        reported as failed instead of silently succeeding with no output.
        """
        result = {"success": True, "output": "", "errors": []}
        prompt = self._build_code_prompt(step, plan)
        prompt += f"\n\nGenerate these files: {json.dumps(files)}"
        self.ctx.add("user", prompt, priority=7)
        response = self.llm.query(prompt, system=MULTI_FILE_SYSTEM)
        self.ctx.add("assistant", f"Generated {len(files)} files", priority=5)

        parsed = self._parse_multi_response(response)
        entries = self._file_entries(parsed)
        if not entries:
            self.logger.log("regen", "Multi-file response had no files, retrying", "warn")
            retry_prompt = (
                prompt + "\n\nYour response could not be parsed. "
                "Output ONLY a JSON object starting with { and ending with }. "
                "The 'files' array must contain objects with 'path' and 'content' keys."
            )
            response = self.llm.query(retry_prompt, system=MULTI_FILE_SYSTEM, temperature=0.1)
            parsed = self._parse_multi_response(response)
            entries = self._file_entries(parsed)

        for entry in entries:
            path = entry.get("path", "")
            content = entry.get("content", "")
            # Entries with a missing/empty or non-string path or content are skipped.
            if path and content and isinstance(path, str) and isinstance(content, str):
                self.sandbox.safe_write(path, content)
                self.logger.log("file_written", f"{path} ({len(content)} chars)")
                result["output"] += f"Created {path}\n"

        if files and not entries:
            message = f"No files were generated for: {json.dumps(files)}"
            self.logger.log("generation_failed", message, "error")
            result["errors"].append(message)
            result["success"] = False

        commands = parsed.get("commands") or []
        if isinstance(commands, str):
            # A lone string would otherwise be iterated character by character.
            commands = [commands]
        for cmd in commands:
            if not isinstance(cmd, str):
                continue
            r = self._run_command(cmd)
            result["output"] += r["stdout"]
            if r["returncode"] != 0:
                result["errors"].append(r["stderr"])
                result["success"] = False
        return result

    @staticmethod
    def _file_entries(parsed: dict) -> list:
        """Return the dict entries of ``parsed['files']``, or [] if it is not a list."""
        raw = parsed.get("files")
        if not isinstance(raw, list):
            return []
        return [entry for entry in raw if isinstance(entry, dict)]

    def _implement_freeform(self, step: dict, plan: dict) -> dict:
        """Handle a step with no file list: let the LLM choose the files."""
        return self._implement_multi([], step, plan)

    def _do_test(self, step: dict, plan: dict) -> dict:
        """Run the step's test commands, asking the LLM for commands if none are given.

        Any command with a non-zero exit code marks the step as failed; the
        remaining commands are still run.
        """
        result = {"success": True, "output": "", "errors": []}
        commands = step.get("commands", [])
        if not commands:
            prompt = (
                f"Project: {plan.get('language', 'unknown')} project.\n"
                f"Step: {step.get('description', '')}\n"
                f"Files in project: {json.dumps(plan.get('structure', []))}\n\n"
                "What exact shell commands should I run to compile and test this? "
                "Output ONLY the commands, one per line. No explanations. No markdown."
            )
            response = self.llm.query(
                prompt,
                system="You are a build engineer. Output only shell commands, one per line.",
                temperature=0.1,
            )
            # Keep non-empty lines that are neither shell comments nor markdown fences.
            commands = [
                line.strip() for line in response.strip().splitlines()
                if line.strip() and not line.strip().startswith(("#", "```"))
            ]
        for cmd in commands:
            r = self._run_command(cmd)
            result["output"] += r["stdout"]
            if r["returncode"] != 0:
                result["errors"].append(f"Command '{cmd}' failed:\n{r['stderr']}")
                result["success"] = False
        return result

    def _do_debug(self, step: dict, plan: dict) -> dict:
        """Placeholder for debug steps; always reports success without doing work."""
        return {"success": True, "output": "Debug step (handled by debugger)", "errors": []}

    def _build_code_prompt(self, step: dict, plan: dict, filepath: str = None) -> str:
        """Assemble the generation prompt: project summary, task, and relevant existing files.

        Args:
            step: Current plan step (its ``description`` becomes the task).
            plan: Whole plan (``project_name``, ``language``, ``summary``, ``structure``).
            filepath: When given, the prompt asks for the complete content of this file.
        """
        parts = [
            f"Project: {plan.get('project_name', 'unknown')}",
            f"Language: {plan.get('language', 'unknown')}",
            f"Summary: {plan.get('summary', '')}",
            f"Project structure: {json.dumps(plan.get('structure', []))}",
            f"\nCurrent task: {step.get('description', '')}",
        ]
        if filepath:
            parts.append(f"\nGenerate the COMPLETE content for file: {filepath}")
        # Selectively include existing files that are relevant.
        existing = self._get_relevant_files(step, plan)
        if existing:
            parts.append("\n## Existing project files (for reference — ensure compatibility):")
            parts.extend(f"\n### {path}\n{content}" for path, content in existing.items())
        return "\n".join(parts)

    def _get_relevant_files(self, step: dict, plan: dict) -> dict[str, str]:
        """Include only files relevant to the current step, within token budget.

        Candidates are the files in ``plan['structure']`` that exist under
        ``workdir`` and decode as UTF-8, excluding the files this step is
        about to generate.  They are ranked by ``_score_relevance`` and then
        packed into one third of ``config.MAX_CONTEXT_TOKENS``.
        """
        budget = config.MAX_CONTEXT_TOKENS // 3  # Reserve 1/3 of context for existing files
        step_files = set(step.get("files") or [])
        candidates = []
        for path in plan.get("structure") or []:
            if path in step_files:
                # Don't include (or even read) the file we're about to generate.
                continue
            full = os.path.join(self.workdir, path)
            if not os.path.isfile(full):
                continue
            try:
                with open(full, "r", encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError):
                # Unreadable or binary files are simply left out of the context.
                continue
            candidates.append((self._score_relevance(path, step_files), path, content))
        # Stable sort: files with equal score keep their plan order.
        candidates.sort(key=lambda item: item[0], reverse=True)
        return self._fit_to_budget(candidates, budget, config.TOKEN_CHAR_RATIO, estimate_tokens)

    @staticmethod
    def _score_relevance(path: str, step_files) -> int:
        """Rank an existing file for the current step: 3 build file, 2 related, 1 other.

        "Related" means the file shares a directory with a step file, or its
        basename stem (text before the first dot) occurs in a step file's path.
        """
        basename = os.path.basename(path)
        if basename in Executor._BUILD_FILES:
            return 3
        directory = os.path.dirname(path)
        if any(directory == os.path.dirname(sf) for sf in step_files):
            return 2
        # Dotfiles have an empty stem; "" is a substring of everything, so
        # they must not count as a name match.
        stem = basename.split(".")[0]
        if stem and any(stem in sf for sf in step_files):
            return 2
        return 1

    @staticmethod
    def _fit_to_budget(candidates, budget, chars_per_token, count_tokens) -> dict[str, str]:
        """Pack ranked ``(score, path, content)`` candidates into a token budget.

        Files that fit are taken whole.  A file that does not fit is dropped
        if it is small (at most half the budget); a large file is truncated to
        the remaining budget, capped at half the total budget, with the
        truncation marker appended.  The total therefore stays within
        ``budget`` as far as ``chars_per_token`` agrees with ``count_tokens``.

        Args:
            candidates: Iterable of ``(score, path, content)``, best first.
            budget: Maximum number of tokens to spend.
            chars_per_token: Converts a token count to a character count
                (assumed to be characters per token, as the original
                truncation formula implies — confirm against config).
            count_tokens: Callable mapping text to its token estimate.
        """
        marker = Executor._TRUNCATION_MARKER
        selected = {}
        used = 0
        for _score, path, content in candidates:
            tokens = count_tokens(content)
            remaining = budget - used
            if tokens > remaining:
                if tokens <= budget // 2:
                    continue
                allowance = min(remaining, budget // 2)
                keep_chars = int(allowance * chars_per_token) - len(marker)
                if keep_chars <= 0:
                    # Not enough room left for a meaningful excerpt.
                    continue
                content = content[:keep_chars] + marker
                tokens = count_tokens(content)
            selected[path] = content
            used += tokens
        return selected

    def _run_command(self, cmd: str) -> dict:
        """Validate and run a shell command in ``workdir``.

        Returns ``{"returncode", "stdout", "stderr"}``.  A timeout after
        ``config.COMPILE_TIMEOUT`` seconds is reported as returncode -1.
        Whatever ``sandbox.validate_command`` raises (presumably
        SandboxViolation) propagates to the caller.
        """
        self.sandbox.validate_command(cmd)
        self.logger.log("exec", cmd)
        try:
            # NOTE(security): shell=True executes plan/LLM-provided text in a
            # shell; sandbox.validate_command above is the only gate.
            proc = subprocess.run(
                cmd, shell=True, capture_output=True, text=True,
                timeout=config.COMPILE_TIMEOUT, cwd=self.workdir,
            )
        except subprocess.TimeoutExpired:
            self.logger.log("timeout", f"Command timed out ({config.COMPILE_TIMEOUT}s): {cmd}", "error")
            return {"returncode": -1, "stdout": "", "stderr": f"Timeout after {config.COMPILE_TIMEOUT}s"}
        if proc.stdout:
            self.logger.log("stdout", proc.stdout[:500])
        if proc.returncode != 0 and proc.stderr:
            self.logger.log("stderr", proc.stderr[:500], "error")
        return {
            "returncode": proc.returncode,
            "stdout": proc.stdout,
            "stderr": proc.stderr,
        }

    def _strip_fences(self, text: str) -> str:
        """Remove a leading ```lang line and a trailing ``` from LLM output."""
        text = text.strip()
        if text.startswith("```"):
            # Drop the whole opening fence line, including any language tag.
            first_nl = text.find("\n")
            if first_nl >= 0:
                text = text[first_nl + 1:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    def _looks_like_explanation(self, text: str) -> bool:
        """Detect if LLM returned prose instead of code.

        Empty text counts as an explanation; otherwise the first three lines
        are searched (case-insensitively) for typical chat phrases.
        """
        lines = text.strip().splitlines()
        if not lines:
            return True
        opening = " ".join(lines[:3]).lower()
        return any(phrase in opening for phrase in self._PROSE_INDICATORS)

    def _parse_multi_response(self, response: str) -> dict:
        """Parse a multi-file reply into a dict with a "files" key.

        Tries each candidate from ``_extract_json_candidates`` in order and
        returns the first one that is a JSON object containing "files";
        otherwise logs a parse failure and returns an empty result.
        """
        text = self._strip_fences(response)
        # Try progressively more aggressive extraction.
        for candidate in self._extract_json_candidates(text):
            try:
                result = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(result, dict) and "files" in result:
                return result
        self.logger.log("parse_fail", "Could not parse multi-file response", "error")
        return {"files": [], "commands": []}

    def _extract_json_candidates(self, text: str) -> list[str]:
        """List substrings of ``text`` that may be the JSON payload, best guess first.

        Order: the first balanced ``{...}`` object, the body of a ```json
        fence (if any), the whole text, then any further top-level balanced
        objects (covers prose containing braces before the real payload).
        """
        objects = []
        start = text.find("{")
        while start >= 0:
            end = self._find_matching_brace(text, start)
            if end < 0:
                break
            objects.append(text[start:end + 1])
            start = text.find("{", end + 1)

        candidates = objects[:1]
        if "```json" in text:
            candidates.append(text.split("```json", 1)[1].split("```", 1)[0].strip())
        candidates.append(text)
        candidates.extend(objects[1:])
        return candidates

    @staticmethod
    def _find_matching_brace(text: str, start: int) -> int:
        """Return the index of the ``}`` closing the ``{`` at ``start``, or -1.

        Braces inside double-quoted strings (with backslash escapes) are
        ignored, so generated source code in a JSON value cannot unbalance
        the count.
        """
        depth = 0
        in_string = False
        escaped = False
        for index in range(start, len(text)):
            ch = text[index]
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return index
        return -1