129 lines
4.8 KiB
Python
129 lines
4.8 KiB
Python
"""
AutoDev - Context Window Manager

Manages local context with token-aware pruning, relevance scoring,
and semantic cycle/hallucination detection.
"""
|
|
|
|
import hashlib
|
|
import difflib
|
|
from . import config
|
|
|
|
|
|
def estimate_tokens(text: str) -> int:
    """Return a rough token count for *text* based on its character length.

    The character count is divided by ``config.TOKEN_CHAR_RATIO`` and
    truncated to an integer. The result is never less than 1, so even an
    empty string is counted as one token.
    """
    approx = int(len(text) / config.TOKEN_CHAR_RATIO)
    return approx if approx > 1 else 1
|
|
|
|
|
|
class ContextManager:
    """Token-budgeted store of conversation entries with loop detection.

    Every entry is a dict with the keys ``role``, ``content``, ``priority``,
    ``hash`` and ``tokens``. Entries are appended with :meth:`add`; whenever
    the summed token estimate exceeds ``max_tokens`` the lowest-priority,
    oldest entries are evicted. A short history of content hashes and
    content prefixes is kept separately for :meth:`detect_cycle`.
    """

    def __init__(self, max_tokens: int = None):
        """Create an empty context.

        Args:
            max_tokens: Token budget enforced by pruning. A falsy value
                (``None`` or ``0``) falls back to
                ``config.MAX_CONTEXT_TOKENS``.
        """
        self.max_tokens = max_tokens or config.MAX_CONTEXT_TOKENS
        self.entries: list[dict] = []  # {role, content, priority, hash, tokens}
        # Histories consulted by detect_cycle(). They are independent of
        # ``entries``: pruning does not shrink them, only clear_stale() does.
        # NOTE(review): both lists grow by one item per add(); consider
        # bounding them if sessions are very long.
        self._recent_contents: list[str] = []
        self._recent_hashes: list[str] = []

    def add(self, role: str, content: str, priority: int = 5):
        """Append an entry and prune the context back under budget.

        Args:
            role: Message role stored on the entry and emitted by the
                ``build_*`` methods.
            content: Message text.
            priority: Retention weight. Lower values are pruned first;
                entries with priority >= 9 are never pruned.
        """
        # MD5 is used only as a cheap content fingerprint for repetition
        # detection, not for anything security-sensitive.
        h = hashlib.md5(content.encode()).hexdigest()[:16]
        tokens = estimate_tokens(content)
        self.entries.append({
            "role": role,
            "content": content,
            "priority": priority,
            "hash": h,
            "tokens": tokens,
        })
        self._recent_hashes.append(h)
        # Only a 500-character prefix is kept for the similarity comparison.
        self._recent_contents.append(content[:500])
        self._prune()

    def _prune(self):
        """Evict entries until the token total fits within ``max_tokens``.

        The most recent entry and entries with priority >= 9 are never
        removed, and pruning stops once only two entries remain. Among the
        removable entries the lowest priority goes first; ties are broken
        in favour of evicting the oldest entry.
        """
        total = sum(e["tokens"] for e in self.entries)
        while total > self.max_tokens and len(self.entries) > 2:
            # Never prune the last entry or system-level entries (priority >= 9)
            candidates = [
                (e["priority"], i)
                for i, e in enumerate(self.entries[:-1])
                if e["priority"] < 9
            ]
            if not candidates:
                break
            # Tuples compare by priority first, then by index, so min()
            # yields the lowest-priority entry and, on ties, the oldest.
            _, idx = min(candidates)
            total -= self.entries.pop(idx)["tokens"]

    def detect_cycle(self) -> bool:
        """Detect both exact repetition and semantic similarity loops.

        Looks at the last ``config.CYCLE_DETECTION_WINDOW`` additions.

        Returns:
            ``True`` when the number of distinct content hashes in the
            window is at most a third of the window size (minimum 1), or
            when the mean ``difflib`` similarity ratio between consecutive
            content prefixes exceeds 0.8. ``False`` otherwise, and always
            ``False`` while fewer than three additions have been recorded.
        """
        window = config.CYCLE_DETECTION_WINDOW
        if len(self._recent_hashes) < 3:
            return False

        # Exact hash repetition
        recent = self._recent_hashes[-window:]
        if len(set(recent)) <= max(1, len(recent) // 3):
            return True

        # Semantic similarity: check if recent contents are too similar
        contents = self._recent_contents[-window:]
        if len(contents) >= 3:
            similarities = [
                difflib.SequenceMatcher(None, previous, current).ratio()
                for previous, current in zip(contents, contents[1:])
            ]
            # If average similarity > 0.8, we're likely in a loop
            if similarities and sum(similarities) / len(similarities) > 0.8:
                return True

        return False

    def clear_stale(self):
        """Aggressively clear low-value entries when cycles are detected.

        Keeps only entries with priority >= 8; if there are none, keeps the
        two most recent entries instead. The cycle-detection histories are
        cut down to their last two items.
        """
        keep = [e for e in self.entries if e["priority"] >= 8]
        if not keep:
            keep = self.entries[-2:]
        self.entries = keep
        self._recent_hashes = self._recent_hashes[-2:]
        self._recent_contents = self._recent_contents[-2:]

    def get_relevant_context(self, query: str, max_entries: int = 5) -> list[dict]:
        """Select entries most relevant to the query using keyword overlap.

        An entry's score is its priority plus the number of distinct
        lower-cased, whitespace-separated query words that also occur among
        the first 200 words of its content.

        Args:
            query: Text whose words are matched against entry contents.
            max_entries: Maximum number of entries to return.

        Returns:
            Up to ``max_entries`` entries, highest score first. Entries with
            equal scores keep their insertion order (the sort is stable).
        """
        query_words = set(query.lower().split())
        scored = []
        for e in self.entries:
            content_words = set(e["content"].lower().split()[:200])
            overlap = len(query_words & content_words)
            scored.append((overlap + e["priority"], e))
        scored.sort(key=lambda x: x[0], reverse=True)
        return [e for _, e in scored[:max_entries]]

    def build_messages(self, system_prompt: str = "") -> list[dict]:
        """Return every entry as a ``{"role", "content"}`` message dict.

        Args:
            system_prompt: When non-empty, emitted first as a message with
                role ``"system"``.
        """
        msgs = []
        if system_prompt:
            msgs.append({"role": "system", "content": system_prompt})
        for e in self.entries:
            msgs.append({"role": e["role"], "content": e["content"]})
        return msgs

    def build_focused_messages(self, system_prompt: str, query: str,
                               max_context_tokens: int = None) -> list[dict]:
        """Build messages with only the most relevant context entries.

        Args:
            system_prompt: When non-empty, emitted first as a ``"system"``
                message; its estimated token cost is deducted from the
                budget.
            query: Text used to rank entries via
                :meth:`get_relevant_context`.
            max_context_tokens: Token budget for the result. A falsy value
                falls back to half of ``max_tokens``.

        Returns:
            The optional system message followed by entries in relevance
            order (not insertion order), for as long as they fit the budget.
        """
        budget = max_context_tokens or (self.max_tokens // 2)
        msgs = []
        if system_prompt:
            msgs.append({"role": "system", "content": system_prompt})
            budget -= estimate_tokens(system_prompt)

        relevant = self.get_relevant_context(query)
        for e in relevant:
            # Stops at the first entry that does not fit; later, smaller
            # entries are not considered.
            if budget - e["tokens"] < 0:
                break
            msgs.append({"role": e["role"], "content": e["content"]})
            budget -= e["tokens"]
        return msgs

    def token_usage(self) -> dict:
        """Summarise how much of the token budget is in use.

        Returns:
            A dict with ``entries`` (entry count), ``tokens_used``,
            ``tokens_max`` and ``utilization`` (a whole-number percentage
            string such as ``"25%"``).
        """
        total = sum(e["tokens"] for e in self.entries)
        # Guard against a zero budget so a stats call can never raise
        # ZeroDivisionError.
        percent = total / self.max_tokens * 100 if self.max_tokens else 0
        return {
            "entries": len(self.entries),
            "tokens_used": total,
            "tokens_max": self.max_tokens,
            "utilization": f"{percent:.0f}%",
        }
|