WebSocket listener: Handle add/remove blockIds

This commit is contained in:
Chen-I Lim 2020-10-21 12:36:43 -07:00
parent d7af819afc
commit 0b07f454bb
5 changed files with 144 additions and 89 deletions

View file

@ -1,56 +0,0 @@
package ws
import (
"sync"
"github.com/gorilla/websocket"
)
// BlockIDClientPair is a tuple of BlockID and WebSocket connection
type BlockIDClientPair struct {
BlockID string
Client *websocket.Conn
}
// ListenerSession is a WebSocket session that is notified of changes to blocks
type ListenerSession struct {
mu sync.RWMutex
blockIDClientPairs []BlockIDClientPair
}
// AddListener adds a listener for a blockID's change
func (s *ListenerSession) AddListener(client *websocket.Conn, blockID string) {
var p = BlockIDClientPair{Client: client, BlockID: blockID}
s.mu.Lock()
s.blockIDClientPairs = append(s.blockIDClientPairs, p)
s.mu.Unlock()
}
// RemoveListener removes a webSocket listener
func (s *ListenerSession) RemoveListener(client *websocket.Conn) {
s.mu.Lock()
var newValue = []BlockIDClientPair{}
for _, p := range s.blockIDClientPairs {
if p.Client != client {
newValue = append(newValue, p)
}
}
s.mu.Unlock()
s.blockIDClientPairs = newValue
}
// GetListeners returns the listeners to a blockID's changes
func (s *ListenerSession) GetListeners(blockID string) []*websocket.Conn {
var results = []*websocket.Conn{}
s.mu.Lock()
for _, p := range s.blockIDClientPairs {
if p.BlockID == blockID {
results = append(results, p.Client)
}
}
s.mu.Unlock()
return results
}

View file

