"""
AutoDev - Executor
Generates code, writes files, runs compilation and shell commands.
Uses relevance-based context selection and expert-level prompting.
"""

import subprocess
import os
import json

from .llm import LLM
from .logger import Logger
from .context import ContextManager, estimate_tokens
from .sandbox import Sandbox, SandboxViolation
from .dependency import DependencyTracker
from . import config

# System prompt used when a single file is generated as raw text.
CODE_GEN_SYSTEM = config.EXPERT_IDENTITY + """

You are now in CODE GENERATION mode. Generate complete, production-quality code.

Rules:
- Output ONLY the file content. No markdown fences. No explanations before or after.
- The code must be COMPLETE. No "// TODO", no "// ... rest of code", no placeholders.
- Include all necessary imports/includes at the top.
- Include proper error handling.
- Add concise comments explaining non-obvious logic.
- If this is a header file, include proper include guards.
- If this is a build file (Makefile, CMakeLists.txt, etc.), make it complete and correct.
"""

# System prompt used when several files are generated as one JSON document.
MULTI_FILE_SYSTEM = config.EXPERT_IDENTITY + """

You are now in MULTI-FILE GENERATION mode. Generate multiple complete source files.

Output ONLY valid JSON with this structure:
{
    "files": [
        {"path": "relative/path/to/file", "content": "complete file content"}
    ],
    "commands": ["optional shell commands to run after writing files"]
}

Rules:
- Every file must be COMPLETE. No placeholders, no stubs.
- All imports/includes must reference files that exist or will be created.
- Output ONLY the JSON object. Start with { and end with }.
"""


