273 lines
11 KiB
Python
273 lines
11 KiB
Python
"""
|
|
AutoDev - Debugger
|
|
Analyzes errors, generates fixes, detects cycles, manages rollback.
|
|
Approaches debugging like a senior engineer — reason about root cause,
|
|
never repeat a failed approach, escalate when stuck.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
from .llm import LLM
|
|
from .logger import Logger
|
|
from .context import ContextManager
|
|
from .sandbox import Sandbox
|
|
from . import config
|
|
|
|
# System prompt used for every debug query (passed as `system=` to the LLM in
# Debugger.debug_errors). It extends the shared expert identity with the
# debugging rules and the JSON schema that Debugger._parse_fix expects back.
# NOTE(review): the nesting indentation of the JSON example was reconstructed;
# confirm it against the canonical copy of this file if exact bytes matter.
DEBUG_SYSTEM = config.EXPERT_IDENTITY + """

You are now in DEBUG mode. An error occurred during development. Your job is to:

1. DIAGNOSE: What is the root cause? Not the symptom — the actual cause.
2. REASON: Why did the previous code produce this error? What assumption was wrong?
3. FIX: Provide a concrete fix that addresses the root cause.

CRITICAL RULES:
- If you already tried a fix and it didn't work, you MUST try a DIFFERENT approach.
- Never repeat the same fix. If the same error keeps occurring, the problem is architectural.
- Consider: wrong imports, missing dependencies, wrong API usage, type mismatches, missing files.
- For compilation errors: check every include/import, every function signature, every type.
- For runtime errors: check null/nil handling, array bounds, file paths, permissions.

Output ONLY valid JSON:
{
  "diagnosis": "root cause analysis — what exactly went wrong and why",
  "previous_approach_wrong_because": "why the previous attempt failed (if applicable)",
  "strategy": "what different approach we're taking this time",
  "fixes": [
    {
      "file": "path/to/file",
      "action": "replace",
      "description": "what changed and why",
      "full_content": "COMPLETE new file content"
    }
  ],
  "commands": ["verification commands to confirm the fix works"]
}

