package collab import ( "database/sql" "log" "net/http" "strings" "sync" "github.com/gorilla/websocket" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } // Room holds all connected clients for a single document. type Room struct { mu sync.Mutex clients map[*websocket.Conn]bool } // Hub manages all active collaboration rooms. type Hub struct { mu sync.Mutex rooms map[string]*Room db *sql.DB } func NewHub(db *sql.DB) *Hub { return &Hub{ rooms: make(map[string]*Room), db: db, } } func (h *Hub) getRoom(fileID string) *Room { h.mu.Lock() defer h.mu.Unlock() if r, ok := h.rooms[fileID]; ok { return r } r := &Room{clients: make(map[*websocket.Conn]bool)} h.rooms[fileID] = r return r } func (h *Hub) removeClient(fileID string, conn *websocket.Conn) { h.mu.Lock() defer h.mu.Unlock() if r, ok := h.rooms[fileID]; ok { r.mu.Lock() delete(r.clients, conn) empty := len(r.clients) == 0 r.mu.Unlock() if empty { delete(h.rooms, fileID) } } } // HandleWebSocket handles the Yjs WebSocket sync protocol. // Clients send binary Yjs update messages; the hub broadcasts to all others in the room. func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { // Extract file ID from path: /ws/collab/{fileID} path := r.URL.Path parts := strings.Split(strings.TrimPrefix(path, "/ws/collab/"), "/") fileID := parts[0] if fileID == "" { http.Error(w, "file ID required", http.StatusBadRequest) return } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade failed: %v", err) return } defer conn.Close() room := h.getRoom(fileID) room.mu.Lock() room.clients[conn] = true room.mu.Unlock() defer h.removeClient(fileID, conn) // Send stored Yjs state if available var storedState []byte h.db.QueryRow("SELECT yjs_state FROM collab_state WHERE file_id = ?", fileID).Scan(&storedState) if storedState != nil { conn.WriteMessage(websocket.BinaryMessage, storedState) } // Read loop: receive updates from this client, broadcast to others, persist for { msgType, msg, err := conn.ReadMessage() if err != nil { break } if msgType != websocket.BinaryMessage { continue } // Persist latest state h.db.Exec( `INSERT INTO collab_state (file_id, yjs_state, updated_at) VALUES (?, ?, datetime('now')) ON CONFLICT(file_id) DO UPDATE SET yjs_state = ?, updated_at = datetime('now')`, fileID, msg, msg, ) // Broadcast to other clients in the room room.mu.Lock() for client := range room.clients { if client != conn { client.WriteMessage(websocket.BinaryMessage, msg) } } room.mu.Unlock() } } // ActiveUsers returns the number of connected users for a file. func (h *Hub) ActiveUsers(fileID string) int { h.mu.Lock() defer h.mu.Unlock() if r, ok := h.rooms[fileID]; ok { r.mu.Lock() defer r.mu.Unlock() return len(r.clients) } return 0 }