diff --git a/server/ws/listenerSession.go b/server/ws/listenerSession.go deleted file mode 100644 index f0c65804a..000000000 --- a/server/ws/listenerSession.go +++ /dev/null @@ -1,56 +0,0 @@ -package ws - -import ( - "sync" - - "github.com/gorilla/websocket" -) - -// BlockIDClientPair is a tuple of BlockID and WebSocket connection -type BlockIDClientPair struct { - BlockID string - Client *websocket.Conn -} - -// ListenerSession is a WebSocket session that is notified of changes to blocks -type ListenerSession struct { - mu sync.RWMutex - blockIDClientPairs []BlockIDClientPair -} - -// AddListener adds a listener for a blockID's change -func (s *ListenerSession) AddListener(client *websocket.Conn, blockID string) { - var p = BlockIDClientPair{Client: client, BlockID: blockID} - s.mu.Lock() - s.blockIDClientPairs = append(s.blockIDClientPairs, p) - s.mu.Unlock() -} - -// RemoveListener removes a webSocket listener -func (s *ListenerSession) RemoveListener(client *websocket.Conn) { - s.mu.Lock() - var newValue = []BlockIDClientPair{} - for _, p := range s.blockIDClientPairs { - if p.Client != client { - newValue = append(newValue, p) - } - } - s.mu.Unlock() - - s.blockIDClientPairs = newValue -} - -// GetListeners returns the listeners to a blockID's changes -func (s *ListenerSession) GetListeners(blockID string) []*websocket.Conn { - var results = []*websocket.Conn{} - - s.mu.Lock() - for _, p := range s.blockIDClientPairs { - if p.BlockID == blockID { - results = append(results, p.Client) - } - } - s.mu.Unlock() - - return results -} diff --git a/server/ws/websockets.go b/server/ws/websockets.go index 03a0b91e3..ae16608d1 100644 --- a/server/ws/websockets.go +++ b/server/ws/websockets.go @@ -1,6 +1,7 @@ package ws import ( + "encoding/json" "log" "net/http" "sync" @@ -9,21 +10,24 @@ import ( "github.com/gorilla/websocket" ) +// RegisterRoutes registeres routes func (ws *WSServer) RegisterRoutes(r *mux.Router) { r.HandleFunc("/ws/onchange", ws.handleWebSocketOnChange) } -// AddListener adds a listener for a blockID's change -func (ws *WSServer) AddListener(client *websocket.Conn, blockID string) { +// AddListener adds a listener for a block's change +func (ws *WSServer) AddListener(client *websocket.Conn, blockIDs []string) { ws.mu.Lock() - if ws.listeners[blockID] == nil { - ws.listeners[blockID] = []*websocket.Conn{} + for _, blockID := range blockIDs { + if ws.listeners[blockID] == nil { + ws.listeners[blockID] = []*websocket.Conn{} + } + ws.listeners[blockID] = append(ws.listeners[blockID], client) } - ws.listeners[blockID] = append(ws.listeners[blockID], client) ws.mu.Unlock() } -// RemoveListener removes a webSocket listener +// RemoveListener removes a webSocket listener from all blocks func (ws *WSServer) RemoveListener(client *websocket.Conn) { ws.mu.Lock() for key, clients := range ws.listeners { @@ -38,6 +42,30 @@ func (ws *WSServer) RemoveListener(client *websocket.Conn) { ws.mu.Unlock() } +// RemoveListenerFromBlocks removes a webSocket listener from a set of block +func (ws *WSServer) RemoveListenerFromBlocks(client *websocket.Conn, blockIDs []string) { + ws.mu.Lock() + + for _, blockID := range blockIDs { + listeners := ws.listeners[blockID] + if listeners == nil { + return + } + + // Remove the first instance of this client that's listening to this block + // Note: A client can listen multiple times to the same block + for index, listener := range listeners { + if client == listener { + newListeners := append(listeners[:index], listeners[index+1:]...) + ws.listeners[blockID] = newListeners + break + } + } + } + + ws.mu.Unlock() +} + // GetListeners returns the listeners to a blockID's changes func (ws *WSServer) GetListeners(blockID string) []*websocket.Conn { ws.mu.Lock() @@ -72,6 +100,12 @@ type WebsocketMsg struct { BlockID string `json:"blockId"` } +// WebsocketCommand is an incoming command from the client +type WebsocketCommand struct { + Action string `json:"action"` + BlockIDs []string `json:"blockIds"` +} + func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Request) { // Upgrade initial GET request to a websocket client, err := ws.upgrader.Upgrade(w, r, nil) @@ -81,13 +115,11 @@ func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Reque // TODO: Auth - query := r.URL.Query() - blockID := query.Get("id") - log.Printf("CONNECT WebSocket onChange, blockID: %s, client: %s", blockID, client.RemoteAddr()) + log.Printf("CONNECT WebSocket onChange, client: %s", client.RemoteAddr()) // Make sure we close the connection when the function returns defer func() { - log.Printf("DISCONNECT WebSocket onChange, blockID: %s, client: %s", blockID, client.RemoteAddr()) + log.Printf("DISCONNECT WebSocket onChange, client: %s", client.RemoteAddr()) // Remove client from listeners ws.RemoveListener(client) @@ -95,21 +127,37 @@ func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Reque client.Close() }() - // Register our new client - ws.AddListener(client, blockID) - - // TODO: Implement WebSocket message pump // Simple message handling loop for { - _, _, err := client.ReadMessage() + _, p, err := client.ReadMessage() if err != nil { - log.Printf("ERROR WebSocket onChange, blockID: %s, client: %s, err: %v", blockID, client.RemoteAddr(), err) + log.Printf("ERROR WebSocket onChange, client: %s, err: %v", client.RemoteAddr(), err) ws.RemoveListener(client) break } + + var command WebsocketCommand + err = json.Unmarshal(p, &command) + if err != nil { + // handle this error + log.Printf(`ERROR webSocket parsing command JSON: %v`, string(p)) + continue + } + + switch command.Action { + case "ADD": + log.Printf(`Command: Add blockID: %v, client: %s`, command.BlockIDs, client.RemoteAddr()) + ws.AddListener(client, command.BlockIDs) + case "REMOVE": + log.Printf(`Command: Remove blockID: %v, client: %s`, command.BlockIDs, client.RemoteAddr()) + ws.RemoveListenerFromBlocks(client, command.BlockIDs) + default: + log.Printf(`ERROR webSocket command, invalid action: %v`, command.Action) + } } } +// BroadcastBlockChangeToWebsocketClients broadcasts change to clients func (ws *WSServer) BroadcastBlockChangeToWebsocketClients(blockIDs []string) { for _, blockID := range blockIDs { listeners := ws.GetListeners(blockID) diff --git a/webapp/src/components/cardDetail.tsx b/webapp/src/components/cardDetail.tsx index 2f69ee4ee..bb056ab44 100644 --- a/webapp/src/components/cardDetail.tsx +++ b/webapp/src/components/cardDetail.tsx @@ -41,7 +41,7 @@ export default class CardDetail extends React.Component { componentDidMount() { this.cardListener = new OctoListener() - this.cardListener.open(this.props.cardId, async (blockId) => { + this.cardListener.open([this.props.cardId], async (blockId) => { Utils.log(`cardListener.onChanged: ${blockId}`) await cardTree.sync() this.setState({...this.state, cardTree}) diff --git a/webapp/src/octoListener.ts b/webapp/src/octoListener.ts index 556da7183..6a9297037 100644 --- a/webapp/src/octoListener.ts +++ b/webapp/src/octoListener.ts @@ -2,6 +2,18 @@ // See LICENSE.txt for license information. import {Utils} from './utils' +// These are outgoing commands to the server +type WSCommand = { + action: string + blockIds: string[] +} + +// These are messages from the server +type WSMessage = { + action: string + blockId: string +} + // // OctoListener calls a handler when a block or any of its children changes // @@ -12,15 +24,18 @@ class OctoListener { readonly serverUrl: string private ws?: WebSocket + private blockIds: string[] = [] + private isInitialized = false notificationDelay = 200 + reopenDelay = 3000 constructor(serverUrl?: string) { this.serverUrl = serverUrl || window.location.origin Utils.log(`OctoListener serverUrl: ${this.serverUrl}`) } - open(blockId: string, onChange: (blockId: string) => void) { + open(blockIds: string[], onChange: (blockId: string) => void) { let timeoutId: NodeJS.Timeout if (this.ws) { @@ -28,38 +43,42 @@ class OctoListener { } const url = new URL(this.serverUrl) - const wsServerUrl = `ws://${url.host}${url.pathname}ws/onchange?id=${encodeURIComponent(blockId)}` + const wsServerUrl = `ws://${url.host}${url.pathname}ws/onchange` Utils.log(`OctoListener open: ${wsServerUrl}`) const ws = new WebSocket(wsServerUrl) this.ws = ws ws.onopen = () => { - Utils.log(`OctoListener webSocket opened. blockId: ${blockId}`) - ws.send('{}') + Utils.log(`OctoListener webSocket opened.`) + this.addBlocks(blockIds) + this.isInitialized = true } ws.onerror = (e) => { - Utils.logError(`OctoListener websocket onerror. blockId: ${blockId}, data: ${e}`) + Utils.logError(`OctoListener websocket onerror. data: ${e}`) } ws.onclose = (e) => { - Utils.log(`OctoListener websocket onclose, blockId: ${blockId}, code: ${e.code}, reason: ${e.reason}`) + Utils.log(`OctoListener websocket onclose, code: ${e.code}, reason: ${e.reason}`) if (ws === this.ws) { // Unexpected close, re-open - Utils.logError('Unexpected close, re-opening...') - this.open(blockId, onChange) + const reopenBlockIds = this.isInitialized ? this.blockIds.slice() : blockIds.slice() + Utils.logError(`Unexpected close, re-opening with ${reopenBlockIds.length} blocks...`) + setTimeout(() => { + this.open(reopenBlockIds, onChange) + }, this.reopenDelay) } } ws.onmessage = (e) => { - Utils.log(`OctoListener websocket onmessage. blockId: ${blockId}, data: ${e.data}`) + Utils.log(`OctoListener websocket onmessage. data: ${e.data}`) if (ws !== this.ws) { Utils.log(`Ignoring closed ws`) return } try { - const message = JSON.parse(e.data) + const message = JSON.parse(e.data) as WSMessage switch (message.action) { case 'UPDATE_BLOCK': if (timeoutId) { @@ -89,8 +108,49 @@ class OctoListener { // Use this sequence so the onclose method doesn't try to re-open const ws = this.ws this.ws = undefined + this.blockIds = [] + this.isInitialized = false ws.close() } + + addBlocks(blockIds: string[]): void { + if (!this.isOpen) { + Utils.assertFailure(`OctoListener.addBlocks: ws is not open`) + return + } + + const command: WSCommand = { + action: 'ADD', + blockIds + } + + this.ws.send(JSON.stringify(command)) + this.blockIds.push(...blockIds) + } + + removeBlocks(blockIds: string[]): void { + if (!this.isOpen) { + Utils.assertFailure(`OctoListener.removeBlocks: ws is not open`) + return + } + + const command: WSCommand = { + action: 'REMOVE', + blockIds + } + + this.ws.send(JSON.stringify(command)) + + // Remove registered blockIds, maintinging multiple copies (simple ref-counting) + for (let i=0; i { private async attachToBoard(boardId: string, viewId?: string) { Utils.log(`attachToBoard: ${boardId}`) - // this.boardListener.close() - // this.boardListener = new OctoListener() - this.boardListener.open(boardId, (blockId: string) => { - Utils.log(`boardListener.onChanged: ${blockId}`) - this.sync() - }) + if (!this.boardListener.isOpen) { + this.boardListener.open([boardId], (blockId: string) => { + Utils.log(`boardListener.onChanged: ${blockId}`) + this.sync() + }) + } else { + this.boardListener.removeBlocks([this.state.boardId]) + this.boardListener.addBlocks([boardId]) + } this.sync(boardId, viewId) }