Files
tools/vedit/vedit.py
T
2026-04-08 16:06:34 +02:00

1512 lines
68 KiB
Python
Executable File

#!/usr/bin/env python3
"""Vibe - A simple vibe coding tool bridging Ollama LLM and bash shell."""
import curses
import difflib
import json
import os
import subprocess
import threading
import time
import re
import requests
import sys
import unicodedata
from datetime import datetime
from urllib.parse import urlparse
# Per-user settings file; read and written as plain "key=value" lines.
CONFIG_FILE = os.path.expanduser("~/.ahvibe.conf")
# Keys that must end up with a non-empty value before the tool can start.
REQUIRED_KEYS = ["ollama_host", "user_name"]
# Keys that may be missing or empty; also controls which keys are persisted.
OPTIONAL_KEYS = ["model", "auth_key", "context_size", "backend"]
# Text shown by input() when a key has to be asked for interactively.
PROMPTS = {
"user_name": "Your name: ",
"ollama_host": "Server host URL (e.g. http://localhost:11434): ",
"model": "Model name (e.g. qwen2.5-coder:14b-instruct): ",
"auth_key": "Auth key (optional, press Enter to skip): ",
"backend": "Backend (ollama or vllm, default ollama): ",
}
def _load_or_create_config():
    """Load settings from CONFIG_FILE, prompt for missing ones, and probe the server.

    Reads ``key=value`` lines (``#`` lines are ignored), fills in defaults for
    ``context_size`` and ``backend``, asks on stdin for any missing required
    key (and for ``auth_key``), and rewrites the file when something was asked.
    The server is then contacted once to validate connectivity, credentials and
    the configured model name.

    Returns:
        A 6-tuple ``(chat_url, model, user_name, headers, context_size, backend)``.
        ``model`` is ``""`` when unset or not offered by the server.

    Exits the process via ``sys.exit`` when a required value is left empty,
    the user aborts the prompts, authentication fails, or the server cannot
    be reached.
    """
    conf = {}
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE) as f:
            for line in f:
                line = line.strip()
                if "=" in line and not line.startswith("#"):
                    k, v = line.split("=", 1)
                    conf[k.strip()] = v.strip()
    # These two are defaulted silently rather than prompted for.
    conf.setdefault("context_size", "16384")
    conf.setdefault("backend", "ollama")
    missing = [k for k in REQUIRED_KEYS if not conf.get(k)]
    missing += [k for k in OPTIONAL_KEYS if k not in conf and k not in ("context_size", "model", "backend")]
    if missing:
        try:
            print(f"Config: {CONFIG_FILE}")
            print()
            for k in missing:
                val = input(PROMPTS[k]).strip()
                if k in REQUIRED_KEYS and not val:
                    print(f"Error: {k} is required.")
                    sys.exit(1)
                conf[k] = val
        except (KeyboardInterrupt, EOFError):
            print("\nBye.")
            sys.exit(0)
        with open(CONFIG_FILE, "w") as f:
            for k in REQUIRED_KEYS + OPTIONAL_KEYS:
                if k in conf:
                    f.write(f"{k}={conf[k]}\n")
        # The file may hold an auth key, so keep it private to the owner.
        try:
            os.chmod(CONFIG_FILE, 0o600)
        except OSError:
            pass  # best effort; not every filesystem supports POSIX modes
        print(f"Saved to {CONFIG_FILE}\n")
    # A hand-edited, non-numeric context_size must not crash start-up.
    try:
        context_size = int(conf["context_size"])
        if context_size <= 0:
            raise ValueError(conf["context_size"])
    except ValueError:
        print(f"Warning: invalid context_size '{conf['context_size']}', using 16384.")
        context_size = 16384
    backend = conf.get("backend", "ollama").lower()
    host = conf["ollama_host"].rstrip("/")
    headers = {}
    if conf.get("auth_key"):
        headers["Authorization"] = f"Bearer {conf['auth_key']}"
    # Validate connection
    try:
        if backend == "vllm":
            r = requests.get(host + "/v1/models", headers=headers, timeout=10)
        else:
            r = requests.get(host + "/api/tags", headers=headers, timeout=10)
        if r.status_code == 401:
            print("Error: Authentication failed. Check your auth key.")
            sys.exit(1)
        r.raise_for_status()
        if backend == "vllm":
            models = [m["id"] for m in r.json().get("data", [])]
        else:
            models = [m["name"] for m in r.json().get("models", [])]
        if conf.get("model") and conf["model"] not in models:
            print(f"Warning: Model '{conf['model']}' not found on server.")
            print(f"Available: {', '.join(models) if models else 'none'}")
            conf["model"] = ""
    except requests.ConnectionError:
        print(f"Error: Cannot connect to {host}")
        sys.exit(1)
    except Exception as e:
        # SystemExit is not an Exception subclass, so the sys.exit(1) above
        # passes through here untouched.
        print(f"Error connecting to server: {e}")
        sys.exit(1)
    if backend == "vllm":
        chat_url = host + "/v1/chat/completions"
    else:
        chat_url = host + "/api/chat"
    return chat_url, conf.get("model", ""), conf["user_name"], headers, context_size, backend
