1515 lines
68 KiB
Python
Executable File
1515 lines
68 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# vedit, a test client to vibe code. - anders@holck.se March 2026
|
|
|
|
"""Vibe - A simple vibe coding tool bridging Ollama LLM and bash shell."""
|
|
|
|
import curses
|
|
import difflib
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import re
|
|
import requests
|
|
import sys
|
|
import unicodedata
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
|
|
# Location of the per-user configuration file (simple ``key=value`` lines).
CONFIG_FILE = os.path.expanduser("~/.ahvibe.conf")

# Keys that must have a non-empty value before the client can start.
REQUIRED_KEYS = ["ollama_host", "user_name"]

# Keys that may be present in the config file but are not mandatory.
OPTIONAL_KEYS = ["model", "auth_key", "context_size", "backend"]

# Interactive prompt text shown when a given key has to be asked for.
PROMPTS = dict(
    user_name="Your name: ",
    ollama_host="Server host URL (e.g. http://localhost:11434): ",
    model="Model name (e.g. qwen2.5-coder:14b-instruct): ",
    auth_key="Auth key (optional, press Enter to skip): ",
    backend="Backend (ollama or vllm, default ollama): ",
)
|
|
|
|
def _load_or_create_config():
    """Load config from ~/.ahvibe.conf, prompting for any missing values.

    The file is parsed as ``key=value`` lines (``#`` lines are ignored).
    Missing required keys, and a missing ``auth_key``, are asked for
    interactively and the file is rewritten.  The server is then contacted
    once to validate the connection and the configured model.

    Returns:
        tuple: ``(chat_url, model, user_name, headers, context_size, backend)``.

    Exits the process (``sys.exit``) when a required value is not supplied,
    the prompt is interrupted, authentication fails, or the server cannot be
    reached.
    """
    default_context_size = 16384

    conf = {}
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE) as f:
            for line in f:
                line = line.strip()
                if "=" in line and not line.startswith("#"):
                    k, v = line.split("=", 1)
                    conf[k.strip()] = v.strip()

    # Silent defaults: these are never prompted for.
    conf.setdefault("context_size", str(default_context_size))
    conf.setdefault("backend", "ollama")

    missing = [k for k in REQUIRED_KEYS if not conf.get(k)]
    missing += [
        k for k in OPTIONAL_KEYS
        if k not in conf and k not in ("context_size", "model", "backend")
    ]
    if missing:
        try:
            print(f"Config: {CONFIG_FILE}")
            print()
            for k in missing:
                val = input(PROMPTS[k]).strip()
                if k in REQUIRED_KEYS and not val:
                    print(f"Error: {k} is required.")
                    sys.exit(1)
                conf[k] = val
        except (KeyboardInterrupt, EOFError):
            print("\nBye.")
            sys.exit(0)
        with open(CONFIG_FILE, "w") as f:
            for k in REQUIRED_KEYS + OPTIONAL_KEYS:
                if k in conf:
                    f.write(f"{k}={conf[k]}\n")
        print(f"Saved to {CONFIG_FILE}\n")

    backend = conf.get("backend", "ollama").lower()
    host = conf["ollama_host"].rstrip("/")
    headers = {}
    if conf.get("auth_key"):
        headers["Authorization"] = f"Bearer {conf['auth_key']}"

    # Validate connection.  sys.exit() raises SystemExit, which is not an
    # Exception subclass, so the generic handler below never swallows it.
    try:
        if backend == "vllm":
            r = requests.get(host + "/v1/models", headers=headers, timeout=10)
        else:
            r = requests.get(host + "/api/tags", headers=headers, timeout=10)
        if r.status_code == 401:
            print("Error: Authentication failed. Check your auth key.")
            sys.exit(1)
        r.raise_for_status()
        if backend == "vllm":
            models = [m["id"] for m in r.json().get("data", [])]
        else:
            models = [m["name"] for m in r.json().get("models", [])]
        if conf.get("model") and conf["model"] not in models:
            print(f"Warning: Model '{conf['model']}' not found on server.")
            print(f"Available: {', '.join(models) if models else 'none'}")
            conf["model"] = ""
    except requests.ConnectionError:
        print(f"Error: Cannot connect to {host}")
        sys.exit(1)
    except Exception as e:
        print(f"Error connecting to server: {e}")
        sys.exit(1)

    if backend == "vllm":
        chat_url = host + "/v1/chat/completions"
    else:
        chat_url = host + "/api/chat"

    # A hand-edited config may hold a non-numeric or non-positive value;
    # fall back to the default instead of crashing at import time.
    try:
        context_size = int(conf["context_size"])
        if context_size <= 0:
            raise ValueError(conf["context_size"])
    except ValueError:
        print(f"Warning: invalid context_size '{conf['context_size']}', using {default_context_size}.")
        context_size = default_context_size

    return chat_url, conf.get("model", ""), conf["user_name"], headers, context_size, backend
|
|
|
|
# Connection settings, resolved once at import time from the config file.
(OLLAMA_URL, MODEL, USER_NAME,
 AUTH_HEADERS, CONTEXT_SIZE, BACKEND) = _load_or_create_config()

# Short host label: a host whose first character is a digit is kept whole,
# otherwise only the part before the first dot is used.
_h = urlparse(OLLAMA_URL).hostname
if _h[0].isdigit():
    HOST_SHORT = _h
else:
    HOST_SHORT = _h.split(".")[0]

# Per-project files, relative to the working directory.
WORKLOG_FILE = "worklog.md"
DESCRIPTION_FILE = "description.md"
AI_HISTORY_FILE = ".ai-history"
AI_HISTORY_MAX = 50
INPUT_HISTORY_FILE = ".input-history"
INPUT_HISTORY_MAX = 200

MAX_CONTEXT_MESSAGES = 40  # keep last N messages before summarizing
|
|
# Shell fragments that require explicit user approval before running.
# The device-redirect alternative deliberately has no leading ``\b``: a word
# boundary in front of ``>`` only matches when ``>`` directly follows a word
# character, which let ``cmd > /dev/sda`` through while flagging the harmless
# ``2>/dev/null``.  Redirects to /dev/null, /dev/stdout and /dev/stderr are
# exempted because they cannot damage anything.
DANGEROUS_PATTERNS = re.compile(
    r"\brm\s+(-[a-z]*f|-[a-z]*r|--force|--recursive)|\bdd\b.*of=/|"
    r"\bmkfs\b|\bformat\b|>\s*/dev/(?!null\b|stdout\b|stderr\b)|:>\s*/|chmod\s+-R\s+777|"
    r"\bsudo\b|\breboot\b|\bshutdown\b|\bsystemctl\b",
    re.IGNORECASE,
)

# Animation frames for the busy spinner.
SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
|
|
|
|
SYSTEM_PROMPT = f"""You are a vibe coding assistant. The user's name is {USER_NAME}. Use their name occasionally in conversation but NEVER embed it in code, filenames, or output unless explicitly asked.
|
|
|
|
You help {USER_NAME} by writing code and running shell commands.
|
|
|
|
TOOLS — use fenced code blocks with these tags:
|
|
|
|
bash — run a shell command, example:
|
|
```bash
|
|
ls -la
|
|
```
|
|
|
|
write:<filename> — create/overwrite a file, example:
|
|
```write:hello.c
|
|
#include <stdio.h>
|
|
int main() {{ printf("Hello\\n"); return 0; }}
|
|
```
|
|
|
|
edit:<filename> — search/replace in a file, example:
|
|
```edit:hello.c
|
|
<<<SEARCH
|
|
printf("Hello\\n");
|
|
===
|
|
printf("Hi there\\n");
|
|
>>>
|
|
```
|
|
|
|
memory:<key> — store a note, forget:<key> — clear a note.
|
|
|
|
read:<filename> — read a file's contents, example:
|
|
```read:main.c
|
|
```
|
|
|
|
IMPORTANT: Replace the examples above with ACTUAL commands and code for the task. Never output placeholder text like "command here" or "file content here".
|
|
Only use tools (bash, write, edit, memory) when the user asks you to create, modify, or run something. For general questions, just answer in plain text — do NOT create files or memory entries for conversational replies.
|
|
|
|
BEHAVIOR:
|
|
- Do ONLY what is asked. Nothing more.
|
|
- Do NOT compile, run, test, or launch emulators (qemu, etc.) unless explicitly asked.
|
|
- Do NOT add extra steps beyond the request (no ls, cd, descriptions, verification).
|
|
- If a description.md exists, follow it — but only do what it says, not more.
|
|
- For small changes, prefer ```edit:filename``` over rewriting the whole file.
|
|
- If a command fails and the user asked you to run it, read the error output carefully, fix the code, and retry until it works.
|
|
- If a program runs but produces wrong output, analyze the output and fix the code.
|
|
- cd commands persist between bash blocks. File writes also respect the current directory.
|
|
- CRITICAL: All commands, file writes, edits, and memory ops MUST be inside ```fenced code blocks```. Never output them as plain text.
|
|
|
|
OUTPUT:
|
|
- Be EXTREMELY concise. No filler, no preamble.
|
|
- Do NOT repeat code you just wrote — the user sees it in the work window.
|
|
- ONE LINE summary after completing a task. Nothing more.
|
|
- If there's nothing to add, say nothing."""
|
|
|
|
|
|
def _dw(s):
|
|
"""Display width of a string, accounting for wide chars (emoji etc)."""
|
|
w = 0
|
|
for c in s:
|
|
w += 2 if unicodedata.east_asian_width(c) in ('W', 'F') else 1
|
|
return w
|
|
|
|
def _wrap_dw(s, maxw):
    """Wrap *s* to at most *maxw* display columns and return the lines.

    The text is split on whitespace runs (the runs are kept as their own
    pieces).  A piece wider than *maxw* is broken character by character.
    """
    if _dw(s) <= maxw:
        return [s]

    out = []
    line = ""
    used = 0
    for piece in re.split(r'(\s+)', s):
        width = _dw(piece)

        # Piece fits on the current line.
        if used + width <= maxw:
            line += piece
            used += width
            continue

        # Piece can never fit on one line: spill it character by character.
        if width > maxw:
            for ch in piece:
                ch_w = 2 if unicodedata.east_asian_width(ch) in ('W', 'F') else 1
                if used + ch_w > maxw:
                    out.append(line)
                    line, used = ch, ch_w
                else:
                    line += ch
                    used += ch_w
            continue

        # Piece fits on a line of its own: flush and start a new line with it.
        if line:
            out.append(line)
        line = piece if out else piece.lstrip()
        used = _dw(line)

    if line:
        out.append(line)
    return out or [""]