@ -1,6 +1,7 @@
package ws
import (
"encoding/json"
"log"
"net/http"
"sync"
@ -9,21 +10,24 @@ import (
"github.com/gorilla/websocket"
)
// RegisterRoutes registeres routes
func (ws *WSServer) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/ws/onchange", ws.handleWebSocketOnChange)
}
// AddListener adds a listener for a blockID's change
func (ws *WSServer) AddListener(client *websocket.Conn, blockID string) {
// AddListener adds a listener for a block's change
func (ws *WSServer) AddListener(client *websocket.Conn, blockIDs []string) {
ws.mu.Lock()
if ws.listeners[blockID] == nil {
ws.listeners[blockID] = []*websocket.Conn{}
for _, blockID := range blockIDs {
if ws.listeners[blockID] == nil {
ws.listeners[blockID] = []*websocket.Conn{}
}
ws.listeners[blockID] = append(ws.listeners[blockID], client)
}
ws.listeners[blockID] = append(ws.listeners[blockID], client)
ws.mu.Unlock()
}
// RemoveListener removes a webSocket listener
// RemoveListener removes a webSocket listener from all blocks
func (ws *WSServer) RemoveListener(client *websocket.Conn) {
ws.mu.Lock()
for key, clients := range ws.listeners {
@ -38,6 +42,30 @@ func (ws *WSServer) RemoveListener(client *websocket.Conn) {
ws.mu.Unlock()
}
// RemoveListenerFromBlocks removes a webSocket listener from a set of block
func (ws *WSServer) RemoveListenerFromBlocks(client *websocket.Conn, blockIDs []string) {
ws.mu.Lock()
for _, blockID := range blockIDs {
listeners := ws.listeners[blockID]
if listeners == nil {
return
}
// Remove the first instance of this client that's listening to this block
// Note: A client can listen multiple times to the same block
for index, listener := range listeners {
if client == listener {
newListeners := append(listeners[:index], listeners[index+1:]...)
ws.listeners[blockID] = newListeners
break
}
}
}
ws.mu.Unlock()
}
// GetListeners returns the listeners to a blockID's changes
func (ws *WSServer) GetListeners(blockID string) []*websocket.Conn {
ws.mu.Lock()
@ -72,6 +100,12 @@ type WebsocketMsg struct {
BlockID string `json:"blockId"`
}
// WebsocketCommand is an incoming command from the client
type WebsocketCommand struct {
Action string `json:"action"`
BlockIDs []string `json:"blockIds"`
}
func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Request) {
// Upgrade initial GET request to a websocket
client, err := ws.upgrader.Upgrade(w, r, nil)
@ -81,13 +115,11 @@ func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Reque
// TODO: Auth
query := r.URL.Query()
blockID := query.Get("id")
log.Printf("CONNECT WebSocket onChange, blockID: %s, client: %s", blockID, client.RemoteAddr())
log.Printf("CONNECT WebSocket onChange, client: %s", client.RemoteAddr())
// Make sure we close the connection when the function returns
defer func() {
log.Printf("DISCONNECT WebSocket onChange, blockID: %s, client: %s", blockID, client.RemoteAddr())
log.Printf("DISCONNECT WebSocket onChange, client: %s", client.RemoteAddr())
// Remove client from listeners
ws.RemoveListener(client)
@ -95,21 +127,37 @@ func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Reque
client.Close()
}()
// Register our new client
ws.AddListener(client, blockID)
// TODO: Implement WebSocket message pump
// Simple message handling loop
for {
_, _, err := client.ReadMessage()
_, p, err := client.ReadMessage()
if err != nil {
log.Printf("ERROR WebSocket onChange, blockID: %s, client: %s, err: %v", blockID, client.RemoteAddr(), err)
log.Printf("ERROR WebSocket onChange, client: %s, err: %v", client.RemoteAddr(), err)
ws.RemoveListener(client)
break
}
var command WebsocketCommand
err = json.Unmarshal(p, &command)
if err != nil {
// handle this error
log.Printf(`ERROR webSocket parsing command JSON: %v`, string(p))
continue
}
switch command.Action {
case "ADD":
log.Printf(`Command: Add blockID: %v, client: %s`, command.BlockIDs, client.RemoteAddr())
ws.AddListener(client, command.BlockIDs)
case "REMOVE":
log.Printf(`Command: Remove blockID: %v, client: %s`, command.BlockIDs, client.RemoteAddr())
ws.RemoveListenerFromBlocks(client, command.BlockIDs)
default:
log.Printf(`ERROR webSocket command, invalid action: %v`, command.Action)
}
}
}
// BroadcastBlockChangeToWebsocketClients broadcasts change to clients
func (ws *WSServer) BroadcastBlockChangeToWebsocketClients(blockIDs []string) {
for _, blockID := range blockIDs {
listeners := ws.GetListeners(blockID)

View file

@ -41,7 +41,7 @@ export default class CardDetail extends React.Component<Props, State> {
componentDidMount() {
this.cardListener = new OctoListener()
this.cardListener.open(this.props.cardId, async (blockId) => {
this.cardListener.open([this.props.cardId], async (blockId) => {
Utils.log(`cardListener.onChanged: ${blockId}`)
await cardTree.sync()
this.setState({...this.state, cardTree})

View file

@ -2,6 +2,18 @@
// See LICENSE.txt for license information.
import {Utils} from './utils'
// These are outgoing commands to the server
type WSCommand = {
action: string
blockIds: string[]
}
// These are messages from the server
type WSMessage = {
action: string
blockId: string
}
//
// OctoListener calls a handler when a block or any of its children changes
//
@ -12,15 +24,18 @@ class OctoListener {
readonly serverUrl: string
private ws?: WebSocket
private blockIds: string[] = []
private isInitialized = false
notificationDelay = 200
reopenDelay = 3000
constructor(serverUrl?: string) {
this.serverUrl = serverUrl || window.location.origin
Utils.log(`OctoListener serverUrl: ${this.serverUrl}`)
}
open(blockId: string, onChange: (blockId: string) => void) {
open(blockIds: string[], onChange: (blockId: string) => void) {
let timeoutId: NodeJS.Timeout
if (this.ws) {
@ -28,38 +43,42 @@ class OctoListener {
}
const url = new URL(this.serverUrl)
const wsServerUrl = `ws://${url.host}${url.pathname}ws/onchange?id=${encodeURIComponent(blockId)}`
const wsServerUrl = `ws://${url.host}${url.pathname}ws/onchange`
Utils.log(`OctoListener open: ${wsServerUrl}`)
const ws = new WebSocket(wsServerUrl)
this.ws = ws
ws.onopen = () => {
Utils.log(`OctoListener webSocket opened. blockId: ${blockId}`)
ws.send('{}')
Utils.log(`OctoListener webSocket opened.`)
this.addBlocks(blockIds)
this.isInitialized = true
}
ws.onerror = (e) => {
Utils.logError(`OctoListener websocket onerror. blockId: ${blockId}, data: ${e}`)
Utils.logError(`OctoListener websocket onerror. data: ${e}`)
}
ws.onclose = (e) => {
Utils.log(`OctoListener websocket onclose, blockId: ${blockId}, code: ${e.code}, reason: ${e.reason}`)
Utils.log(`OctoListener websocket onclose, code: ${e.code}, reason: ${e.reason}`)
if (ws === this.ws) {
// Unexpected close, re-open
Utils.logError('Unexpected close, re-opening...')
this.open(blockId, onChange)
const reopenBlockIds = this.isInitialized ? this.blockIds.slice() : blockIds.slice()
Utils.logError(`Unexpected close, re-opening with ${reopenBlockIds.length} blocks...`)
setTimeout(() => {
this.open(reopenBlockIds, onChange)
}, this.reopenDelay)
}
}
ws.onmessage = (e) => {
Utils.log(`OctoListener websocket onmessage. blockId: ${blockId}, data: ${e.data}`)
Utils.log(`OctoListener websocket onmessage. data: ${e.data}`)
if (ws !== this.ws) {
Utils.log(`Ignoring closed ws`)
return
}
try {
const message = JSON.parse(e.data)
const message = JSON.parse(e.data) as WSMessage
switch (message.action) {
case 'UPDATE_BLOCK':
if (timeoutId) {
@ -89,8 +108,49 @@ class OctoListener {
// Use this sequence so the onclose method doesn't try to re-open
const ws = this.ws
this.ws = undefined
this.blockIds = []
this.isInitialized = false
ws.close()
}
addBlocks(blockIds: string[]): void {
if (!this.isOpen) {
Utils.assertFailure(`OctoListener.addBlocks: ws is not open`)
return
}
const command: WSCommand = {
action: 'ADD',
blockIds
}
this.ws.send(JSON.stringify(command))
this.blockIds.push(...blockIds)
}
removeBlocks(blockIds: string[]): void {
if (!this.isOpen) {
Utils.assertFailure(`OctoListener.removeBlocks: ws is not open`)
return
}
const command: WSCommand = {
action: 'REMOVE',
blockIds
}
this.ws.send(JSON.stringify(command))
// Remove registered blockIds, maintinging multiple copies (simple ref-counting)
for (let i=0; i<this.blockIds.length; i++) {
for (let j=0; j<blockIds.length; j++) {
if (this.blockIds[i] === blockIds[j]) {
this.blockIds.splice(i, 1)
blockIds.splice(j, 1)
}
}
}
}
}
export {OctoListener}

View file

@ -161,12 +161,15 @@ export default class BoardPage extends React.Component<Props, State> {
private async attachToBoard(boardId: string, viewId?: string) {
Utils.log(`attachToBoard: ${boardId}`)
// this.boardListener.close()
// this.boardListener = new OctoListener()
this.boardListener.open(boardId, (blockId: string) => {
Utils.log(`boardListener.onChanged: ${blockId}`)
this.sync()
})
if (!this.boardListener.isOpen) {
this.boardListener.open([boardId], (blockId: string) => {
Utils.log(`boardListener.onChanged: ${blockId}`)
this.sync()
})
} else {
this.boardListener.removeBlocks([this.state.boardId])
this.boardListener.addBlocks([boardId])
}
this.sync(boardId, viewId)
}