Files

273 lines
11 KiB
Python

"""
AutoDev - Debugger
Analyzes errors, generates fixes, detects cycles, manages rollback.
Approaches debugging like a senior engineer — reason about root cause,
never repeat a failed approach, escalate when stuck.
"""
import json
import os
import shutil
import subprocess
from .llm import LLM
from .logger import Logger
from .context import ContextManager
from .sandbox import Sandbox
from . import config
# System prompt used for every debug-mode LLM query (passed as `system=` in
# Debugger.debug_errors). It is the shared expert identity from `config`
# followed by the debug instructions and the JSON response schema.
# The keys named in the schema ("diagnosis", "strategy", "fixes", "commands",
# and per-fix "file" / "action" / "description" / "full_content") are the ones
# Debugger.debug_errors and Debugger._apply_fix read from the parsed reply, so
# the schema text and that code must be kept in sync.
# NOTE: this literal is runtime text sent to the model; editing it changes
# behavior.
DEBUG_SYSTEM = config.EXPERT_IDENTITY + """
You are now in DEBUG mode. An error occurred during development. Your job is to:
1. DIAGNOSE: What is the root cause? Not the symptom — the actual cause.
2. REASON: Why did the previous code produce this error? What assumption was wrong?
3. FIX: Provide a concrete fix that addresses the root cause.
CRITICAL RULES:
- If you already tried a fix and it didn't work, you MUST try a DIFFERENT approach.
- Never repeat the same fix. If the same error keeps occurring, the problem is architectural.
- Consider: wrong imports, missing dependencies, wrong API usage, type mismatches, missing files.
- For compilation errors: check every include/import, every function signature, every type.
- For runtime errors: check null/nil handling, array bounds, file paths, permissions.
Output ONLY valid JSON:
{
"diagnosis": "root cause analysis — what exactly went wrong and why",
"previous_approach_wrong_because": "why the previous attempt failed (if applicable)",
"strategy": "what different approach we're taking this time",
"fixes": [
{
"file": "path/to/file",
"action": "replace",
"description": "what changed and why",
"full_content": "COMPLETE new file content"
}
],
"commands": ["verification commands to confirm the fix works"]
}
The 'full_content' field must contain the ENTIRE file, not just the changed lines.
Start your response with { and end with }.
"""
class Debugger:
    """Drive one LLM-assisted debug cycle: diagnose, apply fixes, verify, roll back.

    Each call to :meth:`debug_errors` asks the LLM for a JSON fix description,
    writes the proposed files through the sandbox, runs the verification
    commands, and restores the previous file contents when verification fails.
    Summaries of failed attempts are kept in ``failed_approaches`` and fed back
    into the next prompt so the model is told not to repeat them.
    """

    # AutoDev's own state files; a fix is never allowed to delete these.
    _PROTECTED_FILES = frozenset({
        "worklog.json", "plan.json", "dependency.txt",
        ".autodev_state.json", "description.txt",
    })

    # Characters stripped from both ends of an error-message token before it
    # is tried as a file path.
    _PATH_STRIP_CHARS = "':\"(),"

    # Collaborator types are quoted: they are only needed by type checkers,
    # not at class-definition time.
    def __init__(self, llm: "LLM", logger: "Logger", ctx: "ContextManager",
                 sandbox: "Sandbox", workdir: str):
        """Store collaborators and reset per-problem debugging state.

        Args:
            llm: Client used to query the model (``llm.query``).
            logger: Worklog sink (``logger.log(event, message[, level])``).
            ctx: Conversation context manager (cycle detection, history).
            sandbox: Guard used to validate commands/paths and write files.
            workdir: Project root; relative file paths are resolved against it.
        """
        self.llm = llm
        self.logger = logger
        self.ctx = ctx
        self.sandbox = sandbox
        self.workdir = workdir
        self.attempt_count = 0
        self.backup_dir = os.path.join(workdir, ".autodev_backups")
        self.failed_approaches: list[str] = []  # Track what we already tried

    def debug_errors(self, errors: list[str], step: dict, plan: dict) -> dict:
        """Attempt to fix ``errors`` for ``step``.

        Args:
            errors: Error messages to diagnose (may be empty).
            step: Failed plan step; ``"description"`` and ``"files"`` are read.
            plan: Overall plan; ``"language"`` and ``"summary"`` are read.

        Returns:
            ``{"fixed": bool, "output": str, "errors": list[str]}`` where
            ``output`` is the concatenated stdout of the verification commands
            and ``errors`` explains why the attempt did not succeed.
            Exceptions raised after the prompt has been built are caught,
            reported in ``errors`` and trigger a rollback.
        """
        self.attempt_count += 1
        result = {"fixed": False, "output": "", "errors": []}

        if self.attempt_count > config.MAX_DEBUG_CYCLES:
            self.logger.log("debug_halt",
                            f"Max debug cycles ({config.MAX_DEBUG_CYCLES}) reached. "
                            "This likely requires manual intervention or better documentation.",
                            "error")
            result["errors"].append(
                "Maximum debug attempts exceeded. The issue may be architectural. "
                "Please review the worklog and provide additional documentation."
            )
            return result

        # Check for cycles
        if self.ctx.detect_cycle():
            self.logger.log("cycle_detected",
                            "Detected repetitive pattern. Clearing context and trying fresh approach.",
                            "warn")
            self.ctx.clear_stale()
            self.failed_approaches.append("CYCLE: All recent approaches were too similar")

        # Backup current state. `tracked` is every file a rollback must
        # restore; it starts with the step's files and grows with the files
        # the fix names once the LLM reply is known.
        tracked = list(step.get("files") or [])
        self._backup_files(tracked)

        prompt = self._build_debug_prompt(errors, step, plan)
        self.ctx.add("user", prompt, priority=8)
        self.logger.log("debug_attempt",
                        f"Attempt {self.attempt_count}/{config.MAX_DEBUG_CYCLES}")

        try:
            response = self.llm.query(prompt, system=DEBUG_SYSTEM, temperature=0.3)
            self.ctx.add("assistant", response[:1500], priority=6)
            fix = self._parse_fix(response)

            # Tolerate a single fix object instead of a list, and drop
            # entries that are not objects at all.
            fixes = fix.get("fixes")
            if isinstance(fixes, dict):
                fixes = [fixes]
            elif not isinstance(fixes, list):
                fixes = []
            fixes = [entry for entry in fixes if isinstance(entry, dict)]

            if not fixes:
                self.logger.log("debug_no_fix", "LLM provided no actionable fixes", "warn")
                result["errors"].append("No fix suggested by LLM")
                first_error = errors[0][:100] if errors else "unknown error"
                self.failed_approaches.append(f"No fix for: {first_error}")
                return result

            diagnosis = str(fix.get("diagnosis", "unknown"))
            strategy = fix.get("strategy", "")
            self.logger.log("diagnosis", diagnosis)
            if strategy:
                self.logger.log("strategy", strategy)

            # The fix may touch files outside step["files"]; back those up
            # before writing so a failed verification can restore them too.
            extra = [p for p in self._fix_paths(fixes) if p not in tracked]
            self._backup_files(extra)
            tracked.extend(extra)

            # Apply fixes
            for entry in fixes:
                self._apply_fix(entry)

            # Verify. A bare string is one command, not a sequence of
            # one-character commands.
            commands = fix.get("commands") or []
            if isinstance(commands, str):
                commands = [commands]
            verify_ok = True
            for cmd in commands:
                if not self._run_verification(cmd, result):
                    verify_ok = False

            if not verify_ok:
                self.failed_approaches.append(
                    f"Attempt {self.attempt_count}: {diagnosis[:100]} — verification failed"
                )
                self._rollback_files(tracked)
                self.logger.log("rollback", "Fix didn't pass verification, rolled back", "warn")
                return result

            result["fixed"] = True
            self.attempt_count = 0
            self.failed_approaches.clear()
            self.logger.log("debug_fixed", diagnosis)
        except Exception as e:
            # Boundary handler: any failure in the LLM call, parsing or file
            # writes is reported to the caller and the files are restored.
            result["errors"].append(str(e))
            self.logger.log("debug_error", str(e), "error")
            self._rollback_files(tracked)
            self.failed_approaches.append(f"Exception: {str(e)[:100]}")
        return result

    def _run_verification(self, cmd, result: dict) -> bool:
        """Run one verification command and record its outcome in ``result``.

        Returns True only when the command passed sandbox validation and
        exited with status 0. Validation errors, timeouts and other
        exceptions are appended to ``result["errors"]`` and count as failure.
        """
        try:
            self.sandbox.validate_command(cmd)
            # NOTE(review): the command text comes from the LLM and is run
            # through a shell; sandbox.validate_command is the only gate.
            proc = subprocess.run(
                cmd, shell=True, capture_output=True, text=True,
                timeout=config.COMPILE_TIMEOUT, cwd=self.workdir,
            )
            result["output"] += proc.stdout
            if proc.returncode != 0:
                result["errors"].append(
                    f"Verification '{cmd}' failed:\n{proc.stderr}"
                )
                self.logger.log("fix_verify_fail", proc.stderr[:300], "error")
                return False
        except Exception as e:
            result["errors"].append(str(e))
            return False
        return True

    @staticmethod
    def _fix_paths(fixes: list) -> list:
        """Return the unique relative file paths named by ``fixes``.

        Absolute paths and paths containing a ``..`` component are skipped so
        that backup/rollback never copies files outside the work directory.
        """
        paths = []
        for entry in fixes:
            fpath = entry.get("file")
            if not isinstance(fpath, str) or not fpath:
                continue
            if os.path.isabs(fpath) or ".." in fpath.replace("\\", "/").split("/"):
                continue
            if fpath not in paths:
                paths.append(fpath)
        return paths

    def _build_debug_prompt(self, errors: list[str], step: dict, plan: dict) -> str:
        """Assemble the user prompt for one debug attempt.

        The prompt contains up to five error messages, the failed step's
        description, the project language/summary, the last five failed
        approaches, up to 4000 characters of each of the first five step
        files, and up to 2000 characters of other files referenced by the
        first three error messages.
        """
        language = plan.get("language", "unknown")
        summary = plan.get("summary", "")
        # Separate language and summary; they used to be run together.
        project = f"{language} — {summary}" if summary else f"{language}"
        parts = [
            "## Error(s)\n" + "\n---\n".join(errors[:5]),
            f"\n## Failed step\n{step.get('description', '')}",
            f"\n## Project: {project}",
        ]

        # Include what we already tried so the LLM doesn't repeat
        if self.failed_approaches:
            parts.append(
                "\n## PREVIOUS FAILED APPROACHES (DO NOT REPEAT THESE):\n" +
                "\n".join(f"- {a}" for a in self.failed_approaches[-5:])
            )

        # Include relevant source files
        step_files = step.get("files") or []
        for fpath in step_files[:5]:
            content = self._read_text(os.path.join(self.workdir, fpath))
            if content is not None:
                parts.append(f"\n## Current content of {fpath}\n{content[:4000]}")

        # Also check for related files that might be causing the issue
        # (e.g., header files referenced in error messages). `seen` keeps
        # step files and already-included files from being added twice.
        # NOTE(review): an absolute path in an error message makes
        # os.path.join ignore workdir, so files outside the project can be
        # read into the prompt — confirm this is intended.
        seen = set(step_files)
        for err in errors[:3]:
            for word in err.split():
                if "." not in word or "/" not in word:
                    continue
                for candidate in self._path_variants(word):
                    if candidate in seen:
                        continue
                    content = self._read_text(os.path.join(self.workdir, candidate))
                    if content is None:
                        continue
                    seen.add(candidate)
                    parts.append(f"\n## Related file {candidate}\n{content[:2000]}")
                    break
        return "\n".join(parts)

    @classmethod
    def _path_variants(cls, word: str) -> list:
        """Return the path spellings to try for one error-message token.

        The first entry is the token with surrounding punctuation stripped;
        the second, when different, also drops trailing ``:<digits>`` groups
        so compiler locations such as ``src/main.c:12:5`` resolve to
        ``src/main.c``.
        """
        raw = word.strip(cls._PATH_STRIP_CHARS)
        segments = raw.split(":")
        while len(segments) > 1 and segments[-1].isdigit():
            segments.pop()
        trimmed = ":".join(segments)
        return [raw] if trimmed == raw else [raw, trimmed]

    @staticmethod
    def _read_text(path: str) -> "str | None":
        """Return the UTF-8 text of ``path``, or None if it is not a readable text file."""
        if not os.path.isfile(path):
            return None
        try:
            with open(path, "r", encoding="utf-8") as fh:
                return fh.read()
        except (OSError, UnicodeDecodeError):
            # Best effort: unreadable or binary files are simply left out.
            return None

    def _apply_fix(self, fix: dict):
        """Apply one fix entry through the sandbox.

        Supported actions are ``"replace"`` and ``"create"`` (both write
        ``full_content`` to ``file``) and ``"delete"``. Entries without a
        file are ignored; entries with an unsupported action or without
        content are logged and skipped. Protected AutoDev state files are
        never deleted.
        """
        fpath = fix.get("file", "")
        action = fix.get("action", "replace")
        if not fpath:
            return

        if action == "delete":
            # Never delete AutoDev state files
            if os.path.basename(fpath) in self._PROTECTED_FILES:
                self.logger.log("fix_skip", f"Refusing to delete protected file: {fpath}", "warn")
                return
            full = self.sandbox.validate_path(fpath)
            if os.path.exists(full):
                os.remove(full)
                self.logger.log("fix_applied", f"Deleted {fpath}")
            return

        content = fix.get("full_content")
        if action in ("replace", "create") and content:
            self.sandbox.safe_write(fpath, content)
            if action == "replace":
                description = str(fix.get("description") or "")[:100]
                self.logger.log("fix_applied", f"Replaced {fpath}: {description}")
            else:
                self.logger.log("fix_applied", f"Created {fpath}")
            return

        # Make ignored entries visible in the worklog instead of dropping
        # them silently.
        self.logger.log(
            "fix_skip",
            f"Ignoring fix for {fpath}: unsupported action '{action}' or missing full_content",
            "warn",
        )

    def _backup_path(self, fpath: str) -> str:
        """Return the flat backup location for the project-relative ``fpath``."""
        flat = fpath.replace("/", "__")
        if os.sep != "/":
            # Also flatten native separators so the backup stays a single
            # file directly inside backup_dir.
            flat = flat.replace(os.sep, "__")
        return os.path.join(self.backup_dir, flat)

    def _backup_files(self, files: list[str]):
        """Snapshot the current content of ``files`` into the backup directory.

        A file that does not exist right now has any older backup removed, so
        a later rollback cannot restore content from an unrelated earlier
        attempt.
        """
        os.makedirs(self.backup_dir, exist_ok=True)
        for fpath in files:
            source = os.path.join(self.workdir, fpath)
            backup = self._backup_path(fpath)
            if os.path.isfile(source):
                shutil.copy2(source, backup)
            elif os.path.isfile(backup):
                os.remove(backup)

    def _rollback_files(self, files: list[str]):
        """Restore every file in ``files`` that has a backup.

        Files without a backup (for example ones newly created by the fix)
        are left as they are.
        """
        for fpath in files:
            backup = self._backup_path(fpath)
            if not os.path.isfile(backup):
                continue
            target = os.path.join(self.workdir, fpath)
            parent = os.path.dirname(target)
            if parent:  # os.makedirs("") raises
                os.makedirs(parent, exist_ok=True)
            shutil.copy2(backup, target)
            self.logger.log("rollback", f"Restored {fpath}", "warn")

    def _parse_fix(self, response: str) -> dict:
        """Return the first JSON object found in ``response``.

        Falls back to ``{"diagnosis": "Could not parse fix response",
        "fixes": []}`` when no candidate decodes to a dict.
        """
        text = response.strip()
        for candidate in self._extract_json_candidates(text):
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
        return {"diagnosis": "Could not parse fix response", "fixes": []}

    def _extract_json_candidates(self, text: str) -> list[str]:
        """List substrings of ``text`` that may be the JSON reply.

        Candidates, in order: the body of a json-tagged code fence if there
        is one, the whole text, then every balanced top-level ``{...}``
        span. Braces inside JSON string values are ignored while matching,
        so file contents embedded in ``full_content`` cannot break the scan.
        """
        candidates = []
        if "```json" in text:
            candidates.append(text.split("```json", 1)[1].split("```", 1)[0].strip())
        candidates.append(text)

        depth = 0
        start = -1
        in_string = False
        escaped = False
        for i, ch in enumerate(text):
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            # Quotes in the prose around the object are not string
            # delimiters, so only track strings inside an object.
            if ch == '"' and depth > 0:
                in_string = True
            elif ch == "{":
                if depth == 0:
                    start = i
                depth += 1
            elif ch == "}" and depth > 0:
                depth -= 1
                if depth == 0:
                    candidates.append(text[start:i + 1])
        return candidates