|
|
|
|
class Vibe:
|
|
def __init__(self, stdscr):
    """Initialise session state, configure curses, and load project context.

    Args:
        stdscr: the curses window used for all drawing and keyboard input.
    """
    self.stdscr = stdscr

    # Conversation and display buffers.
    self.messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    self.work_lines = []
    self.chat_input = ""
    self.chat_history = []

    # Worker / UI flags.
    self.busy = False
    self.stop_flag = False
    self.spinner_idx = 0

    # Bookkeeping shared with the worker thread.
    self.memory = {}
    self.ai_history = []
    self.recent_files = []
    self.lock = threading.Lock()

    self.scroll_offset = 0        # 0 = bottom (latest), >0 = scrolled up
    self.yolo_mode = False        # skip dangerous command confirmation
    self.pending_cmd = None       # command awaiting confirmation
    self.multiline_mode = False
    self.multiline_buf = []
    self.code_mode = True         # True = code mode, False = chat mode
    self.chat_messages = []       # separate message history for chat mode
    self.cursor_pos = 0           # cursor position within chat_input

    # Input history is kept per mode.
    self.code_input_history = self._load_input_history("code")
    self.chat_input_history = self._load_input_history("chat")
    self.history_idx = -1         # -1 = not browsing history
    self.saved_input = ""         # saved current input when browsing history

    self.cwd = os.getcwd()        # persistent working directory for commands
    self.root_dir = self.cwd      # sandbox — never allow navigating above this
    self._last_backup = None      # (filename, content) for /undo

    # Curses setup: visible cursor, default background, colour pairs 1..7.
    curses.curs_set(1)
    curses.start_color()
    curses.use_default_colors()
    palette = (
        (curses.COLOR_GREEN, -1),
        (curses.COLOR_CYAN, -1),
        (curses.COLOR_YELLOW, -1),
        (curses.COLOR_RED, -1),
        (curses.COLOR_BLACK, curses.COLOR_YELLOW),
        (curses.COLOR_WHITE, curses.COLOR_BLUE),
        (curses.COLOR_MAGENTA, -1),  # for warnings
    )
    for pair_no, (fg, bg) in enumerate(palette, start=1):
        curses.init_pair(pair_no, fg, bg)

    self.stdscr.nodelay(False)
    self.stdscr.timeout(100)
    try:
        curses.mousemask(curses.ALL_MOUSE_EVENTS)
    except Exception:
        # Mouse support is optional.
        pass

    self.ollama_status = ""       # GPU/CPU status string
    self._start_status_poller()

    self._load_project_context()
    self._load_worklog()
|
|
|
|
def _start_status_poller(self):
    """Start a daemon thread that refreshes ``self.ollama_status`` every 15 s.

    For the vllm backend the status is "online" or "error" depending on the
    HTTP status of ``/v1/models``.  Otherwise ``/api/ps`` is queried and the
    status lists a GPU/CPU percentage split per loaded model, or "idle" when
    none is loaded.  Any exception sets the status to "offline".
    """
    def describe(entry):
        # Share of the model reported as being in VRAM vs. the rest.
        total = entry.get("size", 1)
        vram = entry.get("size_vram", 0)
        gpu_pct = int(vram * 100 / total) if total else 0
        cpu_pct = 100 - gpu_pct
        return f"GPU:{gpu_pct}% CPU:{cpu_pct}%"

    def poll():
        if BACKEND != "vllm":
            host = OLLAMA_URL.rsplit("/api/chat", 1)[0]
        else:
            host = OLLAMA_URL.rsplit("/v1/chat/completions", 1)[0]
        while True:
            try:
                if BACKEND == "vllm":
                    reply = requests.get(host + "/v1/models", headers=AUTH_HEADERS, timeout=5)
                    self.ollama_status = "online" if reply.status_code == 200 else "error"
                else:
                    reply = requests.get(host + "/api/ps", headers=AUTH_HEADERS, timeout=5)
                    loaded = reply.json().get("models", [])
                    if loaded:
                        self.ollama_status = " | ".join([describe(m) for m in loaded])
                    else:
                        self.ollama_status = "idle"
            except Exception:
                self.ollama_status = "offline"
            time.sleep(15)

    threading.Thread(target=poll, daemon=True).start()
|
|
|
|
def _load_file_context(self, filepath):
|
|
"""Load a file as project context for the AI."""
|
|
if not os.path.isabs(filepath):
|
|
filepath = os.path.join(self.cwd, filepath)
|
|
try:
|
|
with open(filepath) as f:
|
|
content = f.read().strip()
|
|
self.messages.append(
|
|
{"role": "system", "content": f"Contents of {filepath}:\n{content}"}
|
|
)
|
|
self._add_work(f"📋 Loaded '{filepath}' as context")
|
|
except Exception as e:
|
|
self._add_work(f"❌ Could not read '{filepath}': {e}")
|
|
|
|
def _load_project_context(self):
    """Auto-scan project tree and load description file on startup.

    Three steps, each reported in the work window:
      1. make sure ``.gitignore`` in the project root lists the session files;
      2. load ``DESCRIPTION_FILE`` (if present) as a system message;
      3. add a compact file listing (at most 100 entries) as a system message.
    """
    # Ensure .gitignore excludes session files (best effort).
    gi_entries = {".ai-history", ".input-history"}
    gi_path = os.path.join(self.root_dir, ".gitignore")
    try:
        existing_text = ""
        if os.path.exists(gi_path):
            with open(gi_path) as f:
                existing_text = f.read()
        existing = {entry.strip() for entry in existing_text.split("\n")}
        missing = gi_entries - existing
        if missing:
            with open(gi_path, "a") as f:
                # Without this, a file lacking a trailing newline would get
                # the first new entry glued onto its last line.
                if existing_text and not existing_text.endswith("\n"):
                    f.write("\n")
                for entry in sorted(missing):
                    f.write(f"{entry}\n")
    except (OSError, UnicodeDecodeError) as e:
        self._add_work(f"⚠ Could not update '{gi_path}': {e}")

    # Load description file if present
    if os.path.exists(DESCRIPTION_FILE):
        try:
            with open(DESCRIPTION_FILE) as f:
                desc = f.read().strip()
            if desc:
                self.messages.append(
                    {"role": "system", "content": f"Project description:\n{desc}"}
                )
                self._add_work("📋 Loaded project description")
        except (OSError, UnicodeDecodeError) as e:
            self._add_work(f"⚠ Could not read '{DESCRIPTION_FILE}': {e}")
    else:
        self._add_work(f"💡 Tip: Create '{DESCRIPTION_FILE}' to describe your project for the AI")

    # Build a compact file tree (max 100 entries, skip hidden/build dirs)
    skip = {'.git', '__pycache__', 'node_modules', '.venv', 'venv', 'build', 'dist', '.ai-history'}
    tree = []
    for root, dirs, files in os.walk("."):
        dirs[:] = [d for d in dirs if d not in skip and not d.startswith('.')]
        depth = root.count(os.sep)
        if depth > 3:
            dirs.clear()
            continue
        for name in files:
            if len(tree) >= 100:
                break
            # The skip set names a file ('.ai-history') as well as
            # directories, so it has to be applied to files too.
            if name in skip:
                continue
            path = os.path.join(root, name)
            tree.append(path[2:] if path.startswith("./") else path)
        if len(tree) >= 100:
            break
    if tree:
        self.messages.append(
            {"role": "system", "content": "Project files:\n" + "\n".join(tree)}
        )
        self._add_work(f"📂 Scanned project: {len(tree)} files")