The 'full_content' field must contain the ENTIRE file, not just the changed lines.
Start your response with { and end with }.
"""
|
|
|
|
|
|
class Debugger:
    """Drive one LLM-assisted debug attempt at a time for a failing plan step.

    Each call to :meth:`debug_errors` snapshots the files involved, asks the
    LLM for a structured fix, applies it, runs the verification commands the
    LLM proposed, and restores the snapshot when verification fails.
    Descriptions of failed attempts are remembered and fed back into the next
    prompt so the model is told not to repeat them.
    """

    def __init__(self, llm: LLM, logger: Logger, ctx: ContextManager,
                 sandbox: Sandbox, workdir: str):
        """Store collaborators and initialise per-session debug state.

        Args:
            llm: Client used to query the model for a fix.
            logger: Work-log sink; called as ``log(event, message[, level])``.
            ctx: Conversation context manager (cycle detection, history).
            sandbox: Validates commands/paths and performs guarded writes.
            workdir: Project root; step and fix paths are joined onto it.
        """
        self.llm = llm
        self.logger = logger
        self.ctx = ctx
        self.sandbox = sandbox
        self.workdir = workdir
        self.attempt_count = 0
        self.backup_dir = os.path.join(workdir, ".autodev_backups")
        self.failed_approaches: list[str] = []  # Track what we already tried

    def debug_errors(self, errors: list[str], step: dict, plan: dict) -> dict:
        """Attempt to fix errors. Returns {fixed, output, errors}.

        Args:
            errors: Error messages produced by the failed step.
            step: The failing plan step; ``description`` and ``files`` are read.
            plan: The overall plan; ``language`` and ``summary`` are read.

        Returns:
            A dict with ``fixed`` (bool), ``output`` (stdout collected from the
            verification commands) and ``errors`` (list of failure messages).
            Exceptions raised while querying, applying or verifying are caught,
            recorded in ``errors`` and trigger a rollback.
        """
        self.attempt_count += 1
        result = {"fixed": False, "output": "", "errors": []}

        if self.attempt_count > config.MAX_DEBUG_CYCLES:
            self.logger.log("debug_halt",
                            f"Max debug cycles ({config.MAX_DEBUG_CYCLES}) reached. "
                            "This likely requires manual intervention or better documentation.",
                            "error")
            result["errors"].append(
                "Maximum debug attempts exceeded. The issue may be architectural. "
                "Please review the worklog and provide additional documentation."
            )
            return result

        # Check for cycles
        if self.ctx.detect_cycle():
            self.logger.log("cycle_detected",
                            "Detected repetitive pattern. Clearing context and trying fresh approach.",
                            "warn")
            self.ctx.clear_stale()
            self.failed_approaches.append("CYCLE: All recent approaches were too similar")

        # Backup current state. `tracked` is every path a rollback must
        # restore; it grows once we know which files the fix itself touches.
        step_files = step.get("files", [])
        tracked = list(step_files)
        self._backup_files(step_files)

        prompt = self._build_debug_prompt(errors, step, plan)
        self.ctx.add("user", prompt, priority=8)

        self.logger.log("debug_attempt",
                        f"Attempt {self.attempt_count}/{config.MAX_DEBUG_CYCLES}")

        try:
            response = self.llm.query(prompt, system=DEBUG_SYSTEM, temperature=0.3)
            self.ctx.add("assistant", response[:1500], priority=6)
            fix = self._parse_fix(response)

            if not fix.get("fixes"):
                self.logger.log("debug_no_fix", "LLM provided no actionable fixes", "warn")
                result["errors"].append("No fix suggested by LLM")
                # Guard the index: an empty error list must not turn a
                # "no fix" outcome into a spurious IndexError.
                first_error = errors[0][:100] if errors else "unknown error"
                self.failed_approaches.append(f"No fix for: {first_error}")
                return result

            diagnosis = fix.get("diagnosis", "unknown")
            strategy = fix.get("strategy", "")
            self.logger.log("diagnosis", diagnosis)
            if strategy:
                self.logger.log("strategy", strategy)

            # The fix may rewrite or delete files the step never listed;
            # snapshot those as well so a failed verification can undo them.
            extra = [p for p in self._fix_targets(fix["fixes"]) if p not in tracked]
            self._backup_files(extra)
            tracked.extend(extra)

            # Apply fixes
            for f in fix["fixes"]:
                self._apply_fix(f)

            # Verify
            if not self._run_verification(fix.get("commands", []), result):
                self.failed_approaches.append(
                    f"Attempt {self.attempt_count}: {diagnosis[:100]} — verification failed"
                )
                self._rollback_files(tracked)
                self.logger.log("rollback", "Fix didn't pass verification, rolled back", "warn")
                return result

            # Success resets the attempt budget and the failure memory.
            result["fixed"] = True
            self.attempt_count = 0
            self.failed_approaches.clear()
            self.logger.log("debug_fixed", diagnosis)

        except Exception as e:
            # Boundary handler: any failure while querying, applying or
            # verifying is reported to the caller and undone on disk.
            result["errors"].append(str(e))
            self.logger.log("debug_error", str(e), "error")
            self._rollback_files(tracked)
            self.failed_approaches.append(f"Exception: {str(e)[:100]}")

        return result

    def _run_verification(self, commands: list, result: dict) -> bool:
        """Run the LLM-proposed verification commands; return True if all pass.

        Stdout is appended to ``result["output"]``; every failing command (or
        exception, including sandbox rejection and timeout) is appended to
        ``result["errors"]``. All commands are run even after a failure.
        """
        verify_ok = True
        for cmd in commands:
            try:
                self.sandbox.validate_command(cmd)
                # SECURITY NOTE: `cmd` is model-generated and executed through
                # the shell; sandbox.validate_command above is the only gate.
                proc = subprocess.run(
                    cmd, shell=True, capture_output=True, text=True,
                    timeout=config.COMPILE_TIMEOUT, cwd=self.workdir,
                )
                result["output"] += proc.stdout
                if proc.returncode != 0:
                    result["errors"].append(
                        f"Verification '{cmd}' failed:\n{proc.stderr}"
                    )
                    verify_ok = False
                    self.logger.log("fix_verify_fail", proc.stderr[:300], "error")
            except Exception as e:
                result["errors"].append(str(e))
                verify_ok = False
        return verify_ok

    def _fix_targets(self, fixes) -> list[str]:
        """Return the distinct workdir-relative paths named by ``fixes``.

        Entries that are not dicts, have no string ``file`` value, or point
        outside the working directory (absolute or ``..``-escaping) are
        skipped, so backup/rollback never touches paths outside the project.
        """
        targets: list[str] = []
        for entry in fixes:
            path = entry.get("file") if isinstance(entry, dict) else None
            if not isinstance(path, str) or not path:
                continue
            norm = os.path.normpath(path)
            if os.path.isabs(norm) or norm == ".." or norm.startswith(".." + os.sep):
                continue
            if path not in targets:
                targets.append(path)
        return targets

    def _build_debug_prompt(self, errors: list[str], step: dict, plan: dict) -> str:
        """Assemble the user prompt for one debug attempt.

        Sections: up to five errors, the failed step description, the project
        language/summary, the last five failed approaches, up to five step
        files (first 4000 chars each) and any other project file mentioned in
        the first three errors (first 2000 chars each, included once).
        """
        parts = [
            "## Error(s)\n" + "\n---\n".join(errors[:5]),
            f"\n## Failed step\n{step.get('description', '')}",
            f"\n## Project: {plan.get('language', 'unknown')} — {plan.get('summary', '')}",
        ]

        # Include what we already tried so the LLM doesn't repeat
        if self.failed_approaches:
            parts.append(
                "\n## PREVIOUS FAILED APPROACHES (DO NOT REPEAT THESE):\n" +
                "\n".join(f"- {a}" for a in self.failed_approaches[-5:])
            )

        # Include relevant source files
        step_files = step.get("files", [])
        for fpath in step_files[:5]:
            content = self._read_text(os.path.join(self.workdir, fpath))
            if content is not None:
                parts.append(f"\n## Current content of {fpath}\n{content[:4000]}")

        # Also check for related files that might be causing the issue
        # (e.g., header files referenced in error messages). `seen` keeps a
        # path that is mentioned many times from being inlined repeatedly.
        seen = set(step_files)
        for err in errors[:3]:
            for word in err.split():
                if "." not in word or "/" not in word:
                    continue
                # Looks like a file path in the error
                candidate = self._resolve_error_path(word.strip("':\"(),"))
                if not candidate or candidate in seen:
                    continue
                seen.add(candidate)
                content = self._read_text(os.path.join(self.workdir, candidate))
                if content is not None:
                    parts.append(f"\n## Related file {candidate}\n{content[:2000]}")

        return "\n".join(parts)

    def _resolve_error_path(self, token: str) -> str:
        """Map an error-message token to an existing file under ``workdir``.

        The token is tried as-is first; if it is not a file, trailing
        ``:<digits>`` groups (compiler ``path:line:col`` suffixes) are peeled
        off one at a time. Returns the matching relative path, or ``""``.
        """
        candidate = token
        while candidate:
            if os.path.isfile(os.path.join(self.workdir, candidate)):
                return candidate
            head, sep, tail = candidate.rpartition(":")
            if not sep or not tail.isdigit():
                return ""
            candidate = head
        return ""

    def _read_text(self, full_path: str):
        """Return the text of ``full_path``, or None if missing or unreadable.

        Best-effort by design: prompt building must not fail because one
        file is binary or cannot be opened.
        """
        if not os.path.isfile(full_path):
            return None
        try:
            with open(full_path, "r") as f:
                return f.read()
        except (IOError, UnicodeDecodeError):
            return None

    def _apply_fix(self, fix: dict):
        """Apply one fix entry (``replace``, ``create`` or ``delete``).

        Entries without a ``file`` are ignored, as are ``replace``/``create``
        entries without ``full_content``. Writes go through
        ``sandbox.safe_write``; deletions through ``sandbox.validate_path``.
        """
        fpath = fix.get("file", "")
        action = fix.get("action", "replace")
        if not fpath:
            return
        # Never delete AutoDev state files
        protected = {"worklog.json", "plan.json", "dependency.txt",
                     ".autodev_state.json", "description.txt"}
        basename = os.path.basename(fpath)
        if action == "delete" and basename in protected:
            self.logger.log("fix_skip", f"Refusing to delete protected file: {fpath}", "warn")
            return
        if action == "replace" and fix.get("full_content"):
            self.sandbox.safe_write(fpath, fix["full_content"])
            self.logger.log("fix_applied", f"Replaced {fpath}: {fix.get('description', '')[:100]}")
        elif action == "delete":
            full = self.sandbox.validate_path(fpath)
            if os.path.exists(full):
                os.remove(full)
                self.logger.log("fix_applied", f"Deleted {fpath}")
        elif action == "create" and fix.get("full_content"):
            self.sandbox.safe_write(fpath, fix["full_content"])
            self.logger.log("fix_applied", f"Created {fpath}")

    def _backup_path(self, fpath: str) -> str:
        """Return the flat backup location for a workdir-relative path."""
        return os.path.join(self.backup_dir, fpath.replace("/", "__"))

    def _backup_files(self, files: list[str]):
        """Copy each existing file in ``files`` into the backup directory."""
        os.makedirs(self.backup_dir, exist_ok=True)
        for fpath in files:
            full = os.path.join(self.workdir, fpath)
            if os.path.isfile(full):
                shutil.copy2(full, self._backup_path(fpath))

    def _rollback_files(self, files: list[str]):
        """Restore every file in ``files`` that has a backup copy."""
        for fpath in files:
            backup = self._backup_path(fpath)
            if not os.path.isfile(backup):
                continue
            full = os.path.join(self.workdir, fpath)
            parent = os.path.dirname(full)
            # os.makedirs("") raises, so only create a real parent directory.
            if parent:
                os.makedirs(parent, exist_ok=True)
            shutil.copy2(backup, full)
            self.logger.log("rollback", f"Restored {fpath}", "warn")

    def _parse_fix(self, response: str) -> dict:
        """Parse the LLM reply into a fix dict.

        Returns the first candidate that decodes to a JSON object, or a
        placeholder ``{"diagnosis": ..., "fixes": []}`` when none does.
        """
        text = response.strip()
        for candidate in self._extract_json_candidates(text):
            try:
                result = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(result, dict):
                return result
        return {"diagnosis": "Could not parse fix response", "fixes": []}

    def _extract_json_candidates(self, text: str) -> list[str]:
        """List substrings of ``text`` that may be the JSON payload.

        Order: the body of a ```json fence (if any), the whole text, then
        every top-level brace-balanced span. Brace counting skips characters
        inside double-quoted strings (honouring backslash escapes), because
        fix payloads embed source code whose braces need not balance.
        """
        candidates = []
        if "```json" in text:
            candidates.append(text.split("```json", 1)[1].split("```", 1)[0].strip())
        candidates.append(text)

        depth = 0
        span_start = -1
        in_string = False
        escaped = False
        for idx, ch in enumerate(text):
            if depth == 0:
                # Outside any object only an opening brace is significant;
                # quotes in surrounding prose must not start a "string".
                if ch == "{":
                    depth = 1
                    span_start = idx
                continue
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"':
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    candidates.append(text[span_start:idx + 1])
        return candidates
|