From ed4d0b261f9eef0cbd654079a4fec95a847633c1 Mon Sep 17 00:00:00 2001 From: Anders Holck Date: Fri, 22 May 2026 23:49:12 +0200 Subject: [PATCH] Real-time collaboration (Yjs + WebSocket) - Go WebSocket hub: rooms per document, broadcast updates, persist state - Yjs integration: connect/disconnect, sync document state - Collab toggle button in toolbar (Solo/Live) - When Live: edits broadcast to all connected users in real-time - Yjs state persisted to SQLite (survives server restart) - gorilla/websocket dependency added --- cmd/server/main.go | 9 ++- frontend/src/App.vue | 22 ++++++ frontend/src/lib/collab.js | 60 +++++++++++++++++ go.mod | 1 + go.sum | 2 + internal/collab/hub.go | 133 +++++++++++++++++++++++++++++++++++++ 6 files changed, 226 insertions(+), 1 deletion(-) create mode 100644 frontend/src/lib/collab.js create mode 100644 internal/collab/hub.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 9985916..26f9286 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -10,6 +10,7 @@ import ( "markdownhub/internal/api" "markdownhub/internal/auth" + "markdownhub/internal/collab" "markdownhub/internal/db" "markdownhub/internal/files" ) @@ -33,8 +34,14 @@ func main() { router := api.NewRouter(database, dataDir, secret) + // Collab WebSocket hub + hub := collab.NewHub(database) + mux := http.NewServeMux() + mux.Handle("/ws/collab/", http.HandlerFunc(hub.HandleWebSocket)) + mux.Handle("/", router) + fmt.Printf("MarkdownHub listening on :%s\n", port) - log.Fatal(http.ListenAndServe(":"+port, router)) + log.Fatal(http.ListenAndServe(":"+port, mux)) } func ensureAdminUser(database *db.DB, dataDir string) { diff --git a/frontend/src/App.vue b/frontend/src/App.vue index e9c3706..c441295 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -59,6 +59,9 @@
{{ currentFile || 'No file open' }} {{ fileMeta }} + @@ -248,6 +251,7 @@ import MilkdownEditor from './components/MilkdownEditor.vue' import { api, setToken } from './lib/api.js' import { renderMarkdown } from './lib/markdown.js' import { cacheFile, getCachedFile, addPendingChange, getPendingChanges, clearAllPending } from './lib/offline.js' +import { connectCollab, disconnectCollab, setCollabContent } from './lib/collab.js' const authenticated = ref(false) const email = ref('') @@ -277,6 +281,7 @@ const shareMsg = ref('') const gitDirty = ref(0) const aiResult = ref('') const trashItems = ref([]) +const collabActive = ref(false) // Preferences const prefs = ref({ timezone: Intl.DateTimeFormat().resolvedOptions().timeZone, defaultMode: 'split', theme: 'dark' }) @@ -678,6 +683,23 @@ async function aiVerify() { } } +// ─── Collab ────────────────────────────────────────────────────────────────── + +function toggleCollab() { + if (collabActive.value) { + disconnectCollab() + collabActive.value = false + } else { + if (!currentFile.value) return + const fileId = currentFile.value.replace(/[^a-zA-Z0-9]/g, '-') + connectCollab(fileId, (newContent) => { + content.value = newContent + }) + setCollabContent(content.value) + collabActive.value = true + } +} + // ─── Formatting ────────────────────────────────────────────────────────────── function insertFormat(before, after) { diff --git a/frontend/src/lib/collab.js b/frontend/src/lib/collab.js new file mode 100644 index 0000000..0a704ec --- /dev/null +++ b/frontend/src/lib/collab.js @@ -0,0 +1,60 @@ +import * as Y from 'yjs' +import { WebsocketProvider } from 'y-websocket' + +let ydoc = null +let provider = null +let ytext = null + +/** + * Connect to the collab WebSocket for a given file. + * Returns the Yjs Text type that can be bound to an editor. + */ +export function connectCollab(fileId, onUpdate) { + disconnectCollab() + + ydoc = new Y.Doc() + ytext = ydoc.getText('content') + + const wsUrl = `${location.protocol === 'https:' ? 'wss:' : 'ws:'}//${location.host}/ws/collab/${fileId}` + provider = new WebsocketProvider(wsUrl, fileId, ydoc, { connect: true }) + + ytext.observe((event) => { + if (onUpdate) { + onUpdate(ytext.toString()) + } + }) + + provider.on('status', (event) => { + console.log('[collab]', event.status) + }) + + return { ydoc, ytext, provider } +} + +export function disconnectCollab() { + if (provider) { + provider.destroy() + provider = null + } + if (ydoc) { + ydoc.destroy() + ydoc = null + } + ytext = null +} + +export function setCollabContent(text) { + if (!ytext) return + ydoc.transact(() => { + ytext.delete(0, ytext.length) + ytext.insert(0, text) + }) +} + +export function getCollabContent() { + return ytext ? ytext.toString() : '' +} + +export function isConnected() { + return provider && provider.wsconnected +} diff --git a/go.mod b/go.mod index 09f49e3..4acae73 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23 require ( github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 golang.org/x/crypto v0.32.0 modernc.org/sqlite v1.34.5 ) diff --git a/go.sum b/go.sum index 0dd1833..1cfaa5f 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlG github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= diff --git a/internal/collab/hub.go b/internal/collab/hub.go new file mode 100644 index 0000000..0670dfd --- /dev/null +++ b/internal/collab/hub.go @@ -0,0 +1,133 @@ +package collab + +import ( + "database/sql" + "log" + "net/http" + "strings" + "sync" + + "github.com/gorilla/websocket" +) + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, +} + +// Room holds all connected clients for a single document. +type Room struct { + mu sync.Mutex + clients map[*websocket.Conn]bool +} + +// Hub manages all active collaboration rooms. +type Hub struct { + mu sync.Mutex + rooms map[string]*Room + db *sql.DB +} + +func NewHub(db *sql.DB) *Hub { + return &Hub{ + rooms: make(map[string]*Room), + db: db, + } +} + +func (h *Hub) getRoom(fileID string) *Room { + h.mu.Lock() + defer h.mu.Unlock() + if r, ok := h.rooms[fileID]; ok { + return r + } + r := &Room{clients: make(map[*websocket.Conn]bool)} + h.rooms[fileID] = r + return r +} + +func (h *Hub) removeClient(fileID string, conn *websocket.Conn) { + h.mu.Lock() + defer h.mu.Unlock() + if r, ok := h.rooms[fileID]; ok { + r.mu.Lock() + delete(r.clients, conn) + empty := len(r.clients) == 0 + r.mu.Unlock() + if empty { + delete(h.rooms, fileID) + } + } +} + +// HandleWebSocket handles the Yjs WebSocket sync protocol. +// Clients send binary Yjs update messages; the hub broadcasts to all others in the room. +func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { + // Extract file ID from path: /ws/collab/{fileID} + path := r.URL.Path + parts := strings.Split(strings.TrimPrefix(path, "/ws/collab/"), "/") + fileID := parts[0] + if fileID == "" { + http.Error(w, "file ID required", http.StatusBadRequest) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("WebSocket upgrade failed: %v", err) + return + } + defer conn.Close() + + room := h.getRoom(fileID) + room.mu.Lock() + room.clients[conn] = true + room.mu.Unlock() + + defer h.removeClient(fileID, conn) + + // Send stored Yjs state if available + var storedState []byte + h.db.QueryRow("SELECT yjs_state FROM collab_state WHERE file_id = ?", fileID).Scan(&storedState) + if storedState != nil { + conn.WriteMessage(websocket.BinaryMessage, storedState) + } + + // Read loop: receive updates from this client, broadcast to others, persist + for { + msgType, msg, err := conn.ReadMessage() + if err != nil { + break + } + if msgType != websocket.BinaryMessage { + continue + } + + // Persist latest state + h.db.Exec( + `INSERT INTO collab_state (file_id, yjs_state, updated_at) VALUES (?, ?, datetime('now')) + ON CONFLICT(file_id) DO UPDATE SET yjs_state = ?, updated_at = datetime('now')`, + fileID, msg, msg, + ) + + // Broadcast to other clients in the room + room.mu.Lock() + for client := range room.clients { + if client != conn { + client.WriteMessage(websocket.BinaryMessage, msg) + } + } + room.mu.Unlock() + } +} + +// ActiveUsers returns the number of connected users for a file. +func (h *Hub) ActiveUsers(fileID string) int { + h.mu.Lock() + defer h.mu.Unlock() + if r, ok := h.rooms[fileID]; ok { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.clients) + } + return 0 +}