|
|
|
|
def _trim_context(self):
|
|
"""Keep message list manageable by summarizing old messages."""
|
|
# Count non-system messages
|
|
non_sys = [(i, m) for i, m in enumerate(self.messages) if m["role"] != "system"]
|
|
if len(non_sys) <= MAX_CONTEXT_MESSAGES:
|
|
return
|
|
# Summarize the oldest half of non-system messages
|
|
cut = len(non_sys) // 2
|
|
to_remove = non_sys[:cut]
|
|
summary_parts = []
|
|
for _, m in to_remove:
|
|
role = m["role"]
|
|
text = m["content"][:150]
|
|
summary_parts.append(f"{role}: {text}")
|
|
summary = "Conversation summary (older messages):\n" + "\n".join(summary_parts)
|
|
# Remove old messages and insert summary after system messages
|
|
indices_to_remove = set(i for i, _ in to_remove)
|
|
self.messages = [m for i, m in enumerate(self.messages) if i not in indices_to_remove]
|
|
# Find insertion point (after last system message)
|
|
insert_at = 0
|
|
for i, m in enumerate(self.messages):
|
|
if m["role"] == "system":
|
|
insert_at = i + 1
|
|
self.messages.insert(insert_at, {"role": "system", "content": summary})
|
|
|
|
def _is_dangerous(self, cmd):
    """Return True when *cmd* contains any of the DANGEROUS_PATTERNS."""
    return DANGEROUS_PATTERNS.search(cmd) is not None
|
|
|
|
def _edit_file(self, filename, content):
|
|
"""Apply search/replace edits to an existing file."""
|
|
if not os.path.isabs(filename):
|
|
filename = os.path.join(self.cwd, filename)
|
|
if not os.path.realpath(filename).startswith(self.root_dir):
|
|
self._add_work(f"❌ Edit blocked: {filename} is outside project directory")
|
|
return f"Error: {filename} is outside project directory"
|
|
if not os.path.exists(filename):
|
|
self._add_work(f"❌ Edit failed: {filename} not found")
|
|
return f"Error: {filename} not found"
|
|
with open(filename) as f:
|
|
original = f.read()
|
|
modified = original
|
|
# Parse <<<SEARCH ... === ... >>> or <<< ... === ... >>> blocks
|
|
edits = re.findall(r"<<<\s*(?:SEARCH\s*)?\n?(.*?)\n\s*===\s*\n(.*?)\n\s*>>>", content, re.DOTALL)
|
|
if not edits:
|
|
# Fallback: strip any stray markers and treat as full file rewrite
|
|
clean = re.sub(r"^<<<\s*(?:SEARCH)?\s*\n?|^\s*===\s*\n|^\s*>>>\s*$", "", content, flags=re.MULTILINE).strip()
|
|
self._write_file(filename, clean + "\n")
|
|
return f"Rewrote {filename} (no edit markers, used as full rewrite)"
|
|
applied = 0
|
|
for search, replace in edits:
|
|
search = search.rstrip()
|
|
replace = replace.rstrip()
|
|
if search in modified:
|
|
modified = modified.replace(search, replace, 1)
|
|
applied += 1
|
|
else:
|
|
self._add_work(f"⚠ Search text not found in {filename}")
|
|
if applied > 0:
|
|
with open(filename, "w") as f:
|
|
f.write(modified)
|
|
self._add_work(f"✏️ Edited {filename} ({applied} change{'s' if applied != 1 else ''})")
|
|
self._log_history(f"EDITED: {filename}")
|
|
if filename in self.recent_files:
|
|
self.recent_files.remove(filename)
|
|
self.recent_files.insert(0, filename)
|
|
self.recent_files = self.recent_files[:10]
|
|
return f"Edited {filename}: {applied} change(s) applied"
|
|
|
|
@property
def input_history(self):
    """Input history list of the active mode (code mode or chat mode)."""
    if self.code_mode:
        return self.code_input_history
    return self.chat_input_history
|
|
|
|
def _load_input_history(self, prefix):
    """Return the saved input lines tagged ``<prefix>:``, newest last.

    Lines are read from INPUT_HISTORY_FILE, the tag is removed, and at most
    the last INPUT_HISTORY_MAX entries are returned.  A missing file gives
    an empty list.
    """
    if not os.path.exists(INPUT_HISTORY_FILE):
        return []
    tag = f"{prefix}:"
    with open(INPUT_HISTORY_FILE) as fh:
        stripped = (raw.rstrip("\n") for raw in fh)
        entries = [row[len(prefix) + 1:] for row in stripped if row.startswith(tag)]
    return entries[-INPUT_HISTORY_MAX:]
|
|
|
|
def _save_input_history(self, entry):
    """Append *entry* to the active mode's history and rewrite the history file.

    The active list is capped at INPUT_HISTORY_MAX entries (oldest dropped).
    The file is rewritten with the other mode's entries first, then the
    active mode's; write errors are ignored.
    """
    current = self.input_history
    current.append(entry)
    overflow = len(current) - INPUT_HISTORY_MAX
    if overflow > 0:
        del current[:overflow]

    # Rewrite file with all entries from both modes
    if self.code_mode:
        tag, other_tag, other = "code", "chat", self.chat_input_history
    else:
        tag, other_tag, other = "chat", "code", self.code_input_history
    try:
        rows = [f"{other_tag}:{item}" for item in other]
        rows.extend(f"{tag}:{item}" for item in current)
        with open(INPUT_HISTORY_FILE, "w") as fh:
            fh.write("\n".join(rows) + "\n")
    except Exception:
        pass
|
|
|
|
def _load_worklog(self):
    """Feed the tail of WORKLOG_FILE to the model as a system message.

    Only the last 100 lines are used so the context is not filled up.
    Nothing happens when the file is missing or that tail is blank.
    """
    if not os.path.exists(WORKLOG_FILE):
        return
    with open(WORKLOG_FILE) as fh:
        tail = fh.readlines()[-100:]
    recent = "".join(tail).strip()
    if not recent:
        return
    self.messages.append(
        {"role": "system", "content": f"Previous worklog (recent):\n{recent}"}
    )
    self._add_work(f"📖 Resumed from '{WORKLOG_FILE}'")
|
|
|
|
def _write_worklog(self, entry):
    """Append *entry* to WORKLOG_FILE under a ``## <timestamp>`` heading."""
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with open(WORKLOG_FILE, "a") as log:
        log.write(f"\n## {stamp}\n{entry}\n")