class Executor:
    """Executes plan steps: asks the LLM for code, writes files, runs commands.

    File writes and directory creation go through the sandbox object, and
    every shell command is passed to ``sandbox.validate_command`` before it
    is run.
    """

    def __init__(self, llm: LLM, logger: Logger, ctx: ContextManager,
                 sandbox: Sandbox, deps: DependencyTracker, workdir: str):
        """Store the collaborators and the working directory for later use."""
        self.llm = llm
        self.logger = logger
        self.ctx = ctx
        self.sandbox = sandbox
        self.deps = deps
        self.workdir = workdir

    def execute_step(self, step: dict, plan: dict) -> dict:
        """Execute a single plan step.

        Dispatches on ``step["phase"]`` (default ``"implement"``); unknown
        phases are treated like ``"implement"``. Exceptions raised while the
        step runs are caught and reported in the result instead of
        propagating.

        Returns:
            A dict with keys ``success`` (bool), ``output`` (str) and
            ``errors`` (list of str).
        """
        phase = step.get("phase", "implement")
        desc = step.get("description", "")
        # A plan may carry an explicit null; treat it like "no commands".
        commands = step.get("commands") or []

        self.logger.log("step_start", f"[{phase}] {desc}")
        result = {"success": True, "output": "", "errors": []}

        try:
            if phase == "setup":
                result = self._do_setup(step, plan)
            elif phase in ("implement", "finalize"):
                result = self._do_implement(step, plan)
            elif phase == "test":
                result = self._do_test(step, plan)
            elif phase == "debug":
                result = self._do_debug(step, plan)
            else:
                result = self._do_implement(step, plan)

            # Run any explicit commands from the plan.
            # Skip for phases that handle their own commands or generate
            # files via LLM.
            if phase not in ("setup", "test", "implement"):
                for cmd in commands:
                    cmd_result = self._run_command(cmd)
                    if cmd_result["returncode"] != 0:
                        result["errors"].append(
                            f"Command '{cmd}' failed (exit {cmd_result['returncode']}):\n"
                            f"{cmd_result['stderr']}"
                        )
                        result["success"] = False
                    result["output"] += cmd_result["stdout"]

            # Acceptance criteria are only logged here, not evaluated.
            acceptance = step.get("acceptance", "")
            if acceptance and result["success"]:
                self.logger.log("acceptance_check", acceptance)

        except SandboxViolation as e:
            result["success"] = False
            result["errors"].append(f"Sandbox violation: {e}")
            self.logger.log("sandbox_violation", str(e), "error")
        except Exception as e:
            # Top-level boundary for a step: report the failure to the caller
            # through the result dict rather than crashing the run.
            result["success"] = False
            result["errors"].append(str(e))
            self.logger.log("step_error", str(e), "error")

        status = "ok" if result["success"] else "error"
        self.logger.log("step_done", f"[{phase}] success={result['success']}", status)
        return result

    def _do_setup(self, step: dict, plan: dict) -> dict:
        """Create the planned directory layout, register dependencies, run setup commands.

        Setup is best-effort: ``success`` is never set to False here. Stderr
        of a failed command is recorded in ``errors`` unless
        ``_is_benign_error`` classifies it as harmless.
        """
        result = {"success": True, "output": "", "errors": []}

        for path in plan.get("structure", []):
            # If it looks like a file (has extension), ensure parent dir exists
            # If it looks like a directory (no extension), create it
            # But never mkdir over an existing file
            full = os.path.join(self.workdir, path)
            if "." in os.path.basename(path):
                parent = os.path.dirname(path)
                if parent:
                    self.sandbox.safe_mkdir(parent)
            else:
                if os.path.isfile(full):
                    self.logger.log("setup_skip", f"{path} exists as file, not creating dir", "warn")
                else:
                    self.sandbox.safe_mkdir(path)
                    self.logger.log("mkdir", path)

        for dep in plan.get("dependencies", []):
            self.deps.add(dep)

        # Setup commands are best-effort — non-zero exit is a warning, not failure
        for cmd in step.get("commands") or []:
            # Auto-fix common issues: add -p to mkdir
            cmd = self._fixup_setup_command(cmd)
            r = self._run_command(cmd)
            result["output"] += r["stdout"]
            if r["returncode"] != 0:
                self.logger.log("setup_warn", r["stderr"][:200], "warn")
                # Record only real errors, not "already exists" and the like
                if not self._is_benign_error(r["stderr"]):
                    result["errors"].append(r["stderr"])

        return result

    @staticmethod
    def _fixup_setup_command(cmd: str) -> str:
        """Auto-fix common setup command issues.

        A command containing ``mkdir `` but no `` -p`` is returned stripped,
        with every ``mkdir `` replaced by ``mkdir -p ``. Any other command is
        returned unchanged.
        """
        stripped = cmd.strip()
        # Any mkdir without -p → add -p
        if "mkdir " in stripped and " -p" not in stripped:
            return stripped.replace("mkdir ", "mkdir -p ")
        return cmd

    @staticmethod
    def _is_benign_error(stderr: str) -> bool:
        """Check if an error is harmless (e.g., 'already exists')."""
        benign = ["File exists", "already exists", "No such file or directory"]
        return any(b in stderr for b in benign)

    def _do_implement(self, step: dict, plan: dict) -> dict:
        """Route an implement step by the number of files it lists.

        No files → freeform generation; one file → single-file generation;
        otherwise multi-file generation.
        """
        files = step.get("files", [])
        if not files:
            return self._implement_freeform(step, plan)

        if len(files) == 1:
            return self._implement_single(files[0], step, plan)

        return self._implement_multi(files, step, plan)

    def _implement_single(self, filepath: str, step: dict, plan: dict) -> dict:
        """Generate one file as raw text and write it through the sandbox.

        If the first response looks like prose, the LLM is queried once more
        with a stricter prompt. If the final response is empty the step fails
        and nothing is written.
        """
        result = {"success": True, "output": "", "errors": []}
        prompt = self._build_code_prompt(step, plan, filepath)

        # Use focused context to avoid blowing token limits
        self.ctx.add("user", prompt, priority=7)
        code = self.llm.query(prompt, system=CODE_GEN_SYSTEM)
        code = self._strip_fences(code)

        # Validate we got actual code, not an explanation
        if self._looks_like_explanation(code):
            self.logger.log("regen", f"LLM returned explanation instead of code for {filepath}, retrying", "warn")
            retry_prompt = (
                prompt + "\n\nYou returned an explanation instead of code. "
                "Output ONLY the raw file content. No markdown. No explanations. "
                "Start with the first line of the actual source code."
            )
            code = self.llm.query(retry_prompt, system=CODE_GEN_SYSTEM, temperature=0.1)
            code = self._strip_fences(code)

        if not code:
            # Writing an empty file would clobber any existing content and
            # misreport the step as successful.
            message = f"LLM returned no content for {filepath}"
            self.logger.log("gen_empty", message, "error")
            result["success"] = False
            result["errors"].append(message)
            return result

        self.ctx.add("assistant", f"Generated {filepath} ({len(code)} chars)", priority=5)
        self.sandbox.safe_write(filepath, code)
        self.logger.log("file_written", f"{filepath} ({len(code)} chars)")
        result["output"] = f"Created {filepath}"

        return result

    def _implement_multi(self, files: list, step: dict, plan: dict) -> dict:
        """Generate several files from one JSON response and write them.

        The response is parsed with ``_parse_multi_response``; when it has no
        files, the LLM is queried once more. Entries without a non-empty
        string ``path`` and ``content`` are skipped. Commands listed in the
        response are run afterwards; a non-zero exit fails the step. When
        specific ``files`` were requested but none were written, the step
        fails.
        """
        result = {"success": True, "output": "", "errors": []}
        prompt = self._build_code_prompt(step, plan)
        prompt += f"\n\nGenerate these files: {json.dumps(files)}"

        self.ctx.add("user", prompt, priority=7)
        response = self.llm.query(prompt, system=MULTI_FILE_SYSTEM)
        self.ctx.add("assistant", f"Generated {len(files)} files", priority=5)

        parsed = self._parse_multi_response(response)
        if not parsed.get("files"):
            # Retry
            self.logger.log("regen", "Multi-file response had no files, retrying", "warn")
            retry_prompt = (
                prompt + "\n\nYour response could not be parsed. "
                "Output ONLY a JSON object starting with { and ending with }. "
                "The 'files' array must contain objects with 'path' and 'content' keys."
            )
            response = self.llm.query(retry_prompt, system=MULTI_FILE_SYSTEM, temperature=0.1)
            parsed = self._parse_multi_response(response)

        # The JSON comes from the LLM, so its shape is not guaranteed.
        file_entries = parsed.get("files")
        if not isinstance(file_entries, list):
            file_entries = []

        written = 0
        for finfo in file_entries:
            if not isinstance(finfo, dict):
                continue
            path = finfo.get("path", "")
            content = finfo.get("content", "")
            if not (isinstance(path, str) and isinstance(content, str)):
                continue
            if path and content:
                self.sandbox.safe_write(path, content)
                self.logger.log("file_written", f"{path} ({len(content)} chars)")
                result["output"] += f"Created {path}\n"
                written += 1

        if files and not written:
            # Files were explicitly requested but nothing usable came back.
            result["success"] = False
            result["errors"].append(
                f"No files were generated (requested: {json.dumps(files)})"
            )

        commands = parsed.get("commands") or []
        if isinstance(commands, str):
            # A bare string would otherwise be iterated character by character.
            commands = [commands]
        elif not isinstance(commands, list):
            commands = []

        for cmd in commands:
            if not isinstance(cmd, str) or not cmd.strip():
                continue
            r = self._run_command(cmd)
            result["output"] += r["stdout"]
            if r["returncode"] != 0:
                result["errors"].append(r["stderr"])
                result["success"] = False

        return result

    def _implement_freeform(self, step: dict, plan: dict) -> dict:
        """Handle a step with no file list by running multi-file generation with an empty list."""
        return self._implement_multi([], step, plan)

    def _do_test(self, step: dict, plan: dict) -> dict:
        """Run the step's test commands, asking the LLM for them if none are given.

        Every command is run even after a failure; any non-zero exit marks
        the step as failed.
        """
        result = {"success": True, "output": "", "errors": []}

        commands = step.get("commands") or []
        if not commands:
            prompt = (
                f"Project: {plan.get('language', 'unknown')} project.\n"
                f"Step: {step.get('description', '')}\n"
                f"Files in project: {json.dumps(plan.get('structure', []))}\n\n"
                "What exact shell commands should I run to compile and test this? "
                "Output ONLY the commands, one per line. No explanations. No markdown."
            )
            response = self.llm.query(
                prompt,
                system="You are a build engineer. Output only shell commands, one per line.",
                temperature=0.1,
            )
            # Drop blank lines, comment lines and stray markdown fences.
            stripped_lines = (line.strip() for line in response.strip().splitlines())
            commands = [
                line for line in stripped_lines
                if line and not line.startswith(("#", "```"))
            ]

        for cmd in commands:
            r = self._run_command(cmd)
            result["output"] += r["stdout"]
            if r["returncode"] != 0:
                result["errors"].append(f"Command '{cmd}' failed:\n{r['stderr']}")
                result["success"] = False

        return result

    def _do_debug(self, step: dict, plan: dict) -> dict:
        """Return a successful placeholder result; no work is done here."""
        return {"success": True, "output": "Debug step (handled by debugger)", "errors": []}

    def _build_code_prompt(self, step: dict, plan: dict, filepath: str = None) -> str:
        """Assemble the user prompt for code generation.

        Includes project metadata from ``plan``, the step description, an
        optional target ``filepath``, and the contents of existing files
        chosen by ``_get_relevant_files``.
        """
        parts = [
            f"Project: {plan.get('project_name', 'unknown')}",
            f"Language: {plan.get('language', 'unknown')}",
            f"Summary: {plan.get('summary', '')}",
            f"Project structure: {json.dumps(plan.get('structure', []))}",
            f"\nCurrent task: {step.get('description', '')}",
        ]

        if filepath:
            parts.append(f"\nGenerate the COMPLETE content for file: {filepath}")

        # Selectively include existing files that are relevant
        existing = self._get_relevant_files(step, plan)
        if existing:
            parts.append("\n## Existing project files (for reference — ensure compatibility):")
            for p, c in existing.items():
                parts.append(f"\n### {p}\n{c}")

        return "\n".join(parts)

    def _get_relevant_files(self, step: dict, plan: dict) -> dict[str, str]:
        """Include only files relevant to the current step, within token budget.

        Candidates are the readable files listed in ``plan["structure"]``,
        excluding the files this step is about to generate. They are ranked
        (build files first, then files related to the step's files by
        directory or name, then everything else) and added until one third
        of ``config.MAX_CONTEXT_TOKENS`` is used. A file that does not fit is
        skipped if small, or truncated if large; a truncated file that still
        does not fit is skipped as well.

        Returns:
            Mapping of plan-relative path to (possibly truncated) content.
        """
        files = {}
        budget = config.MAX_CONTEXT_TOKENS // 3  # Reserve 1/3 of context for existing files

        step_files = set(step.get("files", []))

        build_files = {"Makefile", "CMakeLists.txt", "setup.py", "pyproject.toml",
                       "Cargo.toml", "go.mod", "package.json", "pom.xml", "build.gradle"}

        candidates = []
        for path in plan.get("structure", []):
            if path in step_files:
                # Don't include the file we're about to generate
                continue

            full = os.path.join(self.workdir, path)
            if not os.path.isfile(full):
                continue
            try:
                # Explicit encoding: the platform default is not reliable.
                with open(full, "r", encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError):
                continue

            # Score relevance
            basename = os.path.basename(path)
            # Empty for dotfiles such as ".gitignore"; an empty stem would
            # match every step file, so it must be excluded below.
            stem = basename.split(".")[0]
            if basename in build_files:
                score = 3
            elif any(os.path.dirname(path) == os.path.dirname(sf) for sf in step_files):
                score = 2
            # The file's name stem appears in the path of a step file
            elif stem and any(stem in sf for sf in step_files):
                score = 2
            else:
                score = 1

            candidates.append((score, path, content))

        # Stable sort: files with equal scores keep their plan order.
        candidates.sort(key=lambda x: x[0], reverse=True)

        truncated_chars = int(budget * config.TOKEN_CHAR_RATIO // 2)
        used = 0
        for _score, path, content in candidates:
            tokens = estimate_tokens(content)
            if used + tokens > budget:
                if tokens <= budget // 2:
                    # Small file that simply does not fit any more.
                    continue
                # Truncate large files
                content = content[:truncated_chars]
                content += "\n// ... (truncated for context)\n"
                tokens = estimate_tokens(content)
                if used + tokens > budget:
                    # Even the truncated excerpt would overrun the budget.
                    continue
            files[path] = content
            used += tokens

        return files

    def _run_command(self, cmd: str) -> dict:
        """Validate ``cmd`` with the sandbox, then run it in a shell inside ``workdir``.

        Returns:
            A dict with ``returncode``, ``stdout`` and ``stderr``. On timeout
            (``config.COMPILE_TIMEOUT`` seconds) the return code is -1 and
            ``stderr`` holds a timeout message.
        """
        # NOTE: shell=True executes the string as-is; validate_command is the
        # only guard against unwanted commands.
        self.sandbox.validate_command(cmd)
        self.logger.log("exec", cmd)

        try:
            proc = subprocess.run(
                cmd, shell=True, capture_output=True, text=True,
                timeout=config.COMPILE_TIMEOUT, cwd=self.workdir,
            )
            if proc.stdout:
                self.logger.log("stdout", proc.stdout[:500])
            if proc.returncode != 0 and proc.stderr:
                self.logger.log("stderr", proc.stderr[:500], "error")
            return {
                "returncode": proc.returncode,
                "stdout": proc.stdout,
                "stderr": proc.stderr,
            }
        except subprocess.TimeoutExpired:
            self.logger.log("timeout", f"Command timed out ({config.COMPILE_TIMEOUT}s): {cmd}", "error")
            return {"returncode": -1, "stdout": "", "stderr": f"Timeout after {config.COMPILE_TIMEOUT}s"}

    def _strip_fences(self, text: str) -> str:
        """Remove a leading and/or trailing markdown code fence and surrounding whitespace."""
        text = text.strip()
        if text.startswith("```"):
            # Drop the whole opening fence line, including any language tag.
            first_nl = text.find("\n")
            if first_nl >= 0:
                text = text[first_nl + 1:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    def _looks_like_explanation(self, text: str) -> bool:
        """Detect if LLM returned prose instead of code.

        Empty text counts as an explanation. Otherwise the first three lines
        are searched, case-insensitively, for common prose phrases.
        """
        lines = text.strip().splitlines()[:3]
        if not lines:
            return True
        prose_indicators = ["here is", "here's", "below is", "i'll", "let me",
                            "this code", "the following", "sure,", "certainly"]
        first_lines = " ".join(lines).lower()
        return any(ind in first_lines for ind in prose_indicators)

    def _parse_multi_response(self, response: str) -> dict:
        """Parse a multi-file JSON response.

        Returns the first candidate that decodes to a dict containing a
        ``"files"`` key, or ``{"files": [], "commands": []}`` if none does.
        """
        text = self._strip_fences(response)

        # Try progressively more aggressive extraction
        for candidate in self._extract_json_candidates(text):
            try:
                result = json.loads(candidate)
                if isinstance(result, dict) and "files" in result:
                    return result
            except json.JSONDecodeError:
                continue

        self.logger.log("parse_fail", "Could not parse multi-file response", "error")
        return {"files": [], "commands": []}

    def _extract_json_candidates(self, text: str) -> list[str]:
        """List substrings of ``text`` that may be the JSON object, best guess first.

        Order: the first brace-balanced ``{...}`` span (if one closes), the
        body of a ```` ```json ```` fence (if present), then the whole text.
        """
        candidates = [text]

        if "```json" in text:
            candidates.insert(0, text.split("```json", 1)[1].split("```", 1)[0].strip())

        start = text.find("{")
        if start >= 0:
            depth = 0
            in_string = False
            escaped = False
            for i in range(start, len(text)):
                ch = text[i]
                if in_string:
                    # Braces inside JSON strings (e.g. generated source code)
                    # must not affect the nesting depth.
                    if escaped:
                        escaped = False
                    elif ch == "\\":
                        escaped = True
                    elif ch == '"':
                        in_string = False
                    continue
                if ch == '"':
                    in_string = True
                elif ch == "{":
                    depth += 1
                elif ch == "}":
                    depth -= 1
                    if depth == 0:
                        candidates.insert(0, text[start:i + 1])
                        break

        return candidates