From f1d1deab0ff7dabef1d14cd71ab22356388b2692 Mon Sep 17 00:00:00 2001 From: Anders Holck Date: Wed, 8 Apr 2026 16:06:34 +0200 Subject: [PATCH] Pushing structure --- README.md | 1 + vedit/vedit.py | 1511 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1512 insertions(+) create mode 100644 README.md create mode 100755 vedit/vedit.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..9f67398 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +Random tools. Can be risky ;) diff --git a/vedit/vedit.py b/vedit/vedit.py new file mode 100755 index 0000000..474b4ae --- /dev/null +++ b/vedit/vedit.py @@ -0,0 +1,1511 @@ +#!/usr/bin/env python3 +"""Vibe - A simple vibe coding tool bridging Ollama LLM and bash shell.""" + +import curses +import difflib +import json +import os +import subprocess +import threading +import time +import re +import requests +import sys +import unicodedata +from datetime import datetime +from urllib.parse import urlparse + +CONFIG_FILE = os.path.expanduser("~/.ahvibe.conf") +REQUIRED_KEYS = ["ollama_host", "user_name"] +OPTIONAL_KEYS = ["model", "auth_key", "context_size", "backend"] +PROMPTS = { + "user_name": "Your name: ", + "ollama_host": "Server host URL (e.g. http://localhost:11434): ", + "model": "Model name (e.g. qwen2.5-coder:14b-instruct): ", + "auth_key": "Auth key (optional, press Enter to skip): ", + "backend": "Backend (ollama or vllm, default ollama): ", +} + +def _load_or_create_config(): + """Load config from ~/.ahvibe.conf, prompting for any missing values.""" + conf = {} + if os.path.exists(CONFIG_FILE): + with open(CONFIG_FILE) as f: + for line in f: + line = line.strip() + if "=" in line and not line.startswith("#"): + k, v = line.split("=", 1) + conf[k.strip()] = v.strip() + # Default context_size silently + if "context_size" not in conf: + conf["context_size"] = "16384" + if "backend" not in conf: + conf["backend"] = "ollama" + missing = [k for k in REQUIRED_KEYS if not conf.get(k)] + missing += [k for k in OPTIONAL_KEYS if k not in conf and k not in ("context_size", "model", "backend")] + if missing: + try: + print(f"Config: {CONFIG_FILE}") + print() + for k in missing: + val = input(PROMPTS[k]).strip() + if k in REQUIRED_KEYS and not val: + print(f"Error: {k} is required.") + sys.exit(1) + conf[k] = val + except (KeyboardInterrupt, EOFError): + print("\nBye.") + sys.exit(0) + with open(CONFIG_FILE, "w") as f: + for k in REQUIRED_KEYS + OPTIONAL_KEYS: + if k in conf: + f.write(f"{k}={conf[k]}\n") + print(f"Saved to {CONFIG_FILE}\n") + backend = conf.get("backend", "ollama").lower() + host = conf["ollama_host"].rstrip("/") + headers = {} + if conf.get("auth_key"): + headers["Authorization"] = f"Bearer {conf['auth_key']}" + # Validate connection + try: + if backend == "vllm": + r = requests.get(host + "/v1/models", headers=headers, timeout=10) + else: + r = requests.get(host + "/api/tags", headers=headers, timeout=10) + if r.status_code == 401: + print("Error: Authentication failed. Check your auth key.") + sys.exit(1) + r.raise_for_status() + if backend == "vllm": + models = [m["id"] for m in r.json().get("data", [])] + else: + models = [m["name"] for m in r.json().get("models", [])] + if conf.get("model") and conf["model"] not in models: + print(f"Warning: Model '{conf['model']}' not found on server.") + print(f"Available: {', '.join(models) if models else 'none'}") + conf["model"] = "" + except requests.ConnectionError: + print(f"Error: Cannot connect to {host}") + sys.exit(1) + except SystemExit: + raise + except Exception as e: + print(f"Error connecting to server: {e}") + sys.exit(1) + if backend == "vllm": + chat_url = host + "/v1/chat/completions" + else: + chat_url = host + "/api/chat" + return chat_url, conf.get("model", ""), conf["user_name"], headers, int(conf["context_size"]), backend + +OLLAMA_URL, MODEL, USER_NAME, AUTH_HEADERS, CONTEXT_SIZE, BACKEND = _load_or_create_config() +_h = urlparse(OLLAMA_URL).hostname +HOST_SHORT = _h if _h[0].isdigit() else _h.split(".")[0] +WORKLOG_FILE = "worklog.md" +DESCRIPTION_FILE = "description.md" +AI_HISTORY_FILE = ".ai-history" +AI_HISTORY_MAX = 50 +INPUT_HISTORY_FILE = ".input-history" +INPUT_HISTORY_MAX = 200 +MAX_CONTEXT_MESSAGES = 40 # keep last N messages before summarizing +DANGEROUS_PATTERNS = re.compile( + r"\brm\s+(-[a-z]*f|-[a-z]*r|--force|--recursive)|\bdd\b.*of=/|" + r"\bmkfs\b|\bformat\b|\b>\s*/dev/|:>\s*/|chmod\s+-R\s+777|" + r"\bsudo\b|\breboot\b|\bshutdown\b|\bsystemctl\b", + re.IGNORECASE, +) + +SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] + +SYSTEM_PROMPT = f"""You are a vibe coding assistant. The user's name is {USER_NAME}. Use their name occasionally in conversation but NEVER embed it in code, filenames, or output unless explicitly asked. + +You help {USER_NAME} by writing code and running shell commands. + +TOOLS — use fenced code blocks with these tags: + +bash — run a shell command, example: +```bash +ls -la +``` + +write: — create/overwrite a file, example: +```write:hello.c +#include +int main() {{ printf("Hello\\n"); return 0; }} +``` + +edit: — search/replace in a file, example: +```edit:hello.c +<<>> +``` + +memory: — store a note, forget: — clear a note. + +read: — read a file's contents, example: +```read:main.c +``` + +IMPORTANT: Replace the examples above with ACTUAL commands and code for the task. Never output placeholder text like "command here" or "file content here". +Only use tools (bash, write, edit, memory) when the user asks you to create, modify, or run something. For general questions, just answer in plain text — do NOT create files or memory entries for conversational replies. + +BEHAVIOR: +- Do ONLY what is asked. Nothing more. +- Do NOT compile, run, test, or launch emulators (qemu, etc.) unless explicitly asked. +- Do NOT add extra steps beyond the request (no ls, cd, descriptions, verification). +- If a description.md exists, follow it — but only do what it says, not more. +- For small changes, prefer ```edit:filename``` over rewriting the whole file. +- If a command fails and the user asked you to run it, read the error output carefully, fix the code, and retry until it works. +- If a program runs but produces wrong output, analyze the output and fix the code. +- cd commands persist between bash blocks. File writes also respect the current directory. +- CRITICAL: All commands, file writes, edits, and memory ops MUST be inside ```fenced code blocks```. Never output them as plain text. + +OUTPUT: +- Be EXTREMELY concise. No filler, no preamble. +- Do NOT repeat code you just wrote — the user sees it in the work window. +- ONE LINE summary after completing a task. Nothing more. +- If there's nothing to add, say nothing.""" + + +def _dw(s): + """Display width of a string, accounting for wide chars (emoji etc).""" + w = 0 + for c in s: + w += 2 if unicodedata.east_asian_width(c) in ('W', 'F') else 1 + return w + +def _wrap_dw(s, maxw): + """Word-wrap string by display width, returns list of lines.""" + if _dw(s) <= maxw: + return [s] + lines = [] + cur = "" + cw = 0 + for word in re.split(r'(\s+)', s): + ww = _dw(word) + if cw + ww <= maxw: + cur += word + cw += ww + elif ww > maxw: + # Word itself is wider than maxw — character-wrap it + for c in word: + cw2 = 2 if unicodedata.east_asian_width(c) in ('W', 'F') else 1 + if cw + cw2 > maxw: + lines.append(cur) + cur = c + cw = cw2 + else: + cur += c + cw += cw2 + else: + if cur: + lines.append(cur) + cur = word.lstrip() if not lines else word + cw = _dw(cur) + if cur: + lines.append(cur) + return lines if lines else [""] + +class Vibe: + def __init__(self, stdscr): + self.stdscr = stdscr + self.messages = [{"role": "system", "content": SYSTEM_PROMPT}] + self.work_lines = [] + self.chat_input = "" + self.chat_history = [] + self.busy = False + self.stop_flag = False + self.spinner_idx = 0 + self.memory = {} + self.ai_history = [] + self.recent_files = [] + self.lock = threading.Lock() + self.scroll_offset = 0 # 0 = bottom (latest), >0 = scrolled up + self.yolo_mode = False # skip dangerous command confirmation + self.pending_cmd = None # command awaiting confirmation + self.multiline_mode = False + self.multiline_buf = [] + self.code_mode = True # True = code mode, False = chat mode + self.chat_messages = [] # separate message history for chat mode + self.cursor_pos = 0 # cursor position within chat_input + self.code_input_history = self._load_input_history("code") + self.chat_input_history = self._load_input_history("chat") + self.history_idx = -1 # -1 = not browsing history + self.saved_input = "" # saved current input when browsing history + self.cwd = os.getcwd() # persistent working directory for commands + self.root_dir = self.cwd # sandbox — never allow navigating above this + self._last_backup = None # (filename, content) for /undo + + curses.curs_set(1) + curses.start_color() + curses.use_default_colors() + curses.init_pair(1, curses.COLOR_GREEN, -1) + curses.init_pair(2, curses.COLOR_CYAN, -1) + curses.init_pair(3, curses.COLOR_YELLOW, -1) + curses.init_pair(4, curses.COLOR_RED, -1) + curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_YELLOW) + curses.init_pair(6, curses.COLOR_WHITE, curses.COLOR_BLUE) + curses.init_pair(7, curses.COLOR_MAGENTA, -1) # for warnings + self.stdscr.nodelay(False) + self.stdscr.timeout(100) + try: + curses.mousemask(curses.ALL_MOUSE_EVENTS) + except Exception: + pass + self.ollama_status = "" # GPU/CPU status string + self._start_status_poller() + + self._load_project_context() + self._load_worklog() + + def _start_status_poller(self): + def poll(): + host = OLLAMA_URL.rsplit("/api/chat", 1)[0] if BACKEND != "vllm" else OLLAMA_URL.rsplit("/v1/chat/completions", 1)[0] + while True: + try: + if BACKEND == "vllm": + r = requests.get(host + "/v1/models", headers=AUTH_HEADERS, timeout=5) + if r.status_code == 200: + self.ollama_status = "online" + else: + self.ollama_status = "error" + else: + r = requests.get(host + "/api/ps", headers=AUTH_HEADERS, timeout=5) + models = r.json().get("models", []) + if models: + parts = [] + for m in models: + total = m.get("size", 1) + vram = m.get("size_vram", 0) + gpu_pct = int(vram * 100 / total) if total else 0 + cpu_pct = 100 - gpu_pct + parts.append(f"GPU:{gpu_pct}% CPU:{cpu_pct}%") + self.ollama_status = " | ".join(parts) + else: + self.ollama_status = "idle" + except Exception: + self.ollama_status = "offline" + time.sleep(15) + t = threading.Thread(target=poll, daemon=True) + t.start() + + def _load_file_context(self, filepath): + """Load a file as project context for the AI.""" + if not os.path.isabs(filepath): + filepath = os.path.join(self.cwd, filepath) + try: + with open(filepath) as f: + content = f.read().strip() + self.messages.append( + {"role": "system", "content": f"Contents of {filepath}:\n{content}"} + ) + self._add_work(f"📋 Loaded '{filepath}' as context") + except Exception as e: + self._add_work(f"❌ Could not read '{filepath}': {e}") + + def _load_project_context(self): + """Auto-scan project tree and load description file on startup.""" + # Ensure .gitignore excludes session files + gi_entries = {".ai-history", ".input-history"} + gi_path = os.path.join(self.root_dir, ".gitignore") + existing = set() + if os.path.exists(gi_path): + with open(gi_path) as f: + existing = set(l.strip() for l in f) + missing = gi_entries - existing + if missing: + with open(gi_path, "a") as f: + for entry in sorted(missing): + f.write(f"{entry}\n") + # Load description file if present + if os.path.exists(DESCRIPTION_FILE): + try: + with open(DESCRIPTION_FILE) as f: + desc = f.read().strip() + if desc: + self.messages.append( + {"role": "system", "content": f"Project description:\n{desc}"} + ) + self._add_work(f"📋 Loaded project description") + except Exception: + pass + else: + self._add_work(f"💡 Tip: Create '{DESCRIPTION_FILE}' to describe your project for the AI") + # Build a compact file tree (max 100 entries, skip hidden/build dirs) + skip = {'.git', '__pycache__', 'node_modules', '.venv', 'venv', 'build', 'dist', '.ai-history'} + tree = [] + for root, dirs, files in os.walk("."): + dirs[:] = [d for d in dirs if d not in skip and not d.startswith('.')] + depth = root.count(os.sep) + if depth > 3: + dirs.clear() + continue + for f in files: + path = os.path.join(root, f) + if len(tree) >= 100: + break + tree.append(path[2:] if path.startswith("./") else path) + if len(tree) >= 100: + break + if tree: + self.messages.append( + {"role": "system", "content": "Project files:\n" + "\n".join(tree)} + ) + self._add_work(f"📂 Scanned project: {len(tree)} files") + + def _trim_context(self): + """Keep message list manageable by summarizing old messages.""" + # Count non-system messages + non_sys = [(i, m) for i, m in enumerate(self.messages) if m["role"] != "system"] + if len(non_sys) <= MAX_CONTEXT_MESSAGES: + return + # Summarize the oldest half of non-system messages + cut = len(non_sys) // 2 + to_remove = non_sys[:cut] + summary_parts = [] + for _, m in to_remove: + role = m["role"] + text = m["content"][:150] + summary_parts.append(f"{role}: {text}") + summary = "Conversation summary (older messages):\n" + "\n".join(summary_parts) + # Remove old messages and insert summary after system messages + indices_to_remove = set(i for i, _ in to_remove) + self.messages = [m for i, m in enumerate(self.messages) if i not in indices_to_remove] + # Find insertion point (after last system message) + insert_at = 0 + for i, m in enumerate(self.messages): + if m["role"] == "system": + insert_at = i + 1 + self.messages.insert(insert_at, {"role": "system", "content": summary}) + + def _is_dangerous(self, cmd): + """Check if a command matches dangerous patterns.""" + return bool(DANGEROUS_PATTERNS.search(cmd)) + + def _edit_file(self, filename, content): + """Apply search/replace edits to an existing file.""" + if not os.path.isabs(filename): + filename = os.path.join(self.cwd, filename) + if not os.path.realpath(filename).startswith(self.root_dir): + self._add_work(f"❌ Edit blocked: {filename} is outside project directory") + return f"Error: {filename} is outside project directory" + if not os.path.exists(filename): + self._add_work(f"❌ Edit failed: {filename} not found") + return f"Error: {filename} not found" + with open(filename) as f: + original = f.read() + modified = original + # Parse <<>> or <<< ... === ... >>> blocks + edits = re.findall(r"<<<\s*(?:SEARCH\s*)?\n?(.*?)\n\s*===\s*\n(.*?)\n\s*>>>", content, re.DOTALL) + if not edits: + # Fallback: strip any stray markers and treat as full file rewrite + clean = re.sub(r"^<<<\s*(?:SEARCH)?\s*\n?|^\s*===\s*\n|^\s*>>>\s*$", "", content, flags=re.MULTILINE).strip() + self._write_file(filename, clean + "\n") + return f"Rewrote {filename} (no edit markers, used as full rewrite)" + applied = 0 + for search, replace in edits: + search = search.rstrip() + replace = replace.rstrip() + if search in modified: + modified = modified.replace(search, replace, 1) + applied += 1 + else: + self._add_work(f"⚠ Search text not found in {filename}") + if applied > 0: + with open(filename, "w") as f: + f.write(modified) + self._add_work(f"✏️ Edited {filename} ({applied} change{'s' if applied != 1 else ''})") + self._log_history(f"EDITED: {filename}") + if filename in self.recent_files: + self.recent_files.remove(filename) + self.recent_files.insert(0, filename) + self.recent_files = self.recent_files[:10] + return f"Edited {filename}: {applied} change(s) applied" + + @property + def input_history(self): + return self.code_input_history if self.code_mode else self.chat_input_history + + def _load_input_history(self, prefix): + hist = [] + if os.path.exists(INPUT_HISTORY_FILE): + with open(INPUT_HISTORY_FILE) as f: + for l in f: + l = l.rstrip("\n") + if l.startswith(f"{prefix}:"): + hist.append(l[len(prefix) + 1:]) + return hist[-INPUT_HISTORY_MAX:] + + def _save_input_history(self, entry): + hist = self.input_history + hist.append(entry) + while len(hist) > INPUT_HISTORY_MAX: + hist.pop(0) + # Rewrite file with all entries from both modes + prefix = "code" if self.code_mode else "chat" + other_prefix = "chat" if self.code_mode else "code" + other = self.chat_input_history if self.code_mode else self.code_input_history + try: + lines = [f"{other_prefix}:{e}" for e in other] + lines += [f"{prefix}:{e}" for e in hist] + with open(INPUT_HISTORY_FILE, "w") as f: + f.write("\n".join(lines) + "\n") + except Exception: + pass + + def _load_worklog(self): + if os.path.exists(WORKLOG_FILE): + with open(WORKLOG_FILE) as f: + lines = f.readlines() + # Only load last 100 lines to avoid filling context + log = "".join(lines[-100:]).strip() + if log: + self.messages.append( + {"role": "system", "content": f"Previous worklog (recent):\n{log}"} + ) + self._add_work(f"📖 Resumed from '{WORKLOG_FILE}'") + + def _write_worklog(self, entry): + with open(WORKLOG_FILE, "a") as f: + f.write(f"\n## {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{entry}\n") + + def _log_history(self, action): + """Log an action to .ai-history, keeping last N entries.""" + ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + self.ai_history.append(f"[{ts}] {action}") + self.ai_history = self.ai_history[-AI_HISTORY_MAX:] + try: + with open(AI_HISTORY_FILE, "w") as f: + f.write("\n".join(self.ai_history) + "\n") + except Exception: + pass + + def _add_work(self, text): + with self.lock: + try: + w = self.stdscr.getmaxyx()[1] + except Exception: + w = 80 + for line in text.split("\n"): + for wl in _wrap_dw(line, w): + self.work_lines.append(wl) + + def _add_chat(self, text): + with self.lock: + for line in text.split("\n"): + self.chat_history.append(line) + + def _run_command(self, cmd): + # Check for dangerous commands + if not self.yolo_mode and self._is_dangerous(cmd): + self._add_work(f"⚠️ Dangerous command blocked: {cmd}") + self._add_work(" Use /yolo to enable, or /allow to run this one") + self.pending_cmd = cmd + return "Command blocked as potentially dangerous. User must approve.", 1 + self.pending_cmd = None + self._add_work(f"▶ [{self.cwd}] $ {cmd}") + self._log_history(f"RAN: {cmd}") + # Append a pwd at the end so we can track cd changes + wrapped = cmd + '\necho "___CWD___:$(pwd)"' + try: + result = subprocess.run( + wrapped, shell=True, capture_output=True, text=True, timeout=60, + cwd=self.cwd + ) + stdout = result.stdout.strip() + stderr = result.stderr.strip() + # Extract new cwd from output + out_lines = stdout.split("\n") + clean_lines = [] + for line in out_lines: + if line.startswith("___CWD___:"): + new_cwd = line[10:] + if os.path.isdir(new_cwd) and os.path.realpath(new_cwd).startswith(self.root_dir): + with self.lock: + self.cwd = new_cwd + else: + clean_lines.append(line) + stdout = "\n".join(clean_lines).strip() + if stdout: + self._add_work(stdout) + if stderr and stderr != stdout: + self._add_work(stderr) + status = "✓" if result.returncode == 0 else f"✗ (exit {result.returncode})" + self._add_work(f" {status}") + parts = [f"exit_code: {result.returncode}"] + if stdout: + parts.append(f"stdout: {stdout}") + if stderr: + parts.append(f"stderr: {stderr}") + return "\n".join(parts), result.returncode + except subprocess.TimeoutExpired: + self._add_work(" ✗ (timeout)") + return "Command timed out after 60s", 1 + + def _write_file(self, filename, content): + if not os.path.isabs(filename): + filename = os.path.join(self.cwd, filename) + if not os.path.realpath(filename).startswith(self.root_dir): + self._add_work(f"❌ Write blocked: {filename} is outside project directory") + return False + # Backup existing file before overwriting + if os.path.exists(filename): + try: + with open(filename) as f: + self._last_backup = (filename, f.read()) + except Exception: + pass + os.makedirs(os.path.dirname(filename), exist_ok=True) if os.path.dirname(filename) else None + with open(filename, "w") as f: + f.write(content) + self._add_work(f"📝 Wrote {filename}") + self._log_history(f"WROTE: {filename}") + # Track recent files + if filename in self.recent_files: + self.recent_files.remove(filename) + self.recent_files.insert(0, filename) + self.recent_files = self.recent_files[:10] + return True + + def _process_response(self, text): + """Extract and execute bash commands and file writes from AI response.""" + results = [] + # Find fenced code blocks (``` or ~~~) + blocks = re.findall(r"```\s*(\w[\w:./\-]*)\n(.*?)```", text, re.DOTALL) + blocks += re.findall(r"~~~\s*(\w[\w:./\-]*)\n(.*?)~~~", text, re.DOTALL) + # Also catch unfenced edit blocks (AI sometimes forgets fences) + unfenced = re.findall(r"(?:^|\n)\s*(edit:[\w./\-]+)\s*\n(<<<\s*SEARCH\s*\n.*?>>>)", text, re.DOTALL) + seen = set() + for lang, content in blocks: + seen.add(content.strip()) + for lang, content in unfenced: + if content.strip() not in seen: + blocks.append((lang, content)) + for lang, content in blocks: + lang_lower = lang.lower() + if lang_lower == "bash": + if self.stop_flag: + break + # Detect misplaced edit/write blocks inside bash + cmd = content.strip() + m = re.match(r"^(edit|write):(\S+)\s*\n", cmd, re.IGNORECASE) + if m: + action, fname = m.group(1).lower(), m.group(2) + body = cmd[m.end():] + if action == "edit": + result = self._edit_file(fname, body) + results.append(result) + else: + if self._write_file(fname, body): + results.append(f"Wrote file: {fname}") + else: + results.append(f"Write blocked: {fname}") + elif re.match(r"^(Command\s|exit_code:|stdout:|stderr:|Results:)", cmd): + # AI echoed back feedback format — skip it + pass + elif re.search(r"(^import |^from |^def |^class |^#include\s|= init_|\.connect\()", cmd, re.MULTILINE): + # AI put source code in a bash block — skip it + self._add_work("⚠ Skipped: source code detected in bash block") + pass + else: + output, rc = self._run_command(cmd) + results.append(f"Command `{cmd}`:\n{output}") + elif lang_lower.startswith("write:"): + filename = lang[6:] + if self._write_file(filename, content): + results.append(f"Wrote file: {filename}") + else: + results.append(f"Write blocked: {filename}") + elif lang_lower.startswith("edit:"): + filename = lang[5:] + result = self._edit_file(filename, content) + results.append(result) + elif lang_lower.startswith("memory:"): + key = lang[7:] + self.memory[key] = content.strip() + self._add_work(f"🧠 Memorized '{key}'") + self._log_history(f"MEMORIZED: {key}") + results.append(f"Stored memory: {key}") + elif lang_lower.startswith("forget:"): + key = lang[7:].strip() or content.strip() + if self.memory.pop(key, None) is not None: + self._add_work(f"🧠 Forgot '{key}'") + self._log_history(f"FORGOT: {key}") + results.append(f"Cleared memory: {key}") + elif lang_lower.startswith("read:"): + filename = lang[5:] + if not os.path.isabs(filename): + filename = os.path.join(self.cwd, filename) + try: + with open(filename) as f: + file_content = f.read() + self._add_work(f"📄 Read {filename}") + results.append(f"Contents of {filename}:\n{file_content}") + except Exception as e: + results.append(f"Error reading {filename}: {e}") + else: + # Unknown language tag — check if first line is write:/edit: + first_line = content.strip().split("\n", 1)[0] + m = re.match(r"^(write|edit):(\S+)", first_line, re.IGNORECASE) + if m: + action, fname = m.group(1).lower(), m.group(2) + body = content.strip().split("\n", 1)[1] if "\n" in content.strip() else "" + if action == "write": + if self._write_file(fname, body): + results.append(f"Wrote file: {fname}") + else: + results.append(self._edit_file(fname, body)) + return results + + def _memory_context(self): + """Build a system message with current memory contents.""" + if not self.memory: + return None + lines = [f"[{k}]: {v}" for k, v in self.memory.items()] + return {"role": "system", "content": "Current memory:\n" + "\n".join(lines)} + + def _build_messages(self): + """Build message list with memory injected, trimming old context.""" + self._trim_context() + msgs = list(self.messages) + mem = self._memory_context() + if mem: + msgs.insert(1, mem) # after system prompt + return msgs + + def _stream_response(self): + """Stream a response from the LLM backend, return full text.""" + self._add_work("🔌 Connecting to server...") + resp = None + if BACKEND == "vllm": + payload = {"model": MODEL, "messages": self._build_messages(), "stream": True, "max_tokens": CONTEXT_SIZE} + else: + payload = {"model": MODEL, "messages": self._build_messages(), "stream": True, "options": {"num_ctx": CONTEXT_SIZE}} + for attempt in range(3): + try: + resp = requests.post( + OLLAMA_URL, + json=payload, + headers=AUTH_HEADERS, + stream=True, + timeout=120, + ) + break + except (requests.ConnectionError, requests.Timeout) as e: + if attempt < 2: + self._add_work(f"⚠ Connection failed (attempt {attempt + 1}/3), retrying...") + time.sleep(2) + else: + self._add_work(f"❌ Failed to connect after 3 attempts: {e}") + return "" + if resp.status_code != 200: + err = resp.text[:200] + self._add_work(f"❌ Server error ({resp.status_code}): {err}") + return "" + # Remove the "Connecting" line now that we're streaming + with self.lock: + self.work_lines = [l for l in self.work_lines if l != "🔌 Connecting to server..."] + full = "" + for line in resp.iter_lines(): + if self.stop_flag: + self._add_work("\n⛔ Stopped by user") + break + if line: + if BACKEND == "vllm": + line_str = line.decode("utf-8") if isinstance(line, bytes) else line + if not line_str.startswith("data: "): + continue + line_str = line_str[6:] + if line_str.strip() == "[DONE]": + break + data = json.loads(line_str) + token = (data.get("choices", [{}])[0].get("delta", {}).get("content") or "") + else: + data = json.loads(line) + token = data.get("message", {}).get("content", "") + if token: + full += token + with self.lock: + try: + w = self.stdscr.getmaxyx()[1] + except Exception: + w = 80 + parts = token.split("\n") + self.work_lines[-1] += parts[0] + # Wrap current line if it exceeds display width + if _dw(self.work_lines[-1]) > w: + wrapped = _wrap_dw(self.work_lines[-1], w) + self.work_lines[-1] = wrapped[0] + self.work_lines.extend(wrapped[1:]) + for p in parts[1:]: + for wl in _wrap_dw(p, w): + self.work_lines.append(wl) + # Clean up code fence markers from display + with self.lock: + self.work_lines = [ + l for l in self.work_lines + if not re.match(r"^`{3}(?:bash|write:\S+|edit:\S+|memory:\S+|forget:\S+|\w+)?$", l.strip()) + ] + return full + + def _query_ollama(self, user_msg): + self.busy = True + self.stop_flag = False + + if not self.code_mode: + # Chat mode — use separate message list, don't touch code history + self.chat_messages.append({"role": "user", "content": user_msg}) + self._add_work(f"\n🧑 {USER_NAME}: {user_msg}") + self._log_history(f"USER [chat]: {user_msg}") + try: + self._add_work("📠 AI: ") + # Build chat messages: system prompt + chat history + chat_msgs = [{"role": "system", "content": f"You are chatting with {USER_NAME}. The user's name is {USER_NAME} — use it. Respond in plain text only. No code blocks, no file writes, no commands. Just discuss and reason naturally."}] + chat_msgs += self.chat_messages[-MAX_CONTEXT_MESSAGES:] + # Temporarily swap messages for streaming + orig = self.messages + self.messages = chat_msgs + full_response = self._stream_response() + self.messages = orig + if full_response: + self.chat_messages.append({"role": "assistant", "content": full_response}) + self._log_history(f"AI [chat]: {full_response[:100]}") + self._write_worklog(f"User [chat]: {user_msg}\nAI: {full_response[:200]}") + except Exception as e: + self._add_work(f"❌ Error: {e}") + finally: + self.busy = False + return + + # Code mode — use main message list + self.messages.append({"role": "user", "content": user_msg}) + # Include chat summary if there's been chat discussion + if self.chat_messages: + summary = "\n".join(f"{m['role']}: {m['content'][:150]}" for m in self.chat_messages[-10:]) + self.messages.append({"role": "system", "content": f"Recent chat discussion for context:\n{summary}"}) + self._add_work(f"\n🧑 {USER_NAME}: {user_msg}") + self._log_history(f"USER: {user_msg}") + + try: + self._add_work("📠 AI: ") + full_response = self._stream_response() + if not full_response: + return + self.messages.append({"role": "assistant", "content": full_response}) + self._log_history(f"AI: {full_response[:100]}") + + # Autonomous loop: keep processing actions and feeding back results + max_rounds = 5 + last_feedback = None + for _ in range(max_rounds): + if self.stop_flag: + break + action_results = self._process_response(full_response) + if not action_results: + break + # Stop looping if a command was blocked — wait for user to /allow + if any("blocked" in r.lower() for r in action_results): + self._write_worklog(f"User: {user_msg}\nAI acted.\nResults:\n" + "\n".join(action_results)) + break + feedback = "Results:\n" + "\n".join(action_results) + # Stop if same results as last round (no progress) + if feedback == last_feedback: + self._add_work("⚠ AI is stuck (same results), stopping.") + self._write_worklog(f"User: {user_msg}\nAI acted.\n{feedback}") + break + last_feedback = feedback + self._write_worklog(f"User: {user_msg}\nAI acted.\n{feedback}") + self.messages.append({"role": "user", "content": feedback}) + self._add_work("\n📠 AI (continuing): ") + full_response = self._stream_response() + if not full_response: + break + self.messages.append({"role": "assistant", "content": full_response}) + self._log_history(f"AI: {full_response[:100]}") + + if not any(self._process_response(full_response) for _ in [0]): + self._write_worklog(f"User: {user_msg}\nAI: {full_response[:200]}") + + except Exception as e: + self._add_work(f"❌ Error: {e}") + finally: + self.busy = False + + def _save_config_model(self, new_model): + """Update model in config file.""" + conf = {} + if os.path.exists(CONFIG_FILE): + with open(CONFIG_FILE) as f: + for line in f: + line = line.strip() + if "=" in line and not line.startswith("#"): + k, v = line.split("=", 1) + conf[k.strip()] = v.strip() + conf["model"] = new_model + with open(CONFIG_FILE, "w") as f: + for k in REQUIRED_KEYS + OPTIONAL_KEYS: + if k in conf: + f.write(f"{k}={conf[k]}\n") + + def _show_model_picker(self): + """Show a popup to pick a model from the server.""" + global MODEL + self._add_work("⏳ Fetching models...") + self._draw() + host = OLLAMA_URL.rsplit("/api/chat", 1)[0] if BACKEND != "vllm" else OLLAMA_URL.rsplit("/v1/chat/completions", 1)[0] + try: + if BACKEND == "vllm": + r = requests.get(host + "/v1/models", headers=AUTH_HEADERS, timeout=10) + r.raise_for_status() + models = sorted(m["id"] for m in r.json().get("data", [])) + else: + r = requests.get(host + "/api/tags", headers=AUTH_HEADERS, timeout=10) + r.raise_for_status() + models = sorted(m["name"] for m in r.json().get("models", [])) + except Exception as e: + self._add_work(f"❌ Failed to fetch models: {e}") + return + if not models: + self._add_work("❌ No models available on server") + return + sel = models.index(MODEL) if MODEL in models else 0 + self.stdscr.nodelay(False) + self.stdscr.timeout(-1) + while True: + h, w = self.stdscr.getmaxyx() + # Popup dimensions + box_w = min(60, w - 4) + box_h = min(len(models) + 4, h - 4) + visible = box_h - 4 + x = (w - box_w) // 2 + y = (h - box_h) // 2 + # Scroll window + scroll = max(0, sel - visible + 1) + # Draw popup + for row in range(box_h): + self._safe_addstr(y + row, x, " " * box_w, w, curses.color_pair(6)) + title = " SELECT MODEL " + tpad = box_w - len(title) + self._safe_addstr(y, x, "─" * (tpad // 2) + title + "─" * ((tpad + 1) // 2), w, curses.color_pair(6)) + self._safe_addstr(y + 1, x, f" Current: {MODEL}"[:box_w], w, curses.color_pair(6)) + self._safe_addstr(y + box_h - 1, x, " ↑↓ select Enter confirm Esc cancel "[:box_w], w, curses.color_pair(3)) + for i in range(visible): + idx = scroll + i + if idx >= len(models): + break + name = models[idx][:box_w - 4] + row = y + 2 + i + if idx == sel: + self._safe_addstr(row, x, f" ▸ {name:<{box_w - 3}}", w, curses.color_pair(5) | curses.A_BOLD) + else: + self._safe_addstr(row, x, f" {name:<{box_w - 3}}", w, curses.color_pair(6)) + self.stdscr.refresh() + ch = self.stdscr.getch() + if ch == 27: # Esc + break + elif ch == curses.KEY_UP and sel > 0: + sel -= 1 + elif ch == curses.KEY_DOWN and sel < len(models) - 1: + sel += 1 + elif ch in (curses.KEY_ENTER, 10, 13): + MODEL = models[sel] + self._save_config_model(MODEL) + self._add_work(f"✅ Model changed to {MODEL}") + break + self.stdscr.timeout(100) + + def _get_editor(self): + """Get user's preferred editor.""" + return os.environ.get("VISUAL") or os.environ.get("EDITOR") or "vi" + + def _edit_in_editor(self, filepath): + """Open a file in the user's editor, suspending curses.""" + if not os.path.isabs(filepath): + filepath = os.path.join(self.cwd, filepath) + if not os.path.exists(filepath): + self._add_work(f"❌ File not found: {filepath}") + return + editor = self._get_editor() + curses.endwin() + try: + subprocess.run([editor, filepath]) + except Exception as e: + pass + self.stdscr.refresh() + self._add_work(f"📝 Returned from {editor}: {filepath}") + + def _safe_addstr(self, row, col, text, maxw, attr=0): + """Write text to screen, safely clamped to bounds.""" + if row < 0 or col >= maxw: + return + text = text[:maxw - col] + if not text: + return + try: + self.stdscr.addnstr(row, col, text, maxw - col, attr) + except curses.error: + pass + + def _draw(self): + h, w = self.stdscr.getmaxyx() + if h < 4 or w < 10: + return + work_h = max(2, int(h * 0.9)) + chat_h = h - work_h + + self.stdscr.erase() + + # --- Top: work window --- + title = " WORK " + bar_attr = curses.color_pair(2) + if self.busy: + frame = SPINNER_FRAMES[self.spinner_idx % len(SPINNER_FRAMES)] + self.spinner_idx += 1 + title = f" {frame} WORKING {frame} " + if int(time.time() * 3) % 2: + title_attr = curses.color_pair(5) | curses.A_BOLD + else: + title_attr = curses.color_pair(4) | curses.A_BOLD + else: + title_attr = bar_attr + pad = w - len(title) + left_pad = pad // 2 + right_pad = (pad + 1) // 2 + self._safe_addstr(0, 0, "─" * left_pad, w, bar_attr) + self._safe_addstr(0, left_pad, title, w, title_attr) + self._safe_addstr(0, left_pad + len(title), "─" * right_pad, w, bar_attr) + # Holck Editor label on top-left + he_label = " Holck Editor " + if len(he_label) < left_pad: + self._safe_addstr(0, 0, he_label, w, curses.color_pair(3)) + # Ollama status in top-right + if self.ollama_status: + status_str = f" {self.ollama_status} " + sx = w - len(status_str) + if sx > left_pad + len(title): + self._safe_addstr(0, sx, status_str, w, curses.color_pair(3)) + + with self.lock: + total = len(self.work_lines) + view_h = work_h - 1 + # Clamp scroll offset + max_scroll = max(0, total - view_h) + self.scroll_offset = max(0, min(self.scroll_offset, max_scroll)) + if self.scroll_offset > 0: + start = total - view_h - self.scroll_offset + start = max(0, start) + visible = self.work_lines[start:start + view_h] + else: + visible = self.work_lines[-view_h:] + # Wrap long lines at display time + wrapped_visible = [] + for line in visible: + for wl in _wrap_dw(line, w): + wrapped_visible.append(wl) + visible = wrapped_visible[-view_h:] if len(wrapped_visible) > view_h else wrapped_visible + # Determine initial color by scanning backwards from visible start + cur_attr = 0 + with self.lock: + # Find where visible starts in work_lines + total_wl = len(self.work_lines) + if self.scroll_offset > 0: + vis_start = max(0, total_wl - (work_h - 1) - self.scroll_offset) + else: + vis_start = max(0, total_wl - (work_h - 1)) + for j in range(vis_start - 1, -1, -1): + pl = self.work_lines[j] + if pl.startswith("🧑 "): + cur_attr = curses.color_pair(1); break + elif pl.startswith("📠 AI"): + cur_attr = curses.color_pair(2); break + elif pl.startswith("▶ "): + cur_attr = curses.color_pair(3); break + elif pl.startswith(" ✗") or pl.startswith("❌"): + cur_attr = curses.color_pair(4); break + elif pl.startswith("📝 ") or pl.startswith("✏️"): + cur_attr = curses.color_pair(7); break + for i, line in enumerate(visible): + row = 1 + i + if row >= work_h: + break + # Colorize by content type — sticky for continuation lines + if line.startswith("🧑 ") or line.startswith(f"🧑 {USER_NAME}"): + cur_attr = curses.color_pair(1) # green — user + elif line.startswith("📠 AI"): + cur_attr = curses.color_pair(2) # cyan — AI + elif line.startswith("▶ "): + cur_attr = curses.color_pair(3) # yellow — command + elif line.startswith(" ✓"): + cur_attr = curses.color_pair(1) # green — success + elif line.startswith(" ✗") or line.startswith("❌"): + cur_attr = curses.color_pair(4) # red — error + elif line.startswith("⚠"): + cur_attr = curses.color_pair(3) # yellow — warning + elif line.startswith("📝 ") or line.startswith("✏️"): + cur_attr = curses.color_pair(7) # magenta — file write/edit + elif line.startswith("🔌 ") or line.startswith("📋 ") or line.startswith("📂 ") or line.startswith("📖 ") or line.startswith("💡"): + cur_attr = curses.color_pair(2) # cyan — info + self._safe_addstr(row, 0, line[:w], w, cur_attr) + + # --- Recent files panel (top-right) --- + if self.scroll_offset > 0: + indicator = f" ↑ {self.scroll_offset} lines above " + self._safe_addstr(1, 0, indicator, w, curses.color_pair(3)) + + if self.recent_files: + panel_w = min(30, w // 3) + panel_h = min(len(self.recent_files) + 2, work_h - 1) + px = w - panel_w + py = 1 + # Draw panel + header = " FILES " + hpad = panel_w - len(header) + hbar = ("─" * (hpad // 2)) + header + ("─" * ((hpad + 1) // 2)) + self._safe_addstr(py, px, hbar, w, curses.color_pair(6)) + for fi, fname in enumerate(self.recent_files): + row = py + 1 + fi + if row >= py + panel_h - 1: + break + label = os.path.relpath(fname, self.cwd) if os.path.isabs(fname) else fname + label = label[:panel_w - 2] + self._safe_addstr(row, px, f" {label:<{panel_w - 1}}", w, curses.color_pair(6)) + bot_row = min(py + panel_h - 1, work_h - 1) + self._safe_addstr(bot_row, px, "─" * panel_w, w, curses.color_pair(6)) + + # --- Bottom: chat window --- + sep_row = work_h + mode_label = "CODE" if self.code_mode else "CHAT" + sep_title = f" {mode_label} {MODEL}@{HOST_SHORT} " + sep_color = curses.color_pair(1) if self.code_mode else curses.color_pair(3) + spad = w - len(sep_title) + sep = ("─" * (spad // 2)) + sep_title + ("─" * ((spad + 1) // 2)) + self._safe_addstr(sep_row, 0, sep, w, sep_color) + + # Chat history lines + chat_display_h = chat_h - 2 # minus separator and input line + with self.lock: + chat_vis = self.chat_history[-chat_display_h:] if chat_display_h > 0 else [] + for i, line in enumerate(chat_vis): + row = sep_row + 1 + i + if row >= h - 1: + break + self._safe_addstr(row, 0, line[:w], w) + + # Input area with word-wrapping + prompt = "▸▸ " if self.multiline_mode else ("▸ " if self.code_mode else "▸ ") + plen = len(prompt) + avail_first = w - plen # chars on first line (after prompt) + avail_rest = w # chars on continuation lines + # Build wrapped lines: first line has prompt, rest are full-width + inp = self.chat_input + wrapped = [] + if avail_first > 0: + wrapped.append(inp[:avail_first]) + inp = inp[avail_first:] + while inp and avail_rest > 0: + wrapped.append(inp[:avail_rest]) + inp = inp[avail_rest:] + if not wrapped: + wrapped = [""] + num_input_lines = len(wrapped) + input_start_row = h - num_input_lines + try: + for li, chunk in enumerate(wrapped): + row = input_start_row + li + if row < sep_row + 1: + continue + if li == 0: + self.stdscr.attron(curses.color_pair(1)) + self.stdscr.addnstr(row, 0, prompt, w) + self.stdscr.attroff(curses.color_pair(1)) + self.stdscr.addnstr(row, plen, chunk, avail_first) + else: + self.stdscr.addnstr(row, 0, chunk, avail_rest) + self.stdscr.clrtoeol() + # Position cursor + cp = self.cursor_pos + if cp <= avail_first: + cur_row = input_start_row + cur_col = plen + cp + else: + cp2 = cp - avail_first + cur_row = input_start_row + 1 + (cp2 // avail_rest if avail_rest else 0) + cur_col = cp2 % avail_rest if avail_rest else 0 + self.stdscr.move(min(cur_row, h - 1), min(cur_col, w - 1)) + except curses.error: + pass + + self.stdscr.refresh() + + def run(self): + self._add_work(f"🚀 Welcome back, {USER_NAME}! — Ctrl+D to stop AI, Ctrl+C to quit") + self._add_work("💡 Tab to toggle CODE/CHAT mode | /help for all commands") + if not MODEL: + self._add_work("⚠ No model chosen. Use /model to select one.") + self._add_work("") + + while True: + self._draw() + try: + ch = self.stdscr.getch() + except KeyboardInterrupt: + break + + if ch == -1: + continue + elif ch == 4: # Ctrl+D + self.stop_flag = True + self._add_work("⛔ Stop signal sent") + elif ch == 3: # Ctrl+C + break + elif ch == 9: # Tab — toggle code/chat mode + self.code_mode = not self.code_mode + elif ch == curses.KEY_MOUSE: + try: + _, _, _, _, bstate = curses.getmouse() + if bstate & curses.BUTTON4_PRESSED: + self.scroll_offset += 3 + elif bstate & curses.BUTTON5_PRESSED: + self.scroll_offset = max(0, self.scroll_offset - 3) + except curses.error: + pass + elif ch == curses.KEY_SR or ch == 575: # Shift+Up or Ctrl+Up — scroll up + self.scroll_offset += 3 + elif ch == curses.KEY_SF or ch == 534: # Shift+Down or Ctrl+Down — scroll down + self.scroll_offset = max(0, self.scroll_offset - 3) + elif ch == curses.KEY_PPAGE: # Page Up + h, _ = self.stdscr.getmaxyx() + self.scroll_offset += max(1, int(h * 0.9) - 5) + elif ch == curses.KEY_NPAGE: # Page Down + h, _ = self.stdscr.getmaxyx() + self.scroll_offset = max(0, self.scroll_offset - max(1, int(h * 0.9) - 5)) + elif ch == curses.KEY_UP: # Arrow Up — previous history + if self.input_history: + if self.history_idx == -1: + self.saved_input = self.chat_input + self.history_idx = len(self.input_history) - 1 + elif self.history_idx > 0: + self.history_idx -= 1 + self.chat_input = self.input_history[self.history_idx] + self.cursor_pos = len(self.chat_input) + elif ch == curses.KEY_DOWN: # Arrow Down — next history + if self.history_idx != -1: + if self.history_idx < len(self.input_history) - 1: + self.history_idx += 1 + self.chat_input = self.input_history[self.history_idx] + else: + self.history_idx = -1 + self.chat_input = self.saved_input + self.cursor_pos = len(self.chat_input) + elif ch == curses.KEY_LEFT: # Arrow Left — back one char + if self.cursor_pos > 0: + self.cursor_pos -= 1 + elif ch == curses.KEY_RIGHT: # Arrow Right — forward one char + if self.cursor_pos < len(self.chat_input): + self.cursor_pos += 1 + elif ch in (curses.KEY_BACKSPACE, 127, 8): + if self.cursor_pos > 0: + self.chat_input = self.chat_input[:self.cursor_pos - 1] + self.chat_input[self.cursor_pos:] + self.cursor_pos -= 1 + elif ch == 1: # Ctrl+A — beginning of line + self.cursor_pos = 0 + elif ch == 5: # Ctrl+E — end of line + self.cursor_pos = len(self.chat_input) + elif ch == 6: # Ctrl+F — forward one char + if self.cursor_pos < len(self.chat_input): + self.cursor_pos += 1 + elif ch == 2: # Ctrl+B — back one char + if self.cursor_pos > 0: + self.cursor_pos -= 1 + elif ch == 23: # Ctrl+W — delete word backward + i = self.cursor_pos + while i > 0 and self.chat_input[i - 1] == ' ': + i -= 1 + while i > 0 and self.chat_input[i - 1] != ' ': + i -= 1 + self.chat_input = self.chat_input[:i] + self.chat_input[self.cursor_pos:] + self.cursor_pos = i + elif ch == 21: # Ctrl+U — clear to start + self.chat_input = self.chat_input[self.cursor_pos:] + self.cursor_pos = 0 + elif ch == 11: # Ctrl+K — kill to end of line + self.chat_input = self.chat_input[:self.cursor_pos] + elif ch == 16: # Ctrl+P — previous history + if self.input_history: + if self.history_idx == -1: + self.saved_input = self.chat_input + self.history_idx = len(self.input_history) - 1 + elif self.history_idx > 0: + self.history_idx -= 1 + self.chat_input = self.input_history[self.history_idx] + self.cursor_pos = len(self.chat_input) + elif ch == 14: # Ctrl+N — next history + if self.history_idx != -1: + if self.history_idx < len(self.input_history) - 1: + self.history_idx += 1 + self.chat_input = self.input_history[self.history_idx] + else: + self.history_idx = -1 + self.chat_input = self.saved_input + self.cursor_pos = len(self.chat_input) + elif ch == 13 and self.multiline_mode: # Ctrl+M toggles multiline + pass # handled below + elif ch in (curses.KEY_ENTER, 10, 13): + # Detect paste: drain any buffered input rapidly + self.stdscr.nodelay(True) + paste_buf = self.chat_input + while True: + pc = self.stdscr.getch() + if pc == -1: + break + elif pc in (curses.KEY_ENTER, 10, 13): + paste_buf += "\n" + elif 32 <= pc <= 126: + paste_buf += chr(pc) + elif pc in (curses.KEY_BACKSPACE, 127, 8): + paste_buf = paste_buf[:-1] if paste_buf else paste_buf + self.stdscr.nodelay(False) + self.stdscr.timeout(100) + self.chat_input = paste_buf + self.cursor_pos = len(self.chat_input) + # In multiline mode, empty line sends; otherwise add to buffer + if self.multiline_mode: + if self.chat_input == "": + msg = "\n".join(self.multiline_buf).strip() + self.multiline_buf = [] + else: + self.multiline_buf.append(self.chat_input) + self.chat_input = "" + self.cursor_pos = 0 + continue + else: + msg = self.chat_input.strip() + self.chat_input = "" + self.cursor_pos = 0 + if not msg: + continue + # Save to input history + if msg and (not self.input_history or self.input_history[-1] != msg): + self._save_input_history(msg) + self.history_idx = -1 + self.saved_input = "" + # Reset scroll to bottom on new input + self.scroll_offset = 0 + if msg.lower() in ("quit", "exit"): + break + if msg == "!" or msg.startswith("!"): + if msg == "!": + curses.endwin() + shell = os.environ.get("SHELL", "/bin/sh") + subprocess.run([shell], cwd=self.cwd) + self.stdscr.refresh() + self._add_work("📟 Returned from shell") + else: + cmd = msg[1:].strip() + # Detect interactive commands that need the full terminal + first_word = cmd.split()[0] if cmd.split() else "" + interactive = {"vi", "vim", "nvim", "nano", "pico", "emacs", "less", "more", "top", "htop", "man", "ssh", "bash", "sh", "zsh", "fish"} + if first_word in interactive: + curses.endwin() + subprocess.run(cmd, shell=True, cwd=self.cwd) + self.stdscr.refresh() + self._add_work(f"📟 Returned from: {cmd}") + else: + self._add_work(f"▶ $ {cmd}") + try: + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=60, cwd=self.cwd) + if result.stdout.strip(): + self._add_work(result.stdout.strip()) + if result.stderr.strip(): + self._add_work(result.stderr.strip()) + status = "✓" if result.returncode == 0 else f"✗ (exit {result.returncode})" + self._add_work(f" {status}") + except subprocess.TimeoutExpired: + self._add_work(" ✗ (timeout)") + self._draw() + continue + if msg.startswith("/open "): + filepath = msg[6:].strip() + self._load_file_context(filepath) + continue + if msg == "/memory": + if self.memory: + self._add_work("\n🧠 Memory:") + for k, v in self.memory.items(): + self._add_work(f" [{k}]: {v}") + else: + self._add_work("🧠 Memory is empty") + continue + if msg == "/help": + self._add_work("\n📖 Commands:") + self._add_work(" /open — load file as AI context") + self._add_work(" /memory — view AI memory scratchpad") + self._add_work(" /yolo — toggle dangerous command safety") + self._add_work(" /allow — approve a blocked command") + self._add_work(" /ml — toggle multi-line input") + self._add_work(" /clear — reset conversation and work window") + self._add_work(" /pwd — show current working directory") + self._add_work(" /cd — change working directory") + self._add_work(" /config — show current configuration") + self._add_work(" /model — pick model from server") + self._add_work(" /undo — restore last overwritten file") + self._add_work(" /diff — show diff of last overwritten file") + self._add_work(" /cat — view a file in work window") + self._add_work(" /edit — open file in editor ($VISUAL/$EDITOR/vi)") + self._add_work(" /grep — search project files") + self._add_work(" /history — show recent AI actions") + self._add_work(" /help — show this help") + self._add_work(" quit / exit — exit the program") + self._add_work(" Ctrl+D — stop AI mid-action") + self._add_work(" Ctrl+C — quit") + self._add_work(" Tab — toggle CODE/CHAT mode") + self._add_work(" PgUp/PgDn — scroll work window") + self._add_work(" ↑/↓ — input history") + self._add_work(" ←/→ — move cursor") + self._add_work(" Ctrl+A/E — beginning/end of line") + self._add_work(" Ctrl+F/B — forward/back one char") + self._add_work(" Ctrl+W — delete word backward") + self._add_work(" Ctrl+U — clear to start of line") + self._add_work(" Ctrl+K — kill to end of line") + self._add_work(" Ctrl+P/N — previous/next history") + continue + if msg == "/yolo": + self.yolo_mode = not self.yolo_mode + state = "ON ⚡" if self.yolo_mode else "OFF 🛡️" + self._add_work(f"🔧 YOLO mode: {state}") + continue + if msg == "/allow" and self.pending_cmd: + cmd = self.pending_cmd + self.pending_cmd = None + self._add_work(f"✅ Approved: {cmd}") + old = self.yolo_mode + self.yolo_mode = True + self._run_command(cmd) + self.yolo_mode = old + continue + if msg == "/ml": + self.multiline_mode = not self.multiline_mode + state = "ON (blank line to send)" if self.multiline_mode else "OFF" + self._add_work(f"📝 Multi-line mode: {state}") + continue + if msg == "/clear": + with self.lock: + self.work_lines.clear() + self.chat_history.clear() + self.messages = [{"role": "system", "content": SYSTEM_PROMPT}] + self.scroll_offset = 0 + self._load_project_context() + self._add_work("🧹 Cleared conversation and work window") + continue + if msg == "/pwd": + self._add_work(f"📁 {self.cwd}") + continue + if msg.startswith("/cd "): + target = msg[4:].strip() + if not os.path.isabs(target): + target = os.path.join(self.cwd, target) + target = os.path.realpath(os.path.normpath(target)) + if not target.startswith(self.root_dir): + self._add_work(f"❌ Cannot leave project directory: {self.root_dir}") + elif os.path.isdir(target): + self.cwd = target + self._add_work(f"📁 Changed to {self.cwd}") + else: + self._add_work(f"❌ Not a directory: {target}") + continue + if msg == "/config": + self._add_work(f"\n⚙️ Config ({CONFIG_FILE}):") + self._add_work(f" Host: {OLLAMA_URL}") + self._add_work(f" Model: {MODEL}") + self._add_work(f" User: {USER_NAME}") + self._add_work(f" Auth: {'set' if AUTH_HEADERS else 'none'}") + self._add_work(f" Ctx: {CONTEXT_SIZE}") + self._add_work(f" Back: {BACKEND}") + continue + if msg == "/model": + self._show_model_picker() + continue + if msg == "/undo": + if self._last_backup: + fname, content = self._last_backup + with open(fname, "w") as f: + f.write(content) + self._add_work(f"↩️ Restored {fname}") + self._last_backup = None + else: + self._add_work("❌ Nothing to undo") + continue + if msg.startswith("/cat "): + filepath = msg[5:].strip() + if not os.path.isabs(filepath): + filepath = os.path.join(self.cwd, filepath) + try: + with open(filepath) as f: + self._add_work(f"\n📄 {filepath}:") + self._add_work(f.read().rstrip()) + except Exception as e: + self._add_work(f"❌ {e}") + continue + if msg.startswith("/diff"): + if self._last_backup: + fname, old = self._last_backup + try: + with open(fname) as f: + new = f.read() + diff = difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="before", tofile="after", lineterm="") + self._add_work(f"\n📊 Diff: {fname}") + for line in diff: + self._add_work(line) + except Exception as e: + self._add_work(f"❌ {e}") + else: + self._add_work("❌ No backup to diff against") + continue + if msg == "/history": + self._add_work("\n📜 Recent actions:") + for entry in self.ai_history[-20:]: + self._add_work(f" {entry}") + if not self.ai_history: + self._add_work(" (empty)") + continue + if msg.startswith("/edit "): + filepath = msg[6:].strip() + self._edit_in_editor(filepath) + continue + if msg.startswith("/grep "): + pattern = msg[6:].strip() + self._add_work(f"\n🔍 Searching for: {pattern}") + try: + result = subprocess.run( + ["grep", "-rn", "--color=never", pattern, "."], + capture_output=True, text=True, timeout=10, cwd=self.cwd + ) + if result.stdout.strip(): + for line in result.stdout.strip().split("\n")[:30]: + self._add_work(f" {line}") + else: + self._add_work(" No matches found") + except Exception as e: + self._add_work(f"❌ {e}") + continue + self._add_chat(f"▸ {msg}") + if not self.busy: + t = threading.Thread(target=self._query_ollama, args=(msg,), daemon=True) + t.start() + else: + self._add_chat("⏳ AI is busy, wait or Ctrl+D to stop") + elif 32 <= ch <= 126: + self.chat_input = self.chat_input[:self.cursor_pos] + chr(ch) + self.chat_input[self.cursor_pos:] + self.cursor_pos += 1 + + +def main(stdscr): + Vibe(stdscr).run() + + +if __name__ == "__main__": + curses.wrapper(main) + print(f"\n👋 Bye, {USER_NAME}! Don't forget to commit your changes to git.\n")