|
|
|
|
def _log_history(self, action):
    """Record a timestamped *action* and rewrite AI_HISTORY_FILE.

    Only the newest AI_HISTORY_MAX entries are kept in memory and on disk.
    Failures while writing the file are ignored.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    self.ai_history.append(f"[{stamp}] {action}")
    self.ai_history = self.ai_history[-AI_HISTORY_MAX:]
    try:
        with open(AI_HISTORY_FILE, "w") as out:
            out.write("\n".join(self.ai_history) + "\n")
    except Exception:
        pass
|
|
|
|
def _add_work(self, text):
    """Append *text* to the work window buffer, wrapped to the screen width.

    Each newline-separated line is wrapped with ``_wrap_dw``.  A width of 80
    columns is assumed when the screen size cannot be queried.
    """
    with self.lock:
        try:
            _, width = self.stdscr.getmaxyx()
        except Exception:
            width = 80
        for raw in text.split("\n"):
            self.work_lines.extend(_wrap_dw(raw, width))
|
|
|
|
def _add_chat(self, text):
|
|
with self.lock:
|
|
for line in text.split("\n"):
|
|
self.chat_history.append(line)
|
|
|
|
def _run_command(self, cmd):
|
|
# Check for dangerous commands
|
|
if not self.yolo_mode and self._is_dangerous(cmd):
|
|
self._add_work(f"⚠️ Dangerous command blocked: {cmd}")
|
|
self._add_work(" Use /yolo to enable, or /allow to run this one")
|
|
self.pending_cmd = cmd
|
|
return "Command blocked as potentially dangerous. User must approve.", 1
|
|
self.pending_cmd = None
|
|
self._add_work(f"▶ [{self.cwd}] $ {cmd}")
|
|
self._log_history(f"RAN: {cmd}")
|
|
# Append a pwd at the end so we can track cd changes
|
|
wrapped = cmd + '\necho "___CWD___:$(pwd)"'
|
|
try:
|
|
result = subprocess.run(
|
|
wrapped, shell=True, capture_output=True, text=True, timeout=60,
|
|
cwd=self.cwd
|
|
)
|
|
stdout = result.stdout.strip()
|
|
stderr = result.stderr.strip()
|
|
# Extract new cwd from output
|
|
out_lines = stdout.split("\n")
|
|
clean_lines = []
|
|
for line in out_lines:
|
|
if line.startswith("___CWD___:"):
|
|
new_cwd = line[10:]
|
|
if os.path.isdir(new_cwd) and os.path.realpath(new_cwd).startswith(self.root_dir):
|
|
with self.lock:
|
|
self.cwd = new_cwd
|
|
else:
|
|
clean_lines.append(line)
|
|
stdout = "\n".join(clean_lines).strip()
|
|
if stdout:
|
|
self._add_work(stdout)
|
|
if stderr and stderr != stdout:
|
|
self._add_work(stderr)
|
|
status = "✓" if result.returncode == 0 else f"✗ (exit {result.returncode})"
|
|
self._add_work(f" {status}")
|
|
parts = [f"exit_code: {result.returncode}"]
|
|
if stdout:
|
|
parts.append(f"stdout: {stdout}")
|
|
if stderr:
|
|
parts.append(f"stderr: {stderr}")
|
|
return "\n".join(parts), result.returncode
|
|
except subprocess.TimeoutExpired:
|
|
self._add_work(" ✗ (timeout)")
|
|
return "Command timed out after 60s", 1
|
|
|
|
def _write_file(self, filename, content):
|
|
if not os.path.isabs(filename):
|
|
filename = os.path.join(self.cwd, filename)
|
|
if not os.path.realpath(filename).startswith(self.root_dir):
|
|
self._add_work(f"❌ Write blocked: {filename} is outside project directory")
|
|
return False
|
|
# Backup existing file before overwriting
|
|
if os.path.exists(filename):
|
|
try:
|
|
with open(filename) as f:
|
|
self._last_backup = (filename, f.read())
|
|
except Exception:
|
|
pass
|
|
os.makedirs(os.path.dirname(filename), exist_ok=True) if os.path.dirname(filename) else None
|
|
with open(filename, "w") as f:
|
|
f.write(content)
|
|
self._add_work(f"📝 Wrote {filename}")
|
|
self._log_history(f"WROTE: {filename}")
|
|
# Track recent files
|
|
if filename in self.recent_files:
|
|
self.recent_files.remove(filename)
|
|
self.recent_files.insert(0, filename)
|
|
self.recent_files = self.recent_files[:10]
|
|
return True
|
|
|
|
def _process_response(self, text):
|
|
"""Extract and execute bash commands and file writes from AI response."""
|
|
results = []
|
|
# Find fenced code blocks (``` or ~~~)
|
|
blocks = re.findall(r"```\s*(\w[\w:./\-]*)\n(.*?)```", text, re.DOTALL)
|
|
blocks += re.findall(r"~~~\s*(\w[\w:./\-]*)\n(.*?)~~~", text, re.DOTALL)
|
|
# Also catch unfenced edit blocks (AI sometimes forgets fences)
|
|
unfenced = re.findall(r"(?:^|\n)\s*(edit:[\w./\-]+)\s*\n(<<<\s*SEARCH\s*\n.*?>>>)", text, re.DOTALL)
|
|
seen = set()
|
|
for lang, content in blocks:
|
|
seen.add(content.strip())
|
|
for lang, content in unfenced:
|
|
if content.strip() not in seen:
|
|
blocks.append((lang, content))
|
|
for lang, content in blocks:
|
|
lang_lower = lang.lower()
|
|
if lang_lower == "bash":
|
|
if self.stop_flag:
|
|
break
|
|
# Detect misplaced edit/write blocks inside bash
|
|
cmd = content.strip()
|
|
m = re.match(r"^(edit|write):(\S+)\s*\n", cmd, re.IGNORECASE)
|
|
if m:
|
|
action, fname = m.group(1).lower(), m.group(2)
|
|
body = cmd[m.end():]
|
|
if action == "edit":
|
|
result = self._edit_file(fname, body)
|
|
results.append(result)
|
|
else:
|
|
if self._write_file(fname, body):
|
|
results.append(f"Wrote file: {fname}")
|
|
else:
|
|
results.append(f"Write blocked: {fname}")
|
|
elif re.match(r"^(Command\s|exit_code:|stdout:|stderr:|Results:)", cmd):
|
|
# AI echoed back feedback format — skip it
|
|
pass
|
|
elif re.search(r"(^import |^from |^def |^class |^#include\s|= init_|\.connect\()", cmd, re.MULTILINE):
|
|
# AI put source code in a bash block — skip it
|
|
self._add_work("⚠ Skipped: source code detected in bash block")
|
|
pass
|
|
else:
|
|
output, rc = self._run_command(cmd)
|
|
results.append(f"Command `{cmd}`:\n{output}")
|
|
elif lang_lower.startswith("write:"):
|
|
filename = lang[6:]
|
|
if self._write_file(filename, content):
|
|
results.append(f"Wrote file: {filename}")
|
|
else:
|
|
results.append(f"Write blocked: {filename}")
|
|
elif lang_lower.startswith("edit:"):
|
|
filename = lang[5:]
|
|
result = self._edit_file(filename, content)
|
|
results.append(result)
|
|
elif lang_lower.startswith("memory:"):
|
|
key = lang[7:]
|
|
self.memory[key] = content.strip()
|
|
self._add_work(f"🧠 Memorized '{key}'")
|
|
self._log_history(f"MEMORIZED: {key}")
|
|
results.append(f"Stored memory: {key}")
|
|
elif lang_lower.startswith("forget:"):
|
|
key = lang[7:].strip() or content.strip()
|
|
if self.memory.pop(key, None) is not None:
|
|
self._add_work(f"🧠 Forgot '{key}'")
|
|
self._log_history(f"FORGOT: {key}")
|
|
results.append(f"Cleared memory: {key}")
|
|
elif lang_lower.startswith("read:"):
|
|
filename = lang[5:]
|
|
if not os.path.isabs(filename):
|
|
filename = os.path.join(self.cwd, filename)
|
|
try:
|
|
with open(filename) as f:
|
|
file_content = f.read()
|
|
self._add_work(f"📄 Read {filename}")
|
|
results.append(f"Contents of {filename}:\n{file_content}")
|
|
except Exception as e:
|
|
results.append(f"Error reading {filename}: {e}")
|
|
else:
|
|
# Unknown language tag — check if first line is write:/edit:
|
|
first_line = content.strip().split("\n", 1)[0]
|
|
m = re.match(r"^(write|edit):(\S+)", first_line, re.IGNORECASE)
|
|
if m:
|
|
action, fname = m.group(1).lower(), m.group(2)
|
|
body = content.strip().split("\n", 1)[1] if "\n" in content.strip() else ""
|
|
if action == "write":
|
|
if self._write_file(fname, body):
|
|
results.append(f"Wrote file: {fname}")
|
|
else:
|
|
results.append(self._edit_file(fname, body))
|
|
return results
|
|
|
|
def _memory_context(self):
|
|
"""Build a system message with current memory contents."""
|
|
if not self.memory:
|
|
return None
|
|
lines = [f"[{k}]: {v}" for k, v in self.memory.items()]
|
|
return {"role": "system", "content": "Current memory:\n" + "\n".join(lines)}
|
|
|
|
def _build_messages(self):
|
|
"""Build message list with memory injected, trimming old context."""
|
|
self._trim_context()
|
|
msgs = list(self.messages)
|
|
mem = self._memory_context()
|
|
if mem:
|
|
msgs.insert(1, mem) # after system prompt
|
|
return msgs
|
|
|
|
def _stream_response(self):
    """Stream a response from the LLM backend, return full text.

    Tokens are appended to the last line of the work window as they arrive
    (wrapped to the screen width).  The connection attempt is retried up to
    three times.  Streaming stops early when ``self.stop_flag`` is set.

    Returns:
        str: the concatenated response text, or "" when the connection
        failed or the server answered with a non-200 status.
    """
    self._add_work("🔌 Connecting to server...")
    resp = None
    if BACKEND == "vllm":
        payload = {"model": MODEL, "messages": self._build_messages(), "stream": True, "max_tokens": CONTEXT_SIZE}
    else:
        payload = {"model": MODEL, "messages": self._build_messages(), "stream": True, "options": {"num_ctx": CONTEXT_SIZE}}
    for attempt in range(3):
        try:
            resp = requests.post(
                OLLAMA_URL,
                json=payload,
                headers=AUTH_HEADERS,
                stream=True,
                timeout=120,
            )
            break
        except (requests.ConnectionError, requests.Timeout) as e:
            if attempt < 2:
                self._add_work(f"⚠ Connection failed (attempt {attempt + 1}/3), retrying...")
                time.sleep(2)
            else:
                self._add_work(f"❌ Failed to connect after 3 attempts: {e}")
                return ""

    pieces = []
    # With stream=True the connection stays checked out until the body is
    # fully read or the response is closed, so close it on every exit path
    # (user stop, parse error, HTTP error).
    try:
        if resp.status_code != 200:
            err = resp.text[:200]
            self._add_work(f"❌ Server error ({resp.status_code}): {err}")
            return ""
        # Remove the "Connecting" line now that we're streaming
        with self.lock:
            self.work_lines = [l for l in self.work_lines if l != "🔌 Connecting to server..."]
        for line in resp.iter_lines():
            if self.stop_flag:
                self._add_work("\n⛔ Stopped by user")
                break
            if not line:
                continue
            if BACKEND == "vllm":
                # Server-sent events: "data: {json}" lines, ended by "[DONE]".
                line_str = line.decode("utf-8") if isinstance(line, bytes) else line
                if not line_str.startswith("data: "):
                    continue
                line_str = line_str[6:]
                if line_str.strip() == "[DONE]":
                    break
                data = json.loads(line_str)
                # Tolerate chunks whose "choices" list is missing or empty.
                choices = data.get("choices") or [{}]
                token = choices[0].get("delta", {}).get("content") or ""
            else:
                data = json.loads(line)
                token = data.get("message", {}).get("content", "")
            if not token:
                continue
            pieces.append(token)
            with self.lock:
                try:
                    w = self.stdscr.getmaxyx()[1]
                except Exception:
                    w = 80
                parts = token.split("\n")
                self.work_lines[-1] += parts[0]
                # Wrap current line if it exceeds display width
                if _dw(self.work_lines[-1]) > w:
                    wrapped = _wrap_dw(self.work_lines[-1], w)
                    self.work_lines[-1] = wrapped[0]
                    self.work_lines.extend(wrapped[1:])
                for p in parts[1:]:
                    self.work_lines.extend(_wrap_dw(p, w))
    finally:
        resp.close()

    # Clean up code fence markers from display
    with self.lock:
        self.work_lines = [
            l for l in self.work_lines
            if not re.match(r"^`{3}(?:bash|write:\S+|edit:\S+|memory:\S+|forget:\S+|\w+)?$", l.strip())
        ]
    return "".join(pieces)
|
|
|
|
def _query_ollama(self, user_msg):
    """Send *user_msg* to the model and act on the reply.

    Chat mode: the message goes to a separate plain-text conversation and
    nothing is executed.  Code mode: the reply's actions are executed and
    their results fed back to the model, for at most five rounds.

    ``self.busy`` is True for the duration of the call.  Errors are reported
    in the work window instead of being raised.
    """
    self.busy = True
    self.stop_flag = False

    if not self.code_mode:
        # Chat mode — use separate message list, don't touch code history
        self.chat_messages.append({"role": "user", "content": user_msg})
        self._add_work(f"\n🧑 {USER_NAME}: {user_msg}")
        self._log_history(f"USER [chat]: {user_msg}")
        try:
            self._add_work("📠 AI: ")
            # Build chat messages: system prompt + chat history
            chat_msgs = [{"role": "system", "content": f"You are chatting with {USER_NAME}. The user's name is {USER_NAME} — use it. Respond in plain text only. No code blocks, no file writes, no commands. Just discuss and reason naturally."}]
            chat_msgs += self.chat_messages[-MAX_CONTEXT_MESSAGES:]
            # Temporarily swap messages for streaming.  Restore in `finally`
            # so a streaming error cannot leave the code-mode history
            # replaced by the chat history.
            orig = self.messages
            self.messages = chat_msgs
            try:
                full_response = self._stream_response()
            finally:
                self.messages = orig
            if full_response:
                self.chat_messages.append({"role": "assistant", "content": full_response})
                self._log_history(f"AI [chat]: {full_response[:100]}")
                self._write_worklog(f"User [chat]: {user_msg}\nAI: {full_response[:200]}")
        except Exception as e:
            self._add_work(f"❌ Error: {e}")
        finally:
            self.busy = False
        return

    # Code mode — use main message list
    self.messages.append({"role": "user", "content": user_msg})
    # Include chat summary if there's been chat discussion
    if self.chat_messages:
        summary = "\n".join(f"{m['role']}: {m['content'][:150]}" for m in self.chat_messages[-10:])
        self.messages.append({"role": "system", "content": f"Recent chat discussion for context:\n{summary}"})
    self._add_work(f"\n🧑 {USER_NAME}: {user_msg}")
    self._log_history(f"USER: {user_msg}")

    try:
        self._add_work("📠 AI: ")
        full_response = self._stream_response()
        if not full_response:
            return
        self.messages.append({"role": "assistant", "content": full_response})
        self._log_history(f"AI: {full_response[:100]}")

        # Autonomous loop: keep processing actions and feeding back results.
        # Every reply is processed exactly once; running the same reply
        # through _process_response again would repeat its commands and
        # file writes.
        max_rounds = 5
        last_feedback = None
        plain_reply = False  # True when the latest reply contained no actions
        for _ in range(max_rounds):
            if self.stop_flag:
                break
            action_results = self._process_response(full_response)
            if not action_results:
                plain_reply = True
                break
            # Stop looping if a command was blocked — wait for user to /allow
            if any("blocked" in r.lower() for r in action_results):
                self._write_worklog(f"User: {user_msg}\nAI acted.\nResults:\n" + "\n".join(action_results))
                break
            feedback = "Results:\n" + "\n".join(action_results)
            # Stop if same results as last round (no progress)
            if feedback == last_feedback:
                self._add_work("⚠ AI is stuck (same results), stopping.")
                self._write_worklog(f"User: {user_msg}\nAI acted.\n{feedback}")
                break
            last_feedback = feedback
            self._write_worklog(f"User: {user_msg}\nAI acted.\n{feedback}")
            self.messages.append({"role": "user", "content": feedback})
            self._add_work("\n📠 AI (continuing): ")
            full_response = self._stream_response()
            if not full_response:
                break
            self.messages.append({"role": "assistant", "content": full_response})
            self._log_history(f"AI: {full_response[:100]}")
        else:
            # Round budget used up: the last reply has not been acted on yet.
            # Act on it once, unless the user asked to stop.
            if not self.stop_flag and not self._process_response(full_response):
                plain_reply = True

        if plain_reply:
            self._write_worklog(f"User: {user_msg}\nAI: {full_response[:200]}")

    except Exception as e:
        self._add_work(f"❌ Error: {e}")
    finally:
        self.busy = False
|
|
|
|
def _save_config_model(self, new_model):
    """Persist *new_model* as the ``model`` entry of CONFIG_FILE.

    The file is re-read as ``key=value`` lines and rewritten with only the
    keys listed in REQUIRED_KEYS and OPTIONAL_KEYS, in that order.
    """
    settings = {}
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE) as fh:
            for raw in fh:
                entry = raw.strip()
                if entry.startswith("#") or "=" not in entry:
                    continue
                key, _, value = entry.partition("=")
                settings[key.strip()] = value.strip()
    settings["model"] = new_model

    with open(CONFIG_FILE, "w") as fh:
        for key in REQUIRED_KEYS + OPTIONAL_KEYS:
            if key not in settings:
                continue
            fh.write(f"{key}={settings[key]}\n")
|
|
|
|
def _show_model_picker(self):
    """Show a modal popup listing the server's models and switch to the chosen one.

    The model list is fetched from ``/v1/models`` (vllm backend) or
    ``/api/tags`` (any other backend). Up/Down move the selection, Enter
    stores the choice in the global MODEL and in the config file, Esc
    cancels. While the popup is open getch() blocks; the 100 ms input
    timeout is restored on every exit path, including exceptions.
    """
    global MODEL
    self._add_work("⏳ Fetching models...")
    self._draw()

    # The listing endpoint sits next to the chat endpoint: strip the chat
    # path from the configured URL to get the server base.
    if BACKEND == "vllm":
        host = OLLAMA_URL.rsplit("/v1/chat/completions", 1)[0]
        list_url, list_key, name_key = host + "/v1/models", "data", "id"
    else:
        host = OLLAMA_URL.rsplit("/api/chat", 1)[0]
        list_url, list_key, name_key = host + "/api/tags", "models", "name"

    try:
        r = requests.get(list_url, headers=AUTH_HEADERS, timeout=10)
        r.raise_for_status()
        models = sorted(m[name_key] for m in r.json().get(list_key, []))
    except Exception as e:
        # Network errors, HTTP errors and malformed payloads all end up here.
        self._add_work(f"❌ Failed to fetch models: {e}")
        return
    if not models:
        self._add_work("❌ No models available on server")
        return

    sel = models.index(MODEL) if MODEL in models else 0
    self.stdscr.nodelay(False)
    self.stdscr.timeout(-1)  # block until a key arrives while the popup is open
    try:
        while True:
            h, w = self.stdscr.getmaxyx()
            # Popup dimensions
            box_w = min(60, w - 4)
            box_h = min(len(models) + 4, h - 4)
            visible = box_h - 4  # rows left after title, "Current", and footer
            x = (w - box_w) // 2
            y = (h - box_h) // 2
            # Scroll window: keep the selected entry on the last visible row
            scroll = max(0, sel - visible + 1)
            # Draw popup background
            for row in range(box_h):
                self._safe_addstr(y + row, x, " " * box_w, w, curses.color_pair(6))
            title = " SELECT MODEL "
            tpad = box_w - len(title)
            self._safe_addstr(y, x, "─" * (tpad // 2) + title + "─" * ((tpad + 1) // 2), w, curses.color_pair(6))
            self._safe_addstr(y + 1, x, f" Current: {MODEL}"[:box_w], w, curses.color_pair(6))
            self._safe_addstr(y + box_h - 1, x, " ↑↓ select Enter confirm Esc cancel "[:box_w], w, curses.color_pair(3))
            for i in range(visible):
                idx = scroll + i
                if idx >= len(models):
                    break
                name = models[idx][:box_w - 4]
                row = y + 2 + i
                if idx == sel:
                    self._safe_addstr(row, x, f" ▸ {name:<{box_w - 3}}", w, curses.color_pair(5) | curses.A_BOLD)
                else:
                    self._safe_addstr(row, x, f" {name:<{box_w - 3}}", w, curses.color_pair(6))
            self.stdscr.refresh()

            ch = self.stdscr.getch()
            if ch == 27:  # Esc
                break
            elif ch == curses.KEY_UP and sel > 0:
                sel -= 1
            elif ch == curses.KEY_DOWN and sel < len(models) - 1:
                sel += 1
            elif ch in (curses.KEY_ENTER, 10, 13):
                MODEL = models[sel]
                try:
                    self._save_config_model(MODEL)
                except OSError as e:
                    # The switch still applies to this session; only
                    # persisting it failed.
                    self._add_work(f"⚠ Could not save model to {CONFIG_FILE}: {e}")
                self._add_work(f"✅ Model changed to {MODEL}")
                break
    finally:
        # Restore the 100 ms getch timeout even if drawing or input raised.
        self.stdscr.timeout(100)
|
|
|
|
def _get_editor(self):
|
|
"""Get user's preferred editor."""
|
|
return os.environ.get("VISUAL") or os.environ.get("EDITOR") or "vi"
|
|
|
|
def _edit_in_editor(self, filepath):
|
|
"""Open a file in the user's editor, suspending curses."""
|
|
if not os.path.isabs(filepath):
|
|
filepath = os.path.join(self.cwd, filepath)
|
|
if not os.path.exists(filepath):
|
|
self._add_work(f"❌ File not found: {filepath}")
|
|
return
|
|
editor = self._get_editor()
|
|
curses.endwin()
|
|
try:
|
|
subprocess.run([editor, filepath])
|
|
except Exception as e:
|
|
pass
|
|
self.stdscr.refresh()
|
|
self._add_work(f"📝 Returned from {editor}: {filepath}")
|
|
|
|
def _safe_addstr(self, row, col, text, maxw, attr=0):
|
|
"""Write text to screen, safely clamped to bounds."""
|
|
if row < 0 or col >= maxw:
|
|
return
|
|
text = text[:maxw - col]
|
|
if not text:
|
|
return
|
|
try:
|
|
self.stdscr.addnstr(row, col, text, maxw - col, attr)
|
|
except curses.error:
|
|
pass
|
|
|
|
def _work_line_attr(self, line):
    """Return the curses attribute selected by *line*'s leading marker.

    Returns None when the line starts with no recognised marker; such a
    line keeps the colour of the nearest marked line above it.
    """
    if line.startswith("🧑 "):
        return curses.color_pair(1)  # green — user
    if line.startswith("📠 AI"):
        return curses.color_pair(2)  # cyan — AI
    if line.startswith("▶ "):
        return curses.color_pair(3)  # yellow — command
    if line.startswith(" ✓"):
        return curses.color_pair(1)  # green — success
    if line.startswith((" ✗", "❌")):
        return curses.color_pair(4)  # red — error
    if line.startswith("⚠"):
        return curses.color_pair(3)  # yellow — warning
    if line.startswith(("📝 ", "✏️")):
        return curses.color_pair(7)  # magenta — file write/edit
    if line.startswith(("🔌 ", "📋 ", "📂 ", "📖 ", "💡")):
        return curses.color_pair(2)  # cyan — info
    return None

def _draw(self):
    """Repaint the whole screen.

    Layout, top to bottom: a title bar (row 0), the scrollable work pane
    with an optional recent-files panel on its right edge, a separator
    row showing mode/model/host, the tail of the chat history, and the
    input line(s) anchored to the bottom of the screen. Nothing is drawn
    when the terminal is smaller than 4 rows or 10 columns.
    """
    h, w = self.stdscr.getmaxyx()
    if h < 4 or w < 10:
        return
    # The work pane takes ~90% of the rows but always leaves two rows
    # (separator + input); otherwise the input line is never drawn on
    # terminals of 10 rows or fewer.
    work_h = min(max(2, int(h * 0.9)), h - 2)
    chat_h = h - work_h

    self.stdscr.erase()

    # --- Top: title bar ---
    title = " WORK "
    bar_attr = curses.color_pair(2)
    if self.busy:
        frame = SPINNER_FRAMES[self.spinner_idx % len(SPINNER_FRAMES)]
        self.spinner_idx += 1
        title = f" {frame} WORKING {frame} "
        # Alternate between two colour pairs based on wall-clock time (blink).
        if int(time.time() * 3) % 2:
            title_attr = curses.color_pair(5) | curses.A_BOLD
        else:
            title_attr = curses.color_pair(4) | curses.A_BOLD
    else:
        title_attr = bar_attr
    pad = w - len(title)
    left_pad = pad // 2
    right_pad = (pad + 1) // 2
    self._safe_addstr(0, 0, "─" * left_pad, w, bar_attr)
    self._safe_addstr(0, left_pad, title, w, title_attr)
    self._safe_addstr(0, left_pad + len(title), "─" * right_pad, w, bar_attr)
    # Holck Editor label on top-left (only when it fits left of the title)
    he_label = " Holck Editor "
    if len(he_label) < left_pad:
        self._safe_addstr(0, 0, he_label, w, curses.color_pair(3))
    # Ollama status in top-right (only when it does not overlap the title)
    if self.ollama_status:
        status_str = f" {self.ollama_status} "
        sx = w - len(status_str)
        if sx > left_pad + len(title):
            self._safe_addstr(0, sx, status_str, w, curses.color_pair(3))

    # --- Work pane ---
    view_h = work_h - 1
    # Take the visible slice and the colour in effect just above it in ONE
    # critical section so both are computed from the same work_lines state.
    with self.lock:
        total = len(self.work_lines)
        # Clamp scroll offset
        max_scroll = max(0, total - view_h)
        self.scroll_offset = max(0, min(self.scroll_offset, max_scroll))
        start = max(0, total - view_h - self.scroll_offset)
        visible = self.work_lines[start:start + view_h]
        # Colours are sticky for continuation lines: find the nearest
        # marked line above the viewport to seed the first visible row.
        cur_attr = 0
        for j in range(start - 1, -1, -1):
            found = self._work_line_attr(self.work_lines[j])
            if found is not None:
                cur_attr = found
                break

    # Wrap long lines at display time
    wrapped_visible = [wl for line in visible for wl in _wrap_dw(line, w)]
    # Wrapping can produce more rows than fit; the oldest ones are cut off
    # but still take part in the sticky-colour computation.
    hidden = max(0, len(wrapped_visible) - view_h)
    for line in wrapped_visible[:hidden]:
        found = self._work_line_attr(line)
        if found is not None:
            cur_attr = found
    for i, line in enumerate(wrapped_visible[hidden:]):
        row = 1 + i
        if row >= work_h:
            break
        found = self._work_line_attr(line)
        if found is not None:
            cur_attr = found
        self._safe_addstr(row, 0, line[:w], w, cur_attr)

    # Scroll indicator, drawn over the first work row while scrolled up
    if self.scroll_offset > 0:
        indicator = f" ↑ {self.scroll_offset} lines above "
        self._safe_addstr(1, 0, indicator, w, curses.color_pair(3))

    # --- Recent files panel (top-right) ---
    if self.recent_files:
        panel_w = min(30, w // 3)
        panel_h = min(len(self.recent_files) + 2, work_h - 1)
        px = w - panel_w
        py = 1
        header = " FILES "
        hpad = panel_w - len(header)
        hbar = ("─" * (hpad // 2)) + header + ("─" * ((hpad + 1) // 2))
        self._safe_addstr(py, px, hbar, w, curses.color_pair(6))
        for fi, fname in enumerate(self.recent_files):
            row = py + 1 + fi
            if row >= py + panel_h - 1:
                break
            label = os.path.relpath(fname, self.cwd) if os.path.isabs(fname) else fname
            label = label[:panel_w - 2]
            self._safe_addstr(row, px, f" {label:<{panel_w - 1}}", w, curses.color_pair(6))
        bot_row = min(py + panel_h - 1, work_h - 1)
        self._safe_addstr(bot_row, px, "─" * panel_w, w, curses.color_pair(6))

    # --- Bottom: chat window ---
    sep_row = work_h
    mode_label = "CODE" if self.code_mode else "CHAT"
    sep_title = f" {mode_label} {MODEL}@{HOST_SHORT} "
    sep_color = curses.color_pair(1) if self.code_mode else curses.color_pair(3)
    spad = w - len(sep_title)
    sep = ("─" * (spad // 2)) + sep_title + ("─" * ((spad + 1) // 2))
    self._safe_addstr(sep_row, 0, sep, w, sep_color)

    # Chat history lines
    chat_display_h = chat_h - 2  # minus separator and input line
    with self.lock:
        chat_vis = self.chat_history[-chat_display_h:] if chat_display_h > 0 else []
    for i, line in enumerate(chat_vis):
        row = sep_row + 1 + i
        if row >= h - 1:
            break
        self._safe_addstr(row, 0, line[:w], w)

    # Input area with wrapping: the first line shares its row with the
    # prompt, continuation lines are full-width.
    prompt = "▸▸ " if self.multiline_mode else ("▸ " if self.code_mode else "▸ ")
    plen = len(prompt)
    avail_first = w - plen  # chars on first line (after prompt)
    avail_rest = w  # chars on continuation lines
    inp = self.chat_input
    wrapped = [inp[:avail_first]]
    inp = inp[avail_first:]
    while inp:
        wrapped.append(inp[:avail_rest])
        inp = inp[avail_rest:]
    num_input_lines = len(wrapped)
    input_start_row = h - num_input_lines
    for li, chunk in enumerate(wrapped):
        row = input_start_row + li
        if row < sep_row + 1:
            continue  # never draw over the separator or the work pane
        try:
            if li == 0:
                self.stdscr.attron(curses.color_pair(1))
                self.stdscr.addnstr(row, 0, prompt, w)
                self.stdscr.attroff(curses.color_pair(1))
                self.stdscr.addnstr(row, plen, chunk, avail_first)
            else:
                self.stdscr.addnstr(row, 0, chunk, avail_rest)
            self.stdscr.clrtoeol()
        except curses.error:
            # curses raises after writing into the bottom-right cell; the
            # text is already on screen, so carry on and still place the
            # cursor below.
            pass

    # Position cursor
    cp = self.cursor_pos
    if cp < avail_first:
        cur_line, cur_col = 0, plen + cp
    else:
        # Index avail_first is the first character of the second line.
        beyond = cp - avail_first
        cur_line, cur_col = 1 + beyond // avail_rest, beyond % avail_rest
    if cur_line >= num_input_lines:
        # Cursor sits just past a completely filled last line: park it on
        # that line's right edge instead of wrapping to column 0.
        cur_line, cur_col = num_input_lines - 1, w - 1
    cur_row = input_start_row + cur_line
    try:
        self.stdscr.move(min(cur_row, h - 1), min(cur_col, w - 1))
    except curses.error:
        pass

    self.stdscr.refresh()
|
|
|
|
def run(self):
    """Run the interactive main loop until the user quits.

    Each iteration repaints the screen, reads one key and dispatches it:
    control keys (stop, quit, mode toggle), scrolling, line editing and
    input history, and Enter. On Enter the submitted text is handled as,
    in order: ``quit``/``exit``, a ``!`` shell escape, a ``/`` slash
    command, or otherwise a chat message handed to ``_query_ollama`` on a
    background thread (unless the AI is already busy).
    """
    self._add_work(f"🚀 Welcome back, {USER_NAME}! — Ctrl+D to stop AI, Ctrl+C to quit")
    self._add_work("💡 Tab to toggle CODE/CHAT mode | /help for all commands")
    if not MODEL:
        self._add_work("⚠ No model chosen. Use /model to select one.")
    self._add_work("")

    # BUTTON5_* is only exposed by the curses module on newer Python
    # versions / curses libraries; 0 makes the wheel-down test a no-op
    # instead of raising AttributeError.
    wheel_down = getattr(curses, "BUTTON5_PRESSED", 0)

    while True:
        self._draw()
        try:
            ch = self.stdscr.getch()
        except KeyboardInterrupt:
            break

        if ch == -1:  # input timeout: just redraw
            continue
        elif ch == 4:  # Ctrl+D
            self.stop_flag = True
            self._add_work("⛔ Stop signal sent")
        elif ch == 3:  # Ctrl+C
            break
        elif ch == 9:  # Tab — toggle code/chat mode
            self.code_mode = not self.code_mode
        elif ch == curses.KEY_MOUSE:
            try:
                _, _, _, _, bstate = curses.getmouse()
            except curses.error:
                pass
            else:
                if bstate & curses.BUTTON4_PRESSED:
                    self.scroll_offset += 3
                elif bstate & wheel_down:
                    self.scroll_offset = max(0, self.scroll_offset - 3)
        elif ch == curses.KEY_SR or ch == 575:  # Shift+Up or Ctrl+Up — scroll up
            self.scroll_offset += 3
        elif ch == curses.KEY_SF or ch == 534:  # Shift+Down or Ctrl+Down — scroll down
            self.scroll_offset = max(0, self.scroll_offset - 3)
        elif ch == curses.KEY_PPAGE:  # Page Up
            h, _ = self.stdscr.getmaxyx()
            self.scroll_offset += max(1, int(h * 0.9) - 5)
        elif ch == curses.KEY_NPAGE:  # Page Down
            h, _ = self.stdscr.getmaxyx()
            self.scroll_offset = max(0, self.scroll_offset - max(1, int(h * 0.9) - 5))
        elif ch in (curses.KEY_UP, 16):  # Arrow Up / Ctrl+P — previous history
            if self.input_history:
                if self.history_idx == -1:
                    # Entering history: remember the line being typed.
                    self.saved_input = self.chat_input
                    self.history_idx = len(self.input_history) - 1
                elif self.history_idx > 0:
                    self.history_idx -= 1
                self.chat_input = self.input_history[self.history_idx]
                self.cursor_pos = len(self.chat_input)
        elif ch in (curses.KEY_DOWN, 14):  # Arrow Down / Ctrl+N — next history
            if self.history_idx != -1:
                if self.history_idx < len(self.input_history) - 1:
                    self.history_idx += 1
                    self.chat_input = self.input_history[self.history_idx]
                else:
                    # Past the newest entry: restore the line being typed.
                    self.history_idx = -1
                    self.chat_input = self.saved_input
                self.cursor_pos = len(self.chat_input)
        elif ch in (curses.KEY_LEFT, 2):  # Arrow Left / Ctrl+B — back one char
            if self.cursor_pos > 0:
                self.cursor_pos -= 1
        elif ch in (curses.KEY_RIGHT, 6):  # Arrow Right / Ctrl+F — forward one char
            if self.cursor_pos < len(self.chat_input):
                self.cursor_pos += 1
        elif ch in (curses.KEY_BACKSPACE, 127, 8):
            if self.cursor_pos > 0:
                self.chat_input = self.chat_input[:self.cursor_pos - 1] + self.chat_input[self.cursor_pos:]
                self.cursor_pos -= 1
        elif ch == 1:  # Ctrl+A — beginning of line
            self.cursor_pos = 0
        elif ch == 5:  # Ctrl+E — end of line
            self.cursor_pos = len(self.chat_input)
        elif ch == 23:  # Ctrl+W — delete word backward
            i = self.cursor_pos
            while i > 0 and self.chat_input[i - 1] == ' ':
                i -= 1
            while i > 0 and self.chat_input[i - 1] != ' ':
                i -= 1
            self.chat_input = self.chat_input[:i] + self.chat_input[self.cursor_pos:]
            self.cursor_pos = i
        elif ch == 21:  # Ctrl+U — clear to start
            self.chat_input = self.chat_input[self.cursor_pos:]
            self.cursor_pos = 0
        elif ch == 11:  # Ctrl+K — kill to end of line
            self.chat_input = self.chat_input[:self.cursor_pos]
        elif ch == 13 and self.multiline_mode:
            # NOTE(review): in multi-line mode a raw CR (13) is ignored here
            # and, being part of this elif chain, never reaches the Enter
            # branch below — confirm this is intended.
            pass
        elif ch in (curses.KEY_ENTER, 10, 13):
            # Detect paste: drain any buffered input rapidly
            self.stdscr.nodelay(True)
            paste_buf = self.chat_input
            pasted = False
            while True:
                pc = self.stdscr.getch()
                if pc == -1:
                    break
                if pc in (curses.KEY_ENTER, 10, 13):
                    piece = "\n"
                elif 32 <= pc <= 126:
                    piece = chr(pc)
                elif pc in (curses.KEY_BACKSPACE, 127, 8):
                    paste_buf = paste_buf[:-1]
                    continue
                else:
                    continue
                if not pasted:
                    # More text was already queued, so the Enter that got
                    # us here was a line break inside the paste: keep it,
                    # otherwise the first two pasted lines are glued.
                    paste_buf += "\n"
                    pasted = True
                paste_buf += piece
            self.stdscr.nodelay(False)
            self.stdscr.timeout(100)
            self.chat_input = paste_buf
            self.cursor_pos = len(self.chat_input)

            # In multiline mode, empty line sends; otherwise add to buffer
            if self.multiline_mode:
                if self.chat_input == "":
                    msg = "\n".join(self.multiline_buf).strip()
                    self.multiline_buf = []
                else:
                    self.multiline_buf.append(self.chat_input)
                    self.chat_input = ""
                    self.cursor_pos = 0
                    continue
            else:
                msg = self.chat_input.strip()
            self.chat_input = ""
            self.cursor_pos = 0
            if not msg:
                continue

            # Save to input history (skip immediate repeats)
            if not self.input_history or self.input_history[-1] != msg:
                self._save_input_history(msg)
            self.history_idx = -1
            self.saved_input = ""
            # Reset scroll to bottom on new input
            self.scroll_offset = 0

            if msg.lower() in ("quit", "exit"):
                break

            # --- "!" shell escapes ---
            if msg.startswith("!"):
                if msg == "!":
                    # Bare "!": hand the terminal to an interactive shell.
                    shell = os.environ.get("SHELL", "/bin/sh")
                    curses.endwin()
                    try:
                        subprocess.run([shell], cwd=self.cwd)
                    except OSError as e:
                        self._add_work(f"❌ {e}")
                    finally:
                        self.stdscr.refresh()
                    self._add_work("📟 Returned from shell")
                else:
                    cmd = msg[1:].strip()
                    # Detect interactive commands that need the full terminal
                    words = cmd.split()
                    first_word = words[0] if words else ""
                    interactive = {"vi", "vim", "nvim", "nano", "pico", "emacs", "less", "more", "top", "htop", "man", "ssh", "bash", "sh", "zsh", "fish"}
                    if first_word in interactive:
                        curses.endwin()
                        try:
                            subprocess.run(cmd, shell=True, cwd=self.cwd)
                        except OSError as e:
                            self._add_work(f"❌ {e}")
                        finally:
                            self.stdscr.refresh()
                        self._add_work(f"📟 Returned from: {cmd}")
                    else:
                        self._add_work(f"▶ $ {cmd}")
                        try:
                            # errors="replace": output that is not valid
                            # text must not crash the UI with a decode error.
                            result = subprocess.run(
                                cmd, shell=True, capture_output=True, text=True,
                                errors="replace", timeout=60, cwd=self.cwd,
                            )
                        except subprocess.TimeoutExpired:
                            self._add_work(" ✗ (timeout)")
                        except OSError as e:
                            self._add_work(f"❌ {e}")
                        else:
                            if result.stdout.strip():
                                self._add_work(result.stdout.strip())
                            if result.stderr.strip():
                                self._add_work(result.stderr.strip())
                            status = "✓" if result.returncode == 0 else f"✗ (exit {result.returncode})"
                            self._add_work(f" {status}")
                        self._draw()
                continue

            # --- slash commands ---
            if msg.startswith("/open "):
                filepath = msg[6:].strip()
                self._load_file_context(filepath)
                continue
            if msg == "/memory":
                if self.memory:
                    self._add_work("\n🧠 Memory:")
                    for k, v in self.memory.items():
                        self._add_work(f" [{k}]: {v}")
                else:
                    self._add_work("🧠 Memory is empty")
                continue
            if msg == "/help":
                for help_line in (
                    "\n📖 Commands:",
                    " /open <file> — load file as AI context",
                    " /memory — view AI memory scratchpad",
                    " /yolo — toggle dangerous command safety",
                    " /allow — approve a blocked command",
                    " /ml — toggle multi-line input",
                    " /clear — reset conversation and work window",
                    " /pwd — show current working directory",
                    " /cd <dir> — change working directory",
                    " /config — show current configuration",
                    " /model — pick model from server",
                    " /undo — restore last overwritten file",
                    " /diff — show diff of last overwritten file",
                    " /cat <file> — view a file in work window",
                    " /edit <file> — open file in editor ($VISUAL/$EDITOR/vi)",
                    " /grep <pat> — search project files",
                    " /history — show recent AI actions",
                    " /help — show this help",
                    " quit / exit — exit the program",
                    " Ctrl+D — stop AI mid-action",
                    " Ctrl+C — quit",
                    " Tab — toggle CODE/CHAT mode",
                    " PgUp/PgDn — scroll work window",
                    " ↑/↓ — input history",
                    " ←/→ — move cursor",
                    " Ctrl+A/E — beginning/end of line",
                    " Ctrl+F/B — forward/back one char",
                    " Ctrl+W — delete word backward",
                    " Ctrl+U — clear to start of line",
                    " Ctrl+K — kill to end of line",
                    " Ctrl+P/N — previous/next history",
                ):
                    self._add_work(help_line)
                continue
            if msg == "/yolo":
                self.yolo_mode = not self.yolo_mode
                state = "ON ⚡" if self.yolo_mode else "OFF 🛡️"
                self._add_work(f"🔧 YOLO mode: {state}")
                continue
            if msg == "/allow":
                if self.pending_cmd:
                    cmd = self.pending_cmd
                    self.pending_cmd = None
                    self._add_work(f"✅ Approved: {cmd}")
                    # Run this one command with yolo mode forced on, and
                    # always put the user's setting back afterwards.
                    old = self.yolo_mode
                    self.yolo_mode = True
                    try:
                        self._run_command(cmd)
                    finally:
                        self.yolo_mode = old
                else:
                    # Without this, a stray "/allow" would be sent to the
                    # model as a chat message.
                    self._add_work("❌ No blocked command to allow")
                continue
            if msg == "/ml":
                self.multiline_mode = not self.multiline_mode
                state = "ON (blank line to send)" if self.multiline_mode else "OFF"
                self._add_work(f"📝 Multi-line mode: {state}")
                continue
            if msg == "/clear":
                with self.lock:
                    self.work_lines.clear()
                    self.chat_history.clear()
                self.messages = [{"role": "system", "content": SYSTEM_PROMPT}]
                self.scroll_offset = 0
                self._load_project_context()
                self._add_work("🧹 Cleared conversation and work window")
                continue
            if msg == "/pwd":
                self._add_work(f"📁 {self.cwd}")
                continue
            if msg.startswith("/cd "):
                target = msg[4:].strip()
                if not os.path.isabs(target):
                    target = os.path.join(self.cwd, target)
                target = os.path.realpath(os.path.normpath(target))
                # Compare on a path-component boundary: a bare startswith()
                # would let "/x/project-other" pass for root "/x/project".
                root = self.root_dir.rstrip(os.sep) or os.sep
                prefix = root if root.endswith(os.sep) else root + os.sep
                if target != root and not target.startswith(prefix):
                    self._add_work(f"❌ Cannot leave project directory: {self.root_dir}")
                elif os.path.isdir(target):
                    self.cwd = target
                    self._add_work(f"📁 Changed to {self.cwd}")
                else:
                    self._add_work(f"❌ Not a directory: {target}")
                continue
            if msg == "/config":
                self._add_work(f"\n⚙️ Config ({CONFIG_FILE}):")
                self._add_work(f" Host: {OLLAMA_URL}")
                self._add_work(f" Model: {MODEL}")
                self._add_work(f" User: {USER_NAME}")
                self._add_work(f" Auth: {'set' if AUTH_HEADERS else 'none'}")
                self._add_work(f" Ctx: {CONTEXT_SIZE}")
                self._add_work(f" Back: {BACKEND}")
                continue
            if msg == "/model":
                self._show_model_picker()
                continue
            if msg == "/undo":
                if self._last_backup:
                    fname, content = self._last_backup
                    try:
                        with open(fname, "w") as f:
                            f.write(content)
                    except OSError as e:
                        # Keep the backup so the user can retry.
                        self._add_work(f"❌ {e}")
                    else:
                        self._add_work(f"↩️ Restored {fname}")
                        self._last_backup = None
                else:
                    self._add_work("❌ Nothing to undo")
                continue
            if msg.startswith("/cat "):
                filepath = msg[5:].strip()
                if not os.path.isabs(filepath):
                    filepath = os.path.join(self.cwd, filepath)
                try:
                    with open(filepath) as f:
                        self._add_work(f"\n📄 {filepath}:")
                        self._add_work(f.read().rstrip())
                except Exception as e:
                    self._add_work(f"❌ {e}")
                continue
            if msg.startswith("/diff"):
                if self._last_backup:
                    fname, old = self._last_backup
                    try:
                        with open(fname) as f:
                            new = f.read()
                        diff = difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="before", tofile="after", lineterm="")
                        self._add_work(f"\n📊 Diff: {fname}")
                        for line in diff:
                            self._add_work(line)
                    except Exception as e:
                        self._add_work(f"❌ {e}")
                else:
                    self._add_work("❌ No backup to diff against")
                continue
            if msg == "/history":
                self._add_work("\n📜 Recent actions:")
                for entry in self.ai_history[-20:]:
                    self._add_work(f" {entry}")
                if not self.ai_history:
                    self._add_work(" (empty)")
                continue
            if msg.startswith("/edit "):
                filepath = msg[6:].strip()
                self._edit_in_editor(filepath)
                continue
            if msg.startswith("/grep "):
                pattern = msg[6:].strip()
                self._add_work(f"\n🔍 Searching for: {pattern}")
                try:
                    # "-e" keeps a pattern that starts with "-" from being
                    # parsed as a grep option; errors="replace" keeps
                    # non-UTF-8 matches from failing the whole search.
                    result = subprocess.run(
                        ["grep", "-rn", "--color=never", "-e", pattern, "."],
                        capture_output=True, text=True, errors="replace",
                        timeout=10, cwd=self.cwd,
                    )
                    if result.stdout.strip():
                        for line in result.stdout.strip().split("\n")[:30]:
                            self._add_work(f" {line}")
                    else:
                        self._add_work(" No matches found")
                except Exception as e:
                    self._add_work(f"❌ {e}")
                continue

            # --- anything else is a chat message for the model ---
            self._add_chat(f"▸ {msg}")
            if not self.busy:
                t = threading.Thread(target=self._query_ollama, args=(msg,), daemon=True)
                t.start()
            else:
                self._add_chat("⏳ AI is busy, wait or Ctrl+D to stop")
        elif 32 <= ch <= 126:
            # Printable ASCII: insert at the cursor.
            self.chat_input = self.chat_input[:self.cursor_pos] + chr(ch) + self.chat_input[self.cursor_pos:]
            self.cursor_pos += 1
|
|
|
|
|
|
def main(stdscr):
    """Build the Vibe application on *stdscr* and run it until it returns."""
    app = Vibe(stdscr)
    app.run()
|
|
|
|
|
|
if __name__ == "__main__":
    # curses.wrapper initialises the screen, calls main(stdscr) and restores
    # the terminal afterwards, so the farewell prints to a normal terminal.
    curses.wrapper(main)
    farewell = f"\n👋 Bye, {USER_NAME}! Don't forget to commit your changes to git.\n"
    print(farewell)
|