# Configuration is resolved once, at import time; this call may prompt on
# stdin, contact the server, or exit the process.
OLLAMA_URL, MODEL, USER_NAME, AUTH_HEADERS, CONTEXT_SIZE, BACKEND = _load_or_create_config()
# Short host label: the whole hostname when it starts with a digit
# (presumably an IPv4 address), otherwise only the part before the first dot.
_h = urlparse(OLLAMA_URL).hostname
HOST_SHORT = _h if _h[0].isdigit() else _h.split(".")[0]
# Session files, all relative to the directory the tool is started in.
WORKLOG_FILE = "worklog.md"
DESCRIPTION_FILE = "description.md"
AI_HISTORY_FILE = ".ai-history"
AI_HISTORY_MAX = 50
INPUT_HISTORY_FILE = ".input-history"
INPUT_HISTORY_MAX = 200
MAX_CONTEXT_MESSAGES = 40  # keep last N messages before summarizing
# Shell commands matching this pattern need explicit user approval.
# The redirect rule deliberately has no leading \b: a word boundary before
# ">" only exists when ">" is glued to a word character, which missed
# "echo x > /dev/sda". Redirects to the harmless pseudo-devices are exempt.
DANGEROUS_PATTERNS = re.compile(
    r"\brm\s+(-[a-z]*f|-[a-z]*r|--force|--recursive)|\bdd\b.*of=/|"
    r"\bmkfs\b|\bformat\b|>\s*/dev/(?!null\b|stdout\b|stderr\b)|:>\s*/|chmod\s+-R\s+777|"
    r"\bsudo\b|\breboot\b|\bshutdown\b|\bsystemctl\b",
    re.IGNORECASE,
)
# NOTE(review): every frame is an empty string here — the glyphs may have been
# lost when the file was exported; confirm against the repository copy.
SPINNER_FRAMES = ["", "", "", "", "", "", "", "", "", ""]
# System prompt for code mode. It defines the fenced-block "tool" tags
# (bash, write:, edit:, memory:, forget:, read:) that the response parser in
# this file looks for, so the wording and the parser must stay in sync.
# Doubled braces are literal braces because this is an f-string.
SYSTEM_PROMPT = f"""You are a vibe coding assistant. The user's name is {USER_NAME}. Use their name occasionally in conversation but NEVER embed it in code, filenames, or output unless explicitly asked.
You help {USER_NAME} by writing code and running shell commands.
TOOLS — use fenced code blocks with these tags:
bash — run a shell command, example:
```bash
ls -la
```
write:<filename> — create/overwrite a file, example:
```write:hello.c
#include <stdio.h>
int main() {{ printf("Hello\\n"); return 0; }}
```
edit:<filename> — search/replace in a file, example:
```edit:hello.c
<<<SEARCH
printf("Hello\\n");
===
printf("Hi there\\n");
>>>
```
memory:<key> — store a note, forget:<key> — clear a note.
read:<filename> — read a file's contents, example:
```read:main.c
```
IMPORTANT: Replace the examples above with ACTUAL commands and code for the task. Never output placeholder text like "command here" or "file content here".
Only use tools (bash, write, edit, memory) when the user asks you to create, modify, or run something. For general questions, just answer in plain text — do NOT create files or memory entries for conversational replies.
BEHAVIOR:
- Do ONLY what is asked. Nothing more.
- Do NOT compile, run, test, or launch emulators (qemu, etc.) unless explicitly asked.
- Do NOT add extra steps beyond the request (no ls, cd, descriptions, verification).
- If a description.md exists, follow it — but only do what it says, not more.
- For small changes, prefer ```edit:filename``` over rewriting the whole file.
- If a command fails and the user asked you to run it, read the error output carefully, fix the code, and retry until it works.
- If a program runs but produces wrong output, analyze the output and fix the code.
- cd commands persist between bash blocks. File writes also respect the current directory.
- CRITICAL: All commands, file writes, edits, and memory ops MUST be inside ```fenced code blocks```. Never output them as plain text.
OUTPUT:
- Be EXTREMELY concise. No filler, no preamble.
- Do NOT repeat code you just wrote — the user sees it in the work window.
- ONE LINE summary after completing a task. Nothing more.
- If there's nothing to add, say nothing."""
def _dw(s):
"""Display width of a string, accounting for wide chars (emoji etc)."""
w = 0
for c in s:
w += 2 if unicodedata.east_asian_width(c) in ('W', 'F') else 1
return w
def _wrap_dw(s, maxw):
"""Word-wrap string by display width, returns list of lines."""
if _dw(s) <= maxw:
return [s]
lines = []
cur = ""
cw = 0
for word in re.split(r'(\s+)', s):
ww = _dw(word)
if cw + ww <= maxw:
cur += word
cw += ww
elif ww > maxw:
# Word itself is wider than maxw — character-wrap it
for c in word:
cw2 = 2 if unicodedata.east_asian_width(c) in ('W', 'F') else 1
if cw + cw2 > maxw:
lines.append(cur)
cur = c
cw = cw2
else:
cur += c
cw += cw2
else:
if cur:
lines.append(cur)
cur = word.lstrip() if not lines else word
cw = _dw(cur)
if cur:
lines.append(cur)
return lines if lines else [""]
class Vibe:
def __init__(self, stdscr):
    """Set up UI state, colours and input handling, then load saved context.

    Args:
        stdscr: the curses window object used for all drawing and key input.
    """
    self.stdscr = stdscr

    # --- conversation state ---
    self.messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    self.work_lines = []
    self.chat_input = ""
    self.chat_history = []
    self.busy = False
    self.stop_flag = False
    self.spinner_idx = 0
    self.memory = {}
    self.ai_history = []
    self.recent_files = []
    self.lock = threading.Lock()

    # --- view / interaction state ---
    self.scroll_offset = 0      # 0 = bottom (latest), >0 = scrolled up
    self.yolo_mode = False      # skip dangerous command confirmation
    self.pending_cmd = None     # command awaiting confirmation
    self.multiline_mode = False
    self.multiline_buf = []
    self.code_mode = True       # True = code mode, False = chat mode
    self.chat_messages = []     # separate message history for chat mode
    self.cursor_pos = 0         # cursor position within chat_input

    # --- input history, one list per mode ---
    self.code_input_history = self._load_input_history("code")
    self.chat_input_history = self._load_input_history("chat")
    self.history_idx = -1       # -1 = not browsing history
    self.saved_input = ""       # saved current input when browsing history

    # --- filesystem state ---
    self.cwd = os.getcwd()      # persistent working directory for commands
    self.root_dir = self.cwd    # sandbox — never allow navigating above this
    self._last_backup = None    # (filename, content) for /undo

    # --- curses configuration ---
    curses.curs_set(1)
    curses.start_color()
    curses.use_default_colors()
    palette = (
        (1, curses.COLOR_GREEN, -1),
        (2, curses.COLOR_CYAN, -1),
        (3, curses.COLOR_YELLOW, -1),
        (4, curses.COLOR_RED, -1),
        (5, curses.COLOR_BLACK, curses.COLOR_YELLOW),
        (6, curses.COLOR_WHITE, curses.COLOR_BLUE),
        (7, curses.COLOR_MAGENTA, -1),  # for warnings
    )
    for pair_id, fg, bg in palette:
        curses.init_pair(pair_id, fg, bg)
    self.stdscr.nodelay(False)
    self.stdscr.timeout(100)
    try:
        curses.mousemask(curses.ALL_MOUSE_EVENTS)
    except Exception:
        pass

    # --- background status + persisted context ---
    self.ollama_status = ""     # GPU/CPU status string
    self._start_status_poller()
    self._load_project_context()
    self._load_worklog()
def _start_status_poller(self):
    """Start a daemon thread that refreshes ``self.ollama_status`` every 15 s.

    For the vllm backend the status is "online" or "error" depending on the
    HTTP status of ``/v1/models``. For the other backend ``/api/ps`` is
    queried and each loaded model is reported as a GPU/CPU percentage split,
    or "idle" when no model is listed. Any exception yields "offline".
    """
    def gpu_cpu_split(model):
        # Share of the model held in VRAM versus the remainder.
        total = model.get("size", 1)
        vram = model.get("size_vram", 0)
        gpu_pct = int(vram * 100 / total) if total else 0
        cpu_pct = 100 - gpu_pct
        return f"GPU:{gpu_pct}% CPU:{cpu_pct}%"

    def poll():
        # Recover the server base URL from the chat endpoint URL.
        if BACKEND != "vllm":
            host = OLLAMA_URL.rsplit("/api/chat", 1)[0]
        else:
            host = OLLAMA_URL.rsplit("/v1/chat/completions", 1)[0]
        while True:
            try:
                if BACKEND == "vllm":
                    r = requests.get(host + "/v1/models", headers=AUTH_HEADERS, timeout=5)
                    self.ollama_status = "online" if r.status_code == 200 else "error"
                else:
                    r = requests.get(host + "/api/ps", headers=AUTH_HEADERS, timeout=5)
                    models = r.json().get("models", [])
                    if models:
                        self.ollama_status = " | ".join([gpu_cpu_split(m) for m in models])
                    else:
                        self.ollama_status = "idle"
            except Exception:
                self.ollama_status = "offline"
            time.sleep(15)

    threading.Thread(target=poll, daemon=True).start()
def _load_file_context(self, filepath):
"""Load a file as project context for the AI."""
if not os.path.isabs(filepath):
filepath = os.path.join(self.cwd, filepath)
try:
with open(filepath) as f:
content = f.read().strip()
self.messages.append(
{"role": "system", "content": f"Contents of {filepath}:\n{content}"}
)
self._add_work(f"📋 Loaded '{filepath}' as context")
except Exception as e:
self._add_work(f"❌ Could not read '{filepath}': {e}")
def _load_project_context(self):
    """Prepare project context at start-up.

    Three steps, each reported in the work window:
      1. make sure ``.gitignore`` in the project root lists the session files;
      2. load DESCRIPTION_FILE (if present and non-empty) as a system message;
      3. add a compact file listing (at most 100 entries, hidden and build
         directories skipped, limited depth) as a system message.
    """
    # Ensure .gitignore excludes session files
    gi_entries = {".ai-history", ".input-history"}
    gi_path = os.path.join(self.root_dir, ".gitignore")
    existing_text = ""
    if os.path.exists(gi_path):
        with open(gi_path) as f:
            existing_text = f.read()
    existing = {line.strip() for line in existing_text.split("\n")}
    missing = gi_entries - existing
    if missing:
        with open(gi_path, "a") as f:
            # Without this, appending to a file that lacks a final newline
            # would glue the first entry onto the last existing pattern.
            if existing_text and not existing_text.endswith("\n"):
                f.write("\n")
            for entry in sorted(missing):
                f.write(f"{entry}\n")
    # Load description file if present
    if os.path.exists(DESCRIPTION_FILE):
        try:
            with open(DESCRIPTION_FILE) as f:
                desc = f.read().strip()
            if desc:
                self.messages.append(
                    {"role": "system", "content": f"Project description:\n{desc}"}
                )
                self._add_work("📋 Loaded project description")
        except Exception:
            pass  # an unreadable description is not fatal
    else:
        self._add_work(f"💡 Tip: Create '{DESCRIPTION_FILE}' to describe your project for the AI")
    # Build a compact file tree (max 100 entries, skip hidden/build dirs)
    skip = {'.git', '__pycache__', 'node_modules', '.venv', 'venv', 'build', 'dist', '.ai-history'}
    tree = []
    for root, dirs, files in os.walk("."):
        # Pruning dirs in place stops os.walk from descending into them.
        dirs[:] = [d for d in dirs if d not in skip and not d.startswith('.')]
        depth = root.count(os.sep)
        if depth > 3:
            dirs.clear()
            continue
        for name in files:
            if len(tree) >= 100:
                break
            path = os.path.join(root, name)
            tree.append(path[2:] if path.startswith("./") else path)
        if len(tree) >= 100:
            break
    if tree:
        self.messages.append(
            {"role": "system", "content": "Project files:\n" + "\n".join(tree)}
        )
        self._add_work(f"📂 Scanned project: {len(tree)} files")
def _trim_context(self):
    """Bound the conversation by collapsing its oldest half into a summary.

    When more than MAX_CONTEXT_MESSAGES non-system messages exist, the oldest
    half is removed and replaced by one system message holding the first 150
    characters of each removed message. The summary is placed directly after
    the leading run of system messages so it always precedes the turns that
    were kept.
    """
    # Count non-system messages
    non_sys = [(i, m) for i, m in enumerate(self.messages) if m["role"] != "system"]
    if len(non_sys) <= MAX_CONTEXT_MESSAGES:
        return
    # Summarize the oldest half of non-system messages
    to_remove = non_sys[:len(non_sys) // 2]
    summary = "Conversation summary (older messages):\n" + "\n".join(
        f"{m['role']}: {m['content'][:150]}" for _, m in to_remove
    )
    drop = {i for i, _ in to_remove}
    self.messages = [m for i, m in enumerate(self.messages) if i not in drop]
    # Insert after the *leading* system messages only. System messages can
    # also appear between turns; using the last one anywhere in the list
    # could put the summary of old turns after newer ones.
    insert_at = 0
    for m in self.messages:
        if m["role"] != "system":
            break
        insert_at += 1
    self.messages.insert(insert_at, {"role": "system", "content": summary})
def _is_dangerous(self, cmd):
    """Return True when *cmd* contains a match for DANGEROUS_PATTERNS."""
    match = DANGEROUS_PATTERNS.search(cmd)
    return match is not None
def _edit_file(self, filename, content):
"""Apply search/replace edits to an existing file."""
if not os.path.isabs(filename):
filename = os.path.join(self.cwd, filename)
if not os.path.realpath(filename).startswith(self.root_dir):
self._add_work(f"❌ Edit blocked: {filename} is outside project directory")
return f"Error: {filename} is outside project directory"
if not os.path.exists(filename):
self._add_work(f"❌ Edit failed: {filename} not found")
return f"Error: {filename} not found"
with open(filename) as f:
original = f.read()
modified = original
# Parse <<<SEARCH ... === ... >>> or <<< ... === ... >>> blocks
edits = re.findall(r"<<<\s*(?:SEARCH\s*)?\n?(.*?)\n\s*===\s*\n(.*?)\n\s*>>>", content, re.DOTALL)
if not edits:
# Fallback: strip any stray markers and treat as full file rewrite
clean = re.sub(r"^<<<\s*(?:SEARCH)?\s*\n?|^\s*===\s*\n|^\s*>>>\s*$", "", content, flags=re.MULTILINE).strip()
self._write_file(filename, clean + "\n")
return f"Rewrote {filename} (no edit markers, used as full rewrite)"
applied = 0
for search, replace in edits:
search = search.rstrip()
replace = replace.rstrip()
if search in modified:
modified = modified.replace(search, replace, 1)
applied += 1
else:
self._add_work(f"⚠ Search text not found in {filename}")
if applied > 0:
with open(filename, "w") as f:
f.write(modified)
self._add_work(f"✏️ Edited {filename} ({applied} change{'s' if applied != 1 else ''})")
self._log_history(f"EDITED: {filename}")
if filename in self.recent_files:
self.recent_files.remove(filename)
self.recent_files.insert(0, filename)
self.recent_files = self.recent_files[:10]
return f"Edited {filename}: {applied} change(s) applied"
@property
def input_history(self):
    """Input history list belonging to the currently active mode."""
    if self.code_mode:
        return self.code_input_history
    return self.chat_input_history
def _load_input_history(self, prefix):
    """Return the saved input lines tagged ``<prefix>:`` from the history file.

    At most the last INPUT_HISTORY_MAX matching entries are returned; a
    missing file yields an empty list.
    """
    if not os.path.exists(INPUT_HISTORY_FILE):
        return []
    tag = f"{prefix}:"
    with open(INPUT_HISTORY_FILE) as handle:
        stripped = (raw.rstrip("\n") for raw in handle)
        entries = [row[len(prefix) + 1:] for row in stripped if row.startswith(tag)]
    return entries[-INPUT_HISTORY_MAX:]
def _save_input_history(self, entry):
    """Append *entry* to the active mode's history and rewrite the history file.

    The in-memory list is capped at INPUT_HISTORY_MAX. The file is rewritten
    with the other mode's entries first, then the active mode's, each line
    tagged ``code:`` or ``chat:``. Write failures are ignored.
    """
    active = self.input_history
    active.append(entry)
    overflow = len(active) - INPUT_HISTORY_MAX
    if overflow > 0:
        del active[:overflow]
    # Rewrite file with all entries from both modes
    if self.code_mode:
        active_tag, other_tag, other = "code", "chat", self.chat_input_history
    else:
        active_tag, other_tag, other = "chat", "code", self.code_input_history
    try:
        rows = [f"{other_tag}:{item}" for item in other]
        rows.extend(f"{active_tag}:{item}" for item in active)
        with open(INPUT_HISTORY_FILE, "w") as handle:
            handle.write("\n".join(rows) + "\n")
    except Exception:
        pass
def _load_worklog(self):
    """Feed the tail of WORKLOG_FILE to the model as a system message.

    Only the last 100 lines are used; nothing happens when the file is
    missing or its tail is blank.
    """
    if not os.path.exists(WORKLOG_FILE):
        return
    with open(WORKLOG_FILE) as handle:
        all_lines = handle.readlines()
    # Only load last 100 lines to avoid filling context
    recent = "".join(all_lines[-100:]).strip()
    if not recent:
        return
    self.messages.append(
        {"role": "system", "content": f"Previous worklog (recent):\n{recent}"}
    )
    self._add_work(f"📖 Resumed from '{WORKLOG_FILE}'")
def _write_worklog(self, entry):
    """Append *entry* to WORKLOG_FILE under a timestamped ``##`` heading."""
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with open(WORKLOG_FILE, "a") as handle:
        handle.write(f"\n## {stamp}\n{entry}\n")
def _log_history(self, action):
    """Record *action* with a timestamp and persist the last AI_HISTORY_MAX entries.

    The whole history file is rewritten on every call; write failures are
    ignored.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    trimmed = (self.ai_history + [f"[{stamp}] {action}"])[-AI_HISTORY_MAX:]
    self.ai_history = trimmed
    try:
        with open(AI_HISTORY_FILE, "w") as handle:
            handle.write("\n".join(trimmed) + "\n")
    except Exception:
        pass
def _add_work(self, text):
    """Append *text* to the work window, wrapped to the current screen width.

    The text is split on newlines and each piece is wrapped by display width.
    A width of 80 is assumed when the screen size cannot be queried.
    """
    with self.lock:
        try:
            _, width = self.stdscr.getmaxyx()
        except Exception:
            width = 80
        for piece in text.split("\n"):
            self.work_lines.extend(_wrap_dw(piece, width))
def _add_chat(self, text):
with self.lock:
for line in text.split("\n"):
self.chat_history.append(line)
def _run_command(self, cmd):
# Check for dangerous commands
if not self.yolo_mode and self._is_dangerous(cmd):
self._add_work(f"⚠️ Dangerous command blocked: {cmd}")
self._add_work(" Use /yolo to enable, or /allow to run this one")
self.pending_cmd = cmd
return "Command blocked as potentially dangerous. User must approve.", 1
self.pending_cmd = None
self._add_work(f"▶ [{self.cwd}] $ {cmd}")
self._log_history(f"RAN: {cmd}")
# Append a pwd at the end so we can track cd changes
wrapped = cmd + '\necho "___CWD___:$(pwd)"'
try:
result = subprocess.run(
wrapped, shell=True, capture_output=True, text=True, timeout=60,
cwd=self.cwd
)
stdout = result.stdout.strip()
stderr = result.stderr.strip()
# Extract new cwd from output
out_lines = stdout.split("\n")
clean_lines = []
for line in out_lines:
if line.startswith("___CWD___:"):
new_cwd = line[10:]
if os.path.isdir(new_cwd) and os.path.realpath(new_cwd).startswith(self.root_dir):
with self.lock:
self.cwd = new_cwd
else:
clean_lines.append(line)
stdout = "\n".join(clean_lines).strip()
if stdout:
self._add_work(stdout)
if stderr and stderr != stdout:
self._add_work(stderr)
status = "" if result.returncode == 0 else f"✗ (exit {result.returncode})"
self._add_work(f" {status}")
parts = [f"exit_code: {result.returncode}"]
if stdout:
parts.append(f"stdout: {stdout}")
if stderr:
parts.append(f"stderr: {stderr}")
return "\n".join(parts), result.returncode
except subprocess.TimeoutExpired:
self._add_work(" ✗ (timeout)")
return "Command timed out after 60s", 1
def _write_file(self, filename, content):
if not os.path.isabs(filename):
filename = os.path.join(self.cwd, filename)
if not os.path.realpath(filename).startswith(self.root_dir):
self._add_work(f"❌ Write blocked: {filename} is outside project directory")
return False
# Backup existing file before overwriting
if os.path.exists(filename):
try:
with open(filename) as f:
self._last_backup = (filename, f.read())
except Exception:
pass
os.makedirs(os.path.dirname(filename), exist_ok=True) if os.path.dirname(filename) else None
with open(filename, "w") as f:
f.write(content)
self._add_work(f"📝 Wrote {filename}")
self._log_history(f"WROTE: {filename}")
# Track recent files
if filename in self.recent_files:
self.recent_files.remove(filename)
self.recent_files.insert(0, filename)
self.recent_files = self.recent_files[:10]
return True
def _process_response(self, text):
"""Extract and execute bash commands and file writes from AI response."""
results = []
# Find fenced code blocks (``` or ~~~)
blocks = re.findall(r"```\s*(\w[\w:./\-]*)\n(.*?)```", text, re.DOTALL)
blocks += re.findall(r"~~~\s*(\w[\w:./\-]*)\n(.*?)~~~", text, re.DOTALL)
# Also catch unfenced edit blocks (AI sometimes forgets fences)
unfenced = re.findall(r"(?:^|\n)\s*(edit:[\w./\-]+)\s*\n(<<<\s*SEARCH\s*\n.*?>>>)", text, re.DOTALL)
seen = set()
for lang, content in blocks:
seen.add(content.strip())
for lang, content in unfenced:
if content.strip() not in seen:
blocks.append((lang, content))
for lang, content in blocks:
lang_lower = lang.lower()
if lang_lower == "bash":
if self.stop_flag:
break
# Detect misplaced edit/write blocks inside bash
cmd = content.strip()
m = re.match(r"^(edit|write):(\S+)\s*\n", cmd, re.IGNORECASE)
if m:
action, fname = m.group(1).lower(), m.group(2)
body = cmd[m.end():]
if action == "edit":
result = self._edit_file(fname, body)
results.append(result)
else:
if self._write_file(fname, body):
results.append(f"Wrote file: {fname}")
else:
results.append(f"Write blocked: {fname}")
elif re.match(r"^(Command\s|exit_code:|stdout:|stderr:|Results:)", cmd):
# AI echoed back feedback format — skip it
pass
elif re.search(r"(^import |^from |^def |^class |^#include\s|= init_|\.connect\()", cmd, re.MULTILINE):
# AI put source code in a bash block — skip it
self._add_work("⚠ Skipped: source code detected in bash block")
pass
else:
output, rc = self._run_command(cmd)
results.append(f"Command `{cmd}`:\n{output}")
elif lang_lower.startswith("write:"):
filename = lang[6:]
if self._write_file(filename, content):
results.append(f"Wrote file: {filename}")
else:
results.append(f"Write blocked: {filename}")
elif lang_lower.startswith("edit:"):
filename = lang[5:]
result = self._edit_file(filename, content)
results.append(result)
elif lang_lower.startswith("memory:"):
key = lang[7:]
self.memory[key] = content.strip()
self._add_work(f"🧠 Memorized '{key}'")
self._log_history(f"MEMORIZED: {key}")
results.append(f"Stored memory: {key}")
elif lang_lower.startswith("forget:"):
key = lang[7:].strip() or content.strip()
if self.memory.pop(key, None) is not None:
self._add_work(f"🧠 Forgot '{key}'")
self._log_history(f"FORGOT: {key}")
results.append(f"Cleared memory: {key}")
elif lang_lower.startswith("read:"):
filename = lang[5:]
if not os.path.isabs(filename):
filename = os.path.join(self.cwd, filename)
try:
with open(filename) as f:
file_content = f.read()
self._add_work(f"📄 Read {filename}")
results.append(f"Contents of {filename}:\n{file_content}")
except Exception as e:
results.append(f"Error reading {filename}: {e}")
else:
# Unknown language tag — check if first line is write:/edit:
first_line = content.strip().split("\n", 1)[0]
m = re.match(r"^(write|edit):(\S+)", first_line, re.IGNORECASE)
if m:
action, fname = m.group(1).lower(), m.group(2)
body = content.strip().split("\n", 1)[1] if "\n" in content.strip() else ""
if action == "write":
if self._write_file(fname, body):
results.append(f"Wrote file: {fname}")
else:
results.append(self._edit_file(fname, body))
return results
def _memory_context(self):
"""Build a system message with current memory contents."""
if not self.memory:
return None
lines = [f"[{k}]: {v}" for k, v in self.memory.items()]
return {"role": "system", "content": "Current memory:\n" + "\n".join(lines)}
def _build_messages(self):
"""Build message list with memory injected, trimming old context."""
self._trim_context()
msgs = list(self.messages)
mem = self._memory_context()
if mem:
msgs.insert(1, mem) # after system prompt
return msgs
def _stream_response(self):
    """Send the conversation to the LLM server and stream the reply.

    Tokens are appended to the work window as they arrive. Streaming stops
    early when ``self.stop_flag`` is set.

    Returns:
        The full reply text, or ``""`` when the connection failed after three
        attempts or the server answered with a non-200 status.
    """
    self._add_work("🔌 Connecting to server...")
    resp = None
    if BACKEND == "vllm":
        payload = {"model": MODEL, "messages": self._build_messages(), "stream": True, "max_tokens": CONTEXT_SIZE}
    else:
        payload = {"model": MODEL, "messages": self._build_messages(), "stream": True, "options": {"num_ctx": CONTEXT_SIZE}}
    for attempt in range(3):
        try:
            resp = requests.post(
                OLLAMA_URL,
                json=payload,
                headers=AUTH_HEADERS,
                stream=True,
                timeout=120,
            )
            break
        except (requests.ConnectionError, requests.Timeout) as e:
            if attempt < 2:
                self._add_work(f"⚠ Connection failed (attempt {attempt + 1}/3), retrying...")
                time.sleep(2)
            else:
                self._add_work(f"❌ Failed to connect after 3 attempts: {e}")
                return ""
    full = ""
    # With stream=True the connection stays checked out until the body is
    # consumed or the response is closed; close it on every exit path
    # (early stop, error status, parse error).
    try:
        if resp.status_code != 200:
            err = resp.text[:200]
            self._add_work(f"❌ Server error ({resp.status_code}): {err}")
            return ""
        # Remove the "Connecting" line now that we're streaming
        with self.lock:
            self.work_lines = [l for l in self.work_lines if l != "🔌 Connecting to server..."]
        for line in resp.iter_lines():
            if self.stop_flag:
                self._add_work("\n⛔ Stopped by user")
                break
            if not line:
                continue
            if BACKEND == "vllm":
                line_str = line.decode("utf-8") if isinstance(line, bytes) else line
                if not line_str.startswith("data: "):
                    continue
                line_str = line_str[6:]
                if line_str.strip() == "[DONE]":
                    break
                data = json.loads(line_str)
                # Tolerate a chunk whose "choices" list is missing or empty.
                choices = data.get("choices") or [{}]
                token = choices[0].get("delta", {}).get("content") or ""
            else:
                data = json.loads(line)
                token = data.get("message", {}).get("content", "")
            if not token:
                continue
            full += token
            with self.lock:
                try:
                    w = self.stdscr.getmaxyx()[1]
                except Exception:
                    w = 80
                if not self.work_lines:
                    self.work_lines.append("")  # nothing to extend yet
                parts = token.split("\n")
                self.work_lines[-1] += parts[0]
                # Wrap current line if it exceeds display width
                if _dw(self.work_lines[-1]) > w:
                    wrapped = _wrap_dw(self.work_lines[-1], w)
                    self.work_lines[-1] = wrapped[0]
                    self.work_lines.extend(wrapped[1:])
                for p in parts[1:]:
                    self.work_lines.extend(_wrap_dw(p, w))
    finally:
        resp.close()
    # Clean up code fence markers from display
    with self.lock:
        self.work_lines = [
            l for l in self.work_lines
            if not re.match(r"^`{3}(?:bash|write:\S+|edit:\S+|memory:\S+|forget:\S+|\w+)?$", l.strip())
        ]
    return full
def _query_ollama(self, user_msg):
    """Handle one user message end to end (runs while ``self.busy`` is True).

    Chat mode: the message is answered from a separate plain-text
    conversation; the code-mode history is left untouched.

    Code mode: the reply is streamed, its tool blocks are executed, and the
    results are fed back to the model for up to five rounds. The loop stops
    when a reply contains no actions, an action was blocked, the results
    repeat, the user pressed stop, or the model returns nothing.

    Errors are reported in the work window; nothing is raised.
    """
    self.busy = True
    self.stop_flag = False
    if not self.code_mode:
        # Chat mode — use separate message list, don't touch code history
        self.chat_messages.append({"role": "user", "content": user_msg})
        self._add_work(f"\n🧑 {USER_NAME}: {user_msg}")
        self._log_history(f"USER [chat]: {user_msg}")
        try:
            self._add_work("📠 AI: ")
            # Build chat messages: system prompt + chat history
            chat_msgs = [{"role": "system", "content": f"You are chatting with {USER_NAME}. The user's name is {USER_NAME} — use it. Respond in plain text only. No code blocks, no file writes, no commands. Just discuss and reason naturally."}]
            chat_msgs += self.chat_messages[-MAX_CONTEXT_MESSAGES:]
            # Temporarily swap messages for streaming. Restore in a finally
            # block so a streaming error cannot leave the code history
            # replaced by the chat transcript.
            orig = self.messages
            self.messages = chat_msgs
            try:
                full_response = self._stream_response()
            finally:
                self.messages = orig
            if full_response:
                self.chat_messages.append({"role": "assistant", "content": full_response})
                self._log_history(f"AI [chat]: {full_response[:100]}")
                self._write_worklog(f"User [chat]: {user_msg}\nAI: {full_response[:200]}")
        except Exception as e:
            self._add_work(f"❌ Error: {e}")
        finally:
            self.busy = False
        return
    # Code mode — use main message list
    self.messages.append({"role": "user", "content": user_msg})
    # Include chat summary if there's been chat discussion
    if self.chat_messages:
        summary = "\n".join(f"{m['role']}: {m['content'][:150]}" for m in self.chat_messages[-10:])
        self.messages.append({"role": "system", "content": f"Recent chat discussion for context:\n{summary}"})
    self._add_work(f"\n🧑 {USER_NAME}: {user_msg}")
    self._log_history(f"USER: {user_msg}")
    try:
        self._add_work("📠 AI: ")
        full_response = self._stream_response()
        if not full_response:
            return
        self.messages.append({"role": "assistant", "content": full_response})
        self._log_history(f"AI: {full_response[:100]}")
        # Autonomous loop: keep processing actions and feeding back results.
        # Every reply is passed to _process_response exactly once — running
        # it again would repeat its commands and file writes.
        max_rounds = 5
        last_feedback = None
        for _ in range(max_rounds):
            if self.stop_flag:
                break
            action_results = self._process_response(full_response)
            if not action_results:
                # Plain-text reply: log it and finish.
                self._write_worklog(f"User: {user_msg}\nAI: {full_response[:200]}")
                break
            # Stop looping if a command was blocked — wait for user to /allow
            if any("blocked" in r.lower() for r in action_results):
                self._write_worklog(f"User: {user_msg}\nAI acted.\nResults:\n" + "\n".join(action_results))
                break
            feedback = "Results:\n" + "\n".join(action_results)
            # Stop if same results as last round (no progress)
            if feedback == last_feedback:
                self._add_work("⚠ AI is stuck (same results), stopping.")
                self._write_worklog(f"User: {user_msg}\nAI acted.\n{feedback}")
                break
            last_feedback = feedback
            self._write_worklog(f"User: {user_msg}\nAI acted.\n{feedback}")
            self.messages.append({"role": "user", "content": feedback})
            self._add_work("\n📠 AI (continuing): ")
            full_response = self._stream_response()
            if not full_response:
                break
            self.messages.append({"role": "assistant", "content": full_response})
            self._log_history(f"AI: {full_response[:100]}")
        else:
            # All rounds used: the last continuation has not been handled
            # yet. Execute it once, without asking the model again.
            if not self.stop_flag and not self._process_response(full_response):
                self._write_worklog(f"User: {user_msg}\nAI: {full_response[:200]}")
    except Exception as e:
        self._add_work(f"❌ Error: {e}")
    finally:
        self.busy = False
def _save_config_model(self, new_model):
    """Persist *new_model* as the ``model`` entry of CONFIG_FILE.

    Existing ``key=value`` lines are re-read first; only keys listed in
    REQUIRED_KEYS and OPTIONAL_KEYS are written back.
    """
    settings = {}
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE) as src:
            for raw in src:
                entry = raw.strip()
                if entry.startswith("#") or "=" not in entry:
                    continue
                key, value = entry.split("=", 1)
                settings[key.strip()] = value.strip()
    settings["model"] = new_model
    with open(CONFIG_FILE, "w") as dst:
        for key in REQUIRED_KEYS + OPTIONAL_KEYS:
            if key in settings:
                dst.write(f"{key}={settings[key]}\n")
def _show_model_picker(self):
    """Show a modal list of the server's models and switch to the chosen one.

    Up/Down move the selection, Enter stores the choice in the module-level
    MODEL and in the config file, Esc cancels. Key input blocks while the
    popup is open; the 100 ms timeout is restored afterwards.
    """
    global MODEL
    self._add_work("⏳ Fetching models...")
    self._draw()
    # Each backend has its own listing endpoint and JSON layout.
    if BACKEND == "vllm":
        host = OLLAMA_URL.rsplit("/v1/chat/completions", 1)[0]
        list_path, list_key, name_key = "/v1/models", "data", "id"
    else:
        host = OLLAMA_URL.rsplit("/api/chat", 1)[0]
        list_path, list_key, name_key = "/api/tags", "models", "name"
    try:
        r = requests.get(host + list_path, headers=AUTH_HEADERS, timeout=10)
        r.raise_for_status()
        models = sorted(m[name_key] for m in r.json().get(list_key, []))
    except Exception as e:
        self._add_work(f"❌ Failed to fetch models: {e}")
        return
    if not models:
        self._add_work("❌ No models available on server")
        return
    sel = models.index(MODEL) if MODEL in models else 0
    self.stdscr.nodelay(False)
    self.stdscr.timeout(-1)
    normal = curses.color_pair(6)
    while True:
        h, w = self.stdscr.getmaxyx()
        # Popup dimensions
        box_w = min(60, w - 4)
        box_h = min(len(models) + 4, h - 4)
        visible = box_h - 4
        x = (w - box_w) // 2
        y = (h - box_h) // 2
        # Scroll window so the selected entry stays in view
        scroll = max(0, sel - visible + 1)
        # Draw popup background
        blank = " " * box_w
        for offset in range(box_h):
            self._safe_addstr(y + offset, x, blank, w, normal)
        title = " SELECT MODEL "
        tpad = box_w - len(title)
        header = "" * (tpad // 2) + title + "" * ((tpad + 1) // 2)
        self._safe_addstr(y, x, header, w, normal)
        self._safe_addstr(y + 1, x, f" Current: {MODEL}"[:box_w], w, normal)
        self._safe_addstr(y + box_h - 1, x, " ↑↓ select Enter confirm Esc cancel "[:box_w], w, curses.color_pair(3))
        for i, full_name in enumerate(models[scroll:scroll + max(visible, 0)]):
            name = full_name[:box_w - 4]
            row = y + 2 + i
            if scroll + i == sel:
                self._safe_addstr(row, x, f"{name:<{box_w - 3}}", w, curses.color_pair(5) | curses.A_BOLD)
            else:
                self._safe_addstr(row, x, f" {name:<{box_w - 3}}", w, normal)
        self.stdscr.refresh()
        ch = self.stdscr.getch()
        if ch == 27:  # Esc
            break
        if ch in (curses.KEY_ENTER, 10, 13):
            MODEL = models[sel]
            self._save_config_model(MODEL)
            self._add_work(f"✅ Model changed to {MODEL}")
            break
        if ch == curses.KEY_UP and sel > 0:
            sel -= 1
        elif ch == curses.KEY_DOWN and sel < len(models) - 1:
            sel += 1
    self.stdscr.timeout(100)
def _get_editor(self):
"""Get user's preferred editor."""
return os.environ.get("VISUAL") or os.environ.get("EDITOR") or "vi"
def _edit_in_editor(self, filepath):
"""Open a file in the user's editor, suspending curses."""
if not os.path.isabs(filepath):
filepath = os.path.join(self.cwd, filepath)
if not os.path.exists(filepath):
self._add_work(f"❌ File not found: {filepath}")
return
editor = self._get_editor()
curses.endwin()
try:
subprocess.run([editor, filepath])
except Exception as e:
pass
self.stdscr.refresh()
self._add_work(f"📝 Returned from {editor}: {filepath}")
def _safe_addstr(self, row, col, text, maxw, attr=0):
"""Write text to screen, safely clamped to bounds."""
if row < 0 or col >= maxw:
return
text = text[:maxw - col]
if not text:
return
try:
self.stdscr.addnstr(row, col, text, maxw - col, attr)
except curses.error:
pass
def _draw(self):
    """Repaint the whole screen: work pane, files panel, chat pane, input line.

    The terminal is split vertically: about the top 90% of the rows is the
    work window (a title bar on row 0 followed by a scrollable view of
    ``self.work_lines``); the remaining rows are the chat window (a
    separator bar, the tail of ``self.chat_history`` and the input line).
    Nothing is drawn when the terminal has fewer than 4 rows or 10 columns.

    Paint order matters: later writes overlay earlier ones (the scroll
    indicator and the files panel are drawn on top of the work lines).
    Besides drawing, this clamps ``self.scroll_offset`` and advances
    ``self.spinner_idx`` while ``self.busy`` is set.
    """
    h, w = self.stdscr.getmaxyx()
    if h < 4 or w < 10:
        return
    work_h = max(2, int(h * 0.9))
    chat_h = h - work_h
    self.stdscr.erase()
    # --- Top: work window ---
    title = " WORK "
    bar_attr = curses.color_pair(2)
    if self.busy:
        # Animate the title: one spinner frame per repaint, and alternate
        # between two color pairs every third of a second.
        frame = SPINNER_FRAMES[self.spinner_idx % len(SPINNER_FRAMES)]
        self.spinner_idx += 1
        title = f" {frame} WORKING {frame} "
        if int(time.time() * 3) % 2:
            title_attr = curses.color_pair(5) | curses.A_BOLD
        else:
            title_attr = curses.color_pair(4) | curses.A_BOLD
    else:
        title_attr = bar_attr
    # Center the title on row 0 by filling both sides.
    # NOTE(review): the fill-character literals below are empty strings in
    # this copy of the file, so the fills draw nothing — the glyph was
    # probably lost in extraction; confirm against the original.
    pad = w - len(title)
    left_pad = pad // 2
    right_pad = (pad + 1) // 2
    self._safe_addstr(0, 0, "" * left_pad, w, bar_attr)
    self._safe_addstr(0, left_pad, title, w, title_attr)
    self._safe_addstr(0, left_pad + len(title), "" * right_pad, w, bar_attr)
    # Holck Editor label on top-left (only if it fits left of the title)
    he_label = " Holck Editor "
    if len(he_label) < left_pad:
        self._safe_addstr(0, 0, he_label, w, curses.color_pair(3))
    # Ollama status in top-right (only if it does not overlap the title)
    if self.ollama_status:
        status_str = f" {self.ollama_status} "
        sx = w - len(status_str)
        if sx > left_pad + len(title):
            self._safe_addstr(0, sx, status_str, w, curses.color_pair(3))
    # Pick the slice of work_lines to show. scroll_offset counts entries
    # scrolled up from the bottom; 0 means "follow the tail".
    with self.lock:
        total = len(self.work_lines)
        view_h = work_h - 1
        # Clamp scroll offset
        max_scroll = max(0, total - view_h)
        self.scroll_offset = max(0, min(self.scroll_offset, max_scroll))
        if self.scroll_offset > 0:
            start = total - view_h - self.scroll_offset
            start = max(0, start)
            visible = self.work_lines[start:start + view_h]
        else:
            visible = self.work_lines[-view_h:]
    # Wrap long lines at display time; if wrapping produced more rows than
    # fit, keep only the last view_h of them.
    wrapped_visible = []
    for line in visible:
        for wl in _wrap_dw(line, w):
            wrapped_visible.append(wl)
    visible = wrapped_visible[-view_h:] if len(wrapped_visible) > view_h else wrapped_visible
    # Determine initial color by scanning backwards from visible start, so
    # continuation lines at the top of the view inherit the color of the
    # nearest earlier line that carries a recognised prefix.
    cur_attr = 0
    with self.lock:
        # Find where visible starts in work_lines (an index into the
        # unwrapped entries, recomputed the same way as the slice above)
        total_wl = len(self.work_lines)
        if self.scroll_offset > 0:
            vis_start = max(0, total_wl - (work_h - 1) - self.scroll_offset)
        else:
            vis_start = max(0, total_wl - (work_h - 1))
        # NOTE(review): several prefixes below are empty strings in this copy
        # of the file. startswith("") is always true, so the first such branch
        # matches every line and the branches after it are never reached —
        # the glyphs were probably lost in extraction; confirm.
        for j in range(vis_start - 1, -1, -1):
            pl = self.work_lines[j]
            if pl.startswith("🧑 "):
                cur_attr = curses.color_pair(1); break
            elif pl.startswith("📠 AI"):
                cur_attr = curses.color_pair(2); break
            elif pl.startswith(""):
                cur_attr = curses.color_pair(3); break
            elif pl.startswith("") or pl.startswith(""):
                cur_attr = curses.color_pair(4); break
            elif pl.startswith("📝 ") or pl.startswith("✏️"):
                cur_attr = curses.color_pair(7); break
    # Paint the visible rows starting at row 1 (row 0 is the title bar).
    for i, line in enumerate(visible):
        row = 1 + i
        if row >= work_h:
            break
        # Colorize by content type — sticky for continuation lines: a line
        # with no recognised prefix keeps the attribute of the previous one.
        # NOTE(review): the empty-prefix caveat from the backward scan applies
        # to this chain too.
        if line.startswith("🧑 ") or line.startswith(f"🧑 {USER_NAME}"):
            cur_attr = curses.color_pair(1)  # green — user
        elif line.startswith("📠 AI"):
            cur_attr = curses.color_pair(2)  # cyan — AI
        elif line.startswith(""):
            cur_attr = curses.color_pair(3)  # yellow — command
        elif line.startswith(""):
            cur_attr = curses.color_pair(1)  # green — success
        elif line.startswith("") or line.startswith(""):
            cur_attr = curses.color_pair(4)  # red — error
        elif line.startswith(""):
            cur_attr = curses.color_pair(3)  # yellow — warning
        elif line.startswith("📝 ") or line.startswith("✏️"):
            cur_attr = curses.color_pair(7)  # magenta — file write/edit
        elif line.startswith("🔌 ") or line.startswith("📋 ") or line.startswith("📂 ") or line.startswith("📖 ") or line.startswith("💡"):
            cur_attr = curses.color_pair(2)  # cyan — info
        self._safe_addstr(row, 0, line[:w], w, cur_attr)
    # --- Scroll indicator (drawn over the first work row while scrolled up) ---
    if self.scroll_offset > 0:
        indicator = f"{self.scroll_offset} lines above "
        self._safe_addstr(1, 0, indicator, w, curses.color_pair(3))
    # --- Recent files panel (top-right, overlays the work lines) ---
    if self.recent_files:
        panel_w = min(30, w // 3)
        panel_h = min(len(self.recent_files) + 2, work_h - 1)
        px = w - panel_w
        py = 1
        # Draw panel: header row, one row per file, then a bottom border
        header = " FILES "
        hpad = panel_w - len(header)
        hbar = ("" * (hpad // 2)) + header + ("" * ((hpad + 1) // 2))
        self._safe_addstr(py, px, hbar, w, curses.color_pair(6))
        for fi, fname in enumerate(self.recent_files):
            row = py + 1 + fi
            if row >= py + panel_h - 1:
                break
            # Show absolute paths relative to the current working directory.
            label = os.path.relpath(fname, self.cwd) if os.path.isabs(fname) else fname
            label = label[:panel_w - 2]
            self._safe_addstr(row, px, f" {label:<{panel_w - 1}}", w, curses.color_pair(6))
        bot_row = min(py + panel_h - 1, work_h - 1)
        self._safe_addstr(bot_row, px, "" * panel_w, w, curses.color_pair(6))
    # --- Bottom: chat window ---
    # Separator bar shows the mode, model and host, colored by mode.
    sep_row = work_h
    mode_label = "CODE" if self.code_mode else "CHAT"
    sep_title = f" {mode_label} {MODEL}@{HOST_SHORT} "
    sep_color = curses.color_pair(1) if self.code_mode else curses.color_pair(3)
    spad = w - len(sep_title)
    sep = ("" * (spad // 2)) + sep_title + ("" * ((spad + 1) // 2))
    self._safe_addstr(sep_row, 0, sep, w, sep_color)
    # Chat history lines
    chat_display_h = chat_h - 2  # minus separator and input line
    with self.lock:
        chat_vis = self.chat_history[-chat_display_h:] if chat_display_h > 0 else []
    for i, line in enumerate(chat_vis):
        row = sep_row + 1 + i
        if row >= h - 1:
            break
        self._safe_addstr(row, 0, line[:w], w)
    # Input area with word-wrapping
    # (the input is cut into fixed-width chunks by character count)
    prompt = "▸▸ " if self.multiline_mode else ("" if self.code_mode else "")
    plen = len(prompt)
    avail_first = w - plen  # chars on first line (after prompt)
    avail_rest = w  # chars on continuation lines
    # Build wrapped lines: first line has prompt, rest are full-width
    inp = self.chat_input
    wrapped = []
    if avail_first > 0:
        wrapped.append(inp[:avail_first])
        inp = inp[avail_first:]
    while inp and avail_rest > 0:
        wrapped.append(inp[:avail_rest])
        inp = inp[avail_rest:]
    if not wrapped:
        wrapped = [""]
    # The input block is anchored to the bottom of the screen and grows
    # upwards as the text wraps onto more rows.
    num_input_lines = len(wrapped)
    input_start_row = h - num_input_lines
    try:
        for li, chunk in enumerate(wrapped):
            row = input_start_row + li
            # Rows that would climb onto or above the separator are skipped.
            if row < sep_row + 1:
                continue
            if li == 0:
                self.stdscr.attron(curses.color_pair(1))
                self.stdscr.addnstr(row, 0, prompt, w)
                self.stdscr.attroff(curses.color_pair(1))
                self.stdscr.addnstr(row, plen, chunk, avail_first)
            else:
                self.stdscr.addnstr(row, 0, chunk, avail_rest)
            self.stdscr.clrtoeol()
        # Position cursor: map the character index in chat_input to a
        # (row, column) inside the wrapped input block, clamped to the screen.
        cp = self.cursor_pos
        if cp <= avail_first:
            cur_row = input_start_row
            cur_col = plen + cp
        else:
            cp2 = cp - avail_first
            cur_row = input_start_row + 1 + (cp2 // avail_rest if avail_rest else 0)
            cur_col = cp2 % avail_rest if avail_rest else 0
        self.stdscr.move(min(cur_row, h - 1), min(cur_col, w - 1))
    except curses.error:
        # Writes that fall outside the screen are ignored.
        pass
    self.stdscr.refresh()
def run(self):
    """Run the interactive event loop until the user quits.

    Each iteration repaints the screen, reads one key and dispatches it:
    control keys (stop, quit, mode toggle), work-window scrolling,
    readline-style editing of the input line, and Enter, which submits the
    input. Submitted text is handled locally when it is ``quit``/``exit``,
    a ``!`` shell escape or a ``/`` command; anything else is echoed to the
    chat pane and sent to the model on a background thread.

    Returns when the user presses Ctrl+C or submits ``quit``/``exit``.
    """
    self._add_work(f"🚀 Welcome back, {USER_NAME}! — Ctrl+D to stop AI, Ctrl+C to quit")
    self._add_work("💡 Tab to toggle CODE/CHAT mode | /help for all commands")
    if not MODEL:
        self._add_work("⚠ No model chosen. Use /model to select one.")
    self._add_work("")
    while True:
        self._draw()
        try:
            ch = self.stdscr.getch()
        except KeyboardInterrupt:
            break
        if ch == -1:
            # No key available; loop again so the screen keeps repainting.
            continue
        elif ch == 4:  # Ctrl+D
            self.stop_flag = True
            self._add_work("⛔ Stop signal sent")
        elif ch == 3:  # Ctrl+C
            break
        elif ch == 9:  # Tab — toggle code/chat mode
            self.code_mode = not self.code_mode
        elif ch == curses.KEY_MOUSE:
            try:
                _, _, _, _, bstate = curses.getmouse()
                # BUTTON5_PRESSED is not exposed by every curses build
                # (added in Python 3.10); fall back to 0 so wheel-down is
                # ignored there instead of raising AttributeError.
                wheel_down = getattr(curses, "BUTTON5_PRESSED", 0)
                if bstate & curses.BUTTON4_PRESSED:
                    self.scroll_offset += 3
                elif bstate & wheel_down:
                    self.scroll_offset = max(0, self.scroll_offset - 3)
            except curses.error:
                pass
        elif ch == curses.KEY_SR or ch == 575:  # Shift+Up or Ctrl+Up — scroll up
            self.scroll_offset += 3
        elif ch == curses.KEY_SF or ch == 534:  # Shift+Down or Ctrl+Down — scroll down
            self.scroll_offset = max(0, self.scroll_offset - 3)
        elif ch == curses.KEY_PPAGE:  # Page Up
            h, _ = self.stdscr.getmaxyx()
            self.scroll_offset += max(1, int(h * 0.9) - 5)
        elif ch == curses.KEY_NPAGE:  # Page Down
            h, _ = self.stdscr.getmaxyx()
            self.scroll_offset = max(0, self.scroll_offset - max(1, int(h * 0.9) - 5))
        elif ch in (curses.KEY_UP, 16):  # Arrow Up / Ctrl+P — previous history
            if self.input_history:
                if self.history_idx == -1:
                    # Entering history: remember the line being typed.
                    self.saved_input = self.chat_input
                    self.history_idx = len(self.input_history) - 1
                elif self.history_idx > 0:
                    self.history_idx -= 1
                self.chat_input = self.input_history[self.history_idx]
                self.cursor_pos = len(self.chat_input)
        elif ch in (curses.KEY_DOWN, 14):  # Arrow Down / Ctrl+N — next history
            if self.history_idx != -1:
                if self.history_idx < len(self.input_history) - 1:
                    self.history_idx += 1
                    self.chat_input = self.input_history[self.history_idx]
                else:
                    # Walked past the newest entry: restore the saved line.
                    self.history_idx = -1
                    self.chat_input = self.saved_input
                self.cursor_pos = len(self.chat_input)
        elif ch in (curses.KEY_LEFT, 2):  # Arrow Left / Ctrl+B — back one char
            if self.cursor_pos > 0:
                self.cursor_pos -= 1
        elif ch in (curses.KEY_RIGHT, 6):  # Arrow Right / Ctrl+F — forward one char
            if self.cursor_pos < len(self.chat_input):
                self.cursor_pos += 1
        elif ch in (curses.KEY_BACKSPACE, 127, 8):
            if self.cursor_pos > 0:
                self.chat_input = self.chat_input[:self.cursor_pos - 1] + self.chat_input[self.cursor_pos:]
                self.cursor_pos -= 1
        elif ch == 1:  # Ctrl+A — beginning of line
            self.cursor_pos = 0
        elif ch == 5:  # Ctrl+E — end of line
            self.cursor_pos = len(self.chat_input)
        elif ch == 23:  # Ctrl+W — delete word backward
            i = self.cursor_pos
            while i > 0 and self.chat_input[i - 1] == ' ':
                i -= 1
            while i > 0 and self.chat_input[i - 1] != ' ':
                i -= 1
            self.chat_input = self.chat_input[:i] + self.chat_input[self.cursor_pos:]
            self.cursor_pos = i
        elif ch == 21:  # Ctrl+U — clear to start
            self.chat_input = self.chat_input[self.cursor_pos:]
            self.cursor_pos = 0
        elif ch == 11:  # Ctrl+K — kill to end of line
            self.chat_input = self.chat_input[:self.cursor_pos]
        elif ch in (curses.KEY_ENTER, 10, 13):
            # Carriage return (13) is treated as Enter in every mode. An
            # earlier no-op branch swallowed it in multi-line mode even
            # though its comment said it was "handled below".
            # Detect paste: drain any buffered input rapidly
            self.stdscr.nodelay(True)
            paste_buf = self.chat_input
            while True:
                pc = self.stdscr.getch()
                if pc == -1:
                    break
                elif pc in (curses.KEY_ENTER, 10, 13):
                    paste_buf += "\n"
                elif 32 <= pc <= 126:
                    paste_buf += chr(pc)
                elif pc in (curses.KEY_BACKSPACE, 127, 8):
                    paste_buf = paste_buf[:-1]
            self.stdscr.nodelay(False)
            self.stdscr.timeout(100)
            self.chat_input = paste_buf
            self.cursor_pos = len(self.chat_input)
            # In multiline mode, empty line sends; otherwise add to buffer
            if self.multiline_mode:
                if self.chat_input == "":
                    msg = "\n".join(self.multiline_buf).strip()
                    self.multiline_buf = []
                else:
                    self.multiline_buf.append(self.chat_input)
                    self.chat_input = ""
                    self.cursor_pos = 0
                    continue
            else:
                msg = self.chat_input.strip()
            self.chat_input = ""
            self.cursor_pos = 0
            if not msg:
                continue
            # Save to input history (skip immediate repeats)
            if not self.input_history or self.input_history[-1] != msg:
                self._save_input_history(msg)
            self.history_idx = -1
            self.saved_input = ""
            # Reset scroll to bottom on new input
            self.scroll_offset = 0
            if msg.lower() in ("quit", "exit"):
                break
            if msg.startswith("!"):
                if msg == "!":
                    # Bare "!": drop into the user's shell until it exits.
                    curses.endwin()
                    shell = os.environ.get("SHELL", "/bin/sh")
                    subprocess.run([shell], cwd=self.cwd)
                    self.stdscr.refresh()
                    self._add_work("📟 Returned from shell")
                else:
                    # The command line is typed by the user, so it is
                    # deliberately run through the shell.
                    cmd = msg[1:].strip()
                    # Detect interactive commands that need the full terminal
                    words = cmd.split()
                    first_word = words[0] if words else ""
                    interactive = {"vi", "vim", "nvim", "nano", "pico", "emacs", "less", "more", "top", "htop", "man", "ssh", "bash", "sh", "zsh", "fish"}
                    if first_word in interactive:
                        curses.endwin()
                        subprocess.run(cmd, shell=True, cwd=self.cwd)
                        self.stdscr.refresh()
                        self._add_work(f"📟 Returned from: {cmd}")
                    else:
                        self._add_work(f"▶ $ {cmd}")
                        try:
                            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=60, cwd=self.cwd)
                            if result.stdout.strip():
                                self._add_work(result.stdout.strip())
                            if result.stderr.strip():
                                self._add_work(result.stderr.strip())
                            status = "" if result.returncode == 0 else f"✗ (exit {result.returncode})"
                            self._add_work(f" {status}")
                        except subprocess.TimeoutExpired:
                            self._add_work(" ✗ (timeout)")
                self._draw()
                continue
            if msg.startswith("/open "):
                filepath = msg[6:].strip()
                self._load_file_context(filepath)
                continue
            if msg == "/memory":
                if self.memory:
                    self._add_work("\n🧠 Memory:")
                    for k, v in self.memory.items():
                        self._add_work(f" [{k}]: {v}")
                else:
                    self._add_work("🧠 Memory is empty")
                continue
            if msg == "/help":
                help_lines = (
                    "\n📖 Commands:",
                    " /open <file> — load file as AI context",
                    " /memory — view AI memory scratchpad",
                    " /yolo — toggle dangerous command safety",
                    " /allow — approve a blocked command",
                    " /ml — toggle multi-line input",
                    " /clear — reset conversation and work window",
                    " /pwd — show current working directory",
                    " /cd <dir> — change working directory",
                    " /config — show current configuration",
                    " /model — pick model from server",
                    " /undo — restore last overwritten file",
                    " /diff — show diff of last overwritten file",
                    " /cat <file> — view a file in work window",
                    " /edit <file> — open file in editor ($VISUAL/$EDITOR/vi)",
                    " /grep <pat> — search project files",
                    " /history — show recent AI actions",
                    " /help — show this help",
                    " quit / exit — exit the program",
                    " Ctrl+D — stop AI mid-action",
                    " Ctrl+C — quit",
                    " Tab — toggle CODE/CHAT mode",
                    " PgUp/PgDn — scroll work window",
                    " ↑/↓ — input history",
                    " ←/→ — move cursor",
                    " Ctrl+A/E — beginning/end of line",
                    " Ctrl+F/B — forward/back one char",
                    " Ctrl+W — delete word backward",
                    " Ctrl+U — clear to start of line",
                    " Ctrl+K — kill to end of line",
                    " Ctrl+P/N — previous/next history",
                )
                for help_line in help_lines:
                    self._add_work(help_line)
                continue
            if msg == "/yolo":
                self.yolo_mode = not self.yolo_mode
                state = "ON ⚡" if self.yolo_mode else "OFF 🛡️"
                self._add_work(f"🔧 YOLO mode: {state}")
                continue
            if msg == "/allow" and self.pending_cmd:
                cmd = self.pending_cmd
                self.pending_cmd = None
                self._add_work(f"✅ Approved: {cmd}")
                # Lift the safety check for this one command only; restore
                # the previous setting even if the command raises.
                old = self.yolo_mode
                self.yolo_mode = True
                try:
                    self._run_command(cmd)
                finally:
                    self.yolo_mode = old
                continue
            if msg == "/ml":
                self.multiline_mode = not self.multiline_mode
                state = "ON (blank line to send)" if self.multiline_mode else "OFF"
                self._add_work(f"📝 Multi-line mode: {state}")
                continue
            if msg == "/clear":
                with self.lock:
                    self.work_lines.clear()
                    self.chat_history.clear()
                self.messages = [{"role": "system", "content": SYSTEM_PROMPT}]
                self.scroll_offset = 0
                self._load_project_context()
                self._add_work("🧹 Cleared conversation and work window")
                continue
            if msg == "/pwd":
                self._add_work(f"📁 {self.cwd}")
                continue
            if msg.startswith("/cd "):
                target = msg[4:].strip()
                if not os.path.isabs(target):
                    target = os.path.join(self.cwd, target)
                target = os.path.realpath(os.path.normpath(target))
                # Compare whole path components: a plain string-prefix test
                # would also accept siblings such as "<root>-other".
                root = os.path.realpath(self.root_dir)
                try:
                    inside_root = os.path.commonpath([root, target]) == root
                except ValueError:
                    # No common path (e.g. different drives): not inside.
                    inside_root = False
                if not inside_root:
                    self._add_work(f"❌ Cannot leave project directory: {self.root_dir}")
                elif os.path.isdir(target):
                    self.cwd = target
                    self._add_work(f"📁 Changed to {self.cwd}")
                else:
                    self._add_work(f"❌ Not a directory: {target}")
                continue
            if msg == "/config":
                self._add_work(f"\n⚙️ Config ({CONFIG_FILE}):")
                self._add_work(f" Host: {OLLAMA_URL}")
                self._add_work(f" Model: {MODEL}")
                self._add_work(f" User: {USER_NAME}")
                self._add_work(f" Auth: {'set' if AUTH_HEADERS else 'none'}")
                self._add_work(f" Ctx: {CONTEXT_SIZE}")
                self._add_work(f" Back: {BACKEND}")
                continue
            if msg == "/model":
                self._show_model_picker()
                continue
            if msg == "/undo":
                if self._last_backup:
                    fname, content = self._last_backup
                    try:
                        with open(fname, "w") as f:
                            f.write(content)
                    except OSError as e:
                        # Report the failure and keep the backup so the
                        # restore can be retried; do not crash the UI.
                        self._add_work(f"❌ Could not restore {fname}: {e}")
                    else:
                        self._add_work(f"↩️ Restored {fname}")
                        self._last_backup = None
                else:
                    self._add_work("❌ Nothing to undo")
                continue
            if msg.startswith("/cat "):
                filepath = msg[5:].strip()
                if not os.path.isabs(filepath):
                    filepath = os.path.join(self.cwd, filepath)
                try:
                    with open(filepath) as f:
                        self._add_work(f"\n📄 {filepath}:")
                        self._add_work(f.read().rstrip())
                except Exception as e:
                    self._add_work(f"{e}")
                continue
            if msg.startswith("/diff"):
                if self._last_backup:
                    fname, old = self._last_backup
                    try:
                        with open(fname) as f:
                            new = f.read()
                        diff = difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="before", tofile="after", lineterm="")
                        self._add_work(f"\n📊 Diff: {fname}")
                        for line in diff:
                            self._add_work(line)
                    except Exception as e:
                        self._add_work(f"{e}")
                else:
                    self._add_work("❌ No backup to diff against")
                continue
            if msg == "/history":
                self._add_work("\n📜 Recent actions:")
                for entry in self.ai_history[-20:]:
                    self._add_work(f" {entry}")
                if not self.ai_history:
                    self._add_work(" (empty)")
                continue
            if msg.startswith("/edit "):
                filepath = msg[6:].strip()
                self._edit_in_editor(filepath)
                continue
            if msg.startswith("/grep "):
                pattern = msg[6:].strip()
                self._add_work(f"\n🔍 Searching for: {pattern}")
                try:
                    # "-e" marks the next argument as the pattern, so one
                    # that starts with "-" is not parsed as a grep option.
                    result = subprocess.run(
                        ["grep", "-rn", "--color=never", "-e", pattern, "."],
                        capture_output=True, text=True, timeout=10, cwd=self.cwd
                    )
                    if result.stdout.strip():
                        # Show at most the first 30 matching lines.
                        for line in result.stdout.strip().split("\n")[:30]:
                            self._add_work(f" {line}")
                    else:
                        self._add_work(" No matches found")
                except Exception as e:
                    self._add_work(f"{e}")
                continue
            # Not a local command: echo it and hand it to the model.
            self._add_chat(f"{msg}")
            if not self.busy:
                t = threading.Thread(target=self._query_ollama, args=(msg,), daemon=True)
                t.start()
            else:
                self._add_chat("⏳ AI is busy, wait or Ctrl+D to stop")
        elif 32 <= ch <= 126:
            # Printable ASCII: insert at the cursor.
            self.chat_input = self.chat_input[:self.cursor_pos] + chr(ch) + self.chat_input[self.cursor_pos:]
            self.cursor_pos += 1
def main(stdscr):
    """Build the Vibe UI on the given curses screen and run its event loop."""
    app = Vibe(stdscr)
    app.run()
if __name__ == "__main__":
    # Run the UI under curses.wrapper, then print the farewell once it returns.
    curses.wrapper(main)
    farewell = f"\n👋 Bye, {USER_NAME}! Don't forget to commit your changes to git.\n"
    print(farewell)