ed4d0b261f
- Go WebSocket hub: rooms per document, broadcast updates, persist state - Yjs integration: connect/disconnect, sync document state - Collab toggle button in toolbar (Solo/Live) - When Live: edits broadcast to all connected users in real-time - Yjs state persisted to SQLite (survives server restart) - gorilla/websocket dependency added
134 lines
2.9 KiB
Go
134 lines
2.9 KiB
Go
package collab
|
|
|
|
import (
|
|
"database/sql"
|
|
"log"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
}
|
|
|
|
// Room holds all connected clients for a single document.
|
|
type Room struct {
|
|
mu sync.Mutex
|
|
clients map[*websocket.Conn]bool
|
|
}
|
|
|
|
// Hub manages all active collaboration rooms.
|
|
type Hub struct {
|
|
mu sync.Mutex
|
|
rooms map[string]*Room
|
|
db *sql.DB
|
|
}
|
|
|
|
func NewHub(db *sql.DB) *Hub {
|
|
return &Hub{
|
|
rooms: make(map[string]*Room),
|
|
db: db,
|
|
}
|
|
}
|
|
|
|
func (h *Hub) getRoom(fileID string) *Room {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if r, ok := h.rooms[fileID]; ok {
|
|
return r
|
|
}
|
|
r := &Room{clients: make(map[*websocket.Conn]bool)}
|
|
h.rooms[fileID] = r
|
|
return r
|
|
}
|
|
|
|
func (h *Hub) removeClient(fileID string, conn *websocket.Conn) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if r, ok := h.rooms[fileID]; ok {
|
|
r.mu.Lock()
|
|
delete(r.clients, conn)
|
|
empty := len(r.clients) == 0
|
|
r.mu.Unlock()
|
|
if empty {
|
|
delete(h.rooms, fileID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// HandleWebSocket handles the Yjs WebSocket sync protocol.
|
|
// Clients send binary Yjs update messages; the hub broadcasts to all others in the room.
|
|
func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
|
// Extract file ID from path: /ws/collab/{fileID}
|
|
path := r.URL.Path
|
|
parts := strings.Split(strings.TrimPrefix(path, "/ws/collab/"), "/")
|
|
fileID := parts[0]
|
|
if fileID == "" {
|
|
http.Error(w, "file ID required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Printf("WebSocket upgrade failed: %v", err)
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
room := h.getRoom(fileID)
|
|
room.mu.Lock()
|
|
room.clients[conn] = true
|
|
room.mu.Unlock()
|
|
|
|
defer h.removeClient(fileID, conn)
|
|
|
|
// Send stored Yjs state if available
|
|
var storedState []byte
|
|
h.db.QueryRow("SELECT yjs_state FROM collab_state WHERE file_id = ?", fileID).Scan(&storedState)
|
|
if storedState != nil {
|
|
conn.WriteMessage(websocket.BinaryMessage, storedState)
|
|
}
|
|
|
|
// Read loop: receive updates from this client, broadcast to others, persist
|
|
for {
|
|
msgType, msg, err := conn.ReadMessage()
|
|
if err != nil {
|
|
break
|
|
}
|
|
if msgType != websocket.BinaryMessage {
|
|
continue
|
|
}
|
|
|
|
// Persist latest state
|
|
h.db.Exec(
|
|
`INSERT INTO collab_state (file_id, yjs_state, updated_at) VALUES (?, ?, datetime('now'))
|
|
ON CONFLICT(file_id) DO UPDATE SET yjs_state = ?, updated_at = datetime('now')`,
|
|
fileID, msg, msg,
|
|
)
|
|
|
|
// Broadcast to other clients in the room
|
|
room.mu.Lock()
|
|
for client := range room.clients {
|
|
if client != conn {
|
|
client.WriteMessage(websocket.BinaryMessage, msg)
|
|
}
|
|
}
|
|
room.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// ActiveUsers returns the number of connected users for a file.
|
|
func (h *Hub) ActiveUsers(fileID string) int {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if r, ok := h.rooms[fileID]; ok {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
return len(r.clients)
|
|
}
|
|
return 0
|
|
}
|