Files
markdown-hub/internal/collab/hub.go
T
anders ed4d0b261f Real-time collaboration (Yjs + WebSocket)
- Go WebSocket hub: rooms per document, broadcast updates, persist state
- Yjs integration: connect/disconnect, sync document state
- Collab toggle button in toolbar (Solo/Live)
- When Live: edits broadcast to all connected users in real-time
- Yjs state persisted to SQLite (survives server restart)
- gorilla/websocket dependency added
2026-05-22 23:49:12 +02:00

134 lines
2.9 KiB
Go

package collab
import (
"database/sql"
"log"
"net/http"
"strings"
"sync"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
// Room holds all connected clients for a single document.
type Room struct {
mu sync.Mutex
clients map[*websocket.Conn]bool
}
// Hub manages all active collaboration rooms.
type Hub struct {
mu sync.Mutex
rooms map[string]*Room
db *sql.DB
}
func NewHub(db *sql.DB) *Hub {
return &Hub{
rooms: make(map[string]*Room),
db: db,
}
}
func (h *Hub) getRoom(fileID string) *Room {
h.mu.Lock()
defer h.mu.Unlock()
if r, ok := h.rooms[fileID]; ok {
return r
}
r := &Room{clients: make(map[*websocket.Conn]bool)}
h.rooms[fileID] = r
return r
}
func (h *Hub) removeClient(fileID string, conn *websocket.Conn) {
h.mu.Lock()
defer h.mu.Unlock()
if r, ok := h.rooms[fileID]; ok {
r.mu.Lock()
delete(r.clients, conn)
empty := len(r.clients) == 0
r.mu.Unlock()
if empty {
delete(h.rooms, fileID)
}
}
}
// HandleWebSocket handles the Yjs WebSocket sync protocol.
// Clients send binary Yjs update messages; the hub broadcasts to all others in the room.
func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
// Extract file ID from path: /ws/collab/{fileID}
path := r.URL.Path
parts := strings.Split(strings.TrimPrefix(path, "/ws/collab/"), "/")
fileID := parts[0]
if fileID == "" {
http.Error(w, "file ID required", http.StatusBadRequest)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade failed: %v", err)
return
}
defer conn.Close()
room := h.getRoom(fileID)
room.mu.Lock()
room.clients[conn] = true
room.mu.Unlock()
defer h.removeClient(fileID, conn)
// Send stored Yjs state if available
var storedState []byte
h.db.QueryRow("SELECT yjs_state FROM collab_state WHERE file_id = ?", fileID).Scan(&storedState)
if storedState != nil {
conn.WriteMessage(websocket.BinaryMessage, storedState)
}
// Read loop: receive updates from this client, broadcast to others, persist
for {
msgType, msg, err := conn.ReadMessage()
if err != nil {
break
}
if msgType != websocket.BinaryMessage {
continue
}
// Persist latest state
h.db.Exec(
`INSERT INTO collab_state (file_id, yjs_state, updated_at) VALUES (?, ?, datetime('now'))
ON CONFLICT(file_id) DO UPDATE SET yjs_state = ?, updated_at = datetime('now')`,
fileID, msg, msg,
)
// Broadcast to other clients in the room
room.mu.Lock()
for client := range room.clients {
if client != conn {
client.WriteMessage(websocket.BinaryMessage, msg)
}
}
room.mu.Unlock()
}
}
// ActiveUsers returns the number of connected users for a file.
func (h *Hub) ActiveUsers(fileID string) int {
h.mu.Lock()
defer h.mu.Unlock()
if r, ok := h.rooms[fileID]; ok {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.clients)
}
return 0
}