From 6a7d3f797b026e2ff4058c2b1e96ebdb88684834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Espino?= Date: Wed, 28 Jul 2021 18:14:18 +0200 Subject: [PATCH] Websocket client now connects once and subscribe/desubscribe on the fly (#752) * Websocket client now connects once and subscribe/desubscribe on the fly * Fixing eslint error Co-authored-by: Harshil Sharma --- server/ws/websockets.go | 21 ++ webapp/src/app.tsx | 8 + webapp/src/components/cardDialog.tsx | 1 - webapp/src/components/gallery/gallery.tsx | 1 - webapp/src/components/kanban/kanban.tsx | 2 - .../components/sidebar/sidebarBoardItem.tsx | 2 +- webapp/src/components/table/table.tsx | 2 - webapp/src/components/viewMenu.tsx | 8 +- webapp/src/hooks/cardListener.tsx | 44 +-- webapp/src/octoListener.ts | 248 --------------- webapp/src/pages/boardPage.tsx | 70 ++--- webapp/src/wsclient.ts | 285 ++++++++++++++++++ 12 files changed, 346 insertions(+), 346 deletions(-) delete mode 100644 webapp/src/octoListener.ts create mode 100644 webapp/src/wsclient.ts diff --git a/server/ws/websockets.go b/server/ws/websockets.go index 75813f860..4c075ac41 100644 --- a/server/ws/websockets.go +++ b/server/ws/websockets.go @@ -21,6 +21,7 @@ const ( websocketActionSubscribeWorkspace = "SUBSCRIBE_WORKSPACE" websocketActionUnsubscribeWorkspace = "UNSUBSCRIBE_WORKSPACE" websocketActionSubscribeBlocks = "SUBSCRIBE_BLOCKS" + websocketActionUnsubscribeBlocks = "UNSUBSCRIBE_BLOCKS" ) type Hub interface { @@ -211,6 +212,26 @@ func (ws *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { continue } + if command.Action == websocketActionUnsubscribeBlocks { + ws.logger.Debug(`Command: UNSUBSCRIBE_BLOCKS`, + mlog.String("workspaceID", command.WorkspaceID), + mlog.Stringer("client", wsSession.client.RemoteAddr()), + ) + + if !ws.isCommandReadTokenValid(command) { + ws.logger.Error(`Rejected invalid read token`, + mlog.Stringer("client", wsSession.client.RemoteAddr()), + mlog.String("action", command.Action), + mlog.String("readToken", command.ReadToken), + ) + + continue + } + + ws.unsubscribeListenerFromBlocks(wsSession.client, command.BlockIDs) + continue + } + // if the command is not authenticated at this point, it will // not be processed if !wsSession.isAuthenticated() { diff --git a/webapp/src/app.tsx b/webapp/src/app.tsx index 2ef742cfd..065852ca2 100644 --- a/webapp/src/app.tsx +++ b/webapp/src/app.tsx @@ -22,6 +22,7 @@ import LoginPage from './pages/loginPage' import RegisterPage from './pages/registerPage' import {IUser} from './user' import {Utils} from './utils' +import wsClient from './wsclient' import {importNativeAppSettings} from './nativeApp' import {fetchCurrentUser, getCurrentUser} from './store/currentUser' import {getLanguage, fetchLanguage} from './store/language' @@ -43,6 +44,13 @@ const App = React.memo((): JSX.Element => { }) }, []) + useEffect(() => { + wsClient.open() + return () => { + wsClient.close() + } + }, []) + return ( { const [cardTree, setCardTree] = useState() const intl = useIntl() useCardListener( - [props.cardId], async (blocks) => { Utils.log(`cardListener.onChanged: ${blocks.length}`) const newCardTree = cardTree ? MutableCardTree.incrementalUpdate(cardTree, blocks) : await MutableCardTree.sync(props.cardId) diff --git a/webapp/src/components/gallery/gallery.tsx b/webapp/src/components/gallery/gallery.tsx index 7760f896b..7df75e426 100644 --- a/webapp/src/components/gallery/gallery.tsx +++ b/webapp/src/components/gallery/gallery.tsx @@ -54,7 +54,6 @@ const Gallery = (props: Props): JSX.Element => { const visibleTitle = boardTree.activeView.visiblePropertyIds.includes(Constants.titleColumnId) useCardListener( - cards.map((c) => c.id), async (blocks) => { cards.forEach(async (c) => { const cardTree = cardTrees[c.id] diff --git a/webapp/src/components/kanban/kanban.tsx b/webapp/src/components/kanban/kanban.tsx index 1183df056..73b60c016 100644 --- a/webapp/src/components/kanban/kanban.tsx +++ b/webapp/src/components/kanban/kanban.tsx @@ -53,7 +53,6 @@ const Kanban = (props: Props) => { cardTreeRef.current = cardTrees useCardListener( - cards.map((c) => c.id), async (blocks) => { for (const block of blocks) { const cardTree = cardTreeRef.current && cardTreeRef.current[block.parentId] @@ -68,7 +67,6 @@ const Kanban = (props: Props) => { setCardTrees((oldTree) => ({...oldTree, [c.id]: newCardTree})) }) }, - false, ) const propertyNameChanged = async (option: IPropertyOption, text: string): Promise => { diff --git a/webapp/src/components/sidebar/sidebarBoardItem.tsx b/webapp/src/components/sidebar/sidebarBoardItem.tsx index ffc6fc005..a1f0254aa 100644 --- a/webapp/src/components/sidebar/sidebarBoardItem.tsx +++ b/webapp/src/components/sidebar/sidebarBoardItem.tsx @@ -122,7 +122,7 @@ const SidebarBoardItem = React.memo((props: Props) => { intl.formatMessage({id: 'Sidebar.delete-board', defaultMessage: 'Delete board'}), async () => { if (props.nextBoardId) { - // This delay is needed because OctoListener has a default 100 ms notification delay before updates + // This delay is needed because WSClient has a default 100 ms notification delay before updates setTimeout(() => { showBoard(props.nextBoardId) }, 120) diff --git a/webapp/src/components/table/table.tsx b/webapp/src/components/table/table.tsx index eaeaf4de2..0f8adc409 100644 --- a/webapp/src/components/table/table.tsx +++ b/webapp/src/components/table/table.tsx @@ -46,7 +46,6 @@ const Table = (props: Props) => { cardTreeRef.current = cardTrees useCardListener( - cards.map((c) => c.id), async (blocks) => { for (const block of blocks) { const cardTree = cardTreeRef.current && cardTreeRef.current[block.parentId] @@ -67,7 +66,6 @@ const Table = (props: Props) => { setCardTrees((oldTree) => ({...oldTree, [c.id]: newCardTree})) }) }, - false, ) const {offset, resizingColumn} = useDragLayer((monitor) => { diff --git a/webapp/src/components/viewMenu.tsx b/webapp/src/components/viewMenu.tsx index bb0df6415..7dc13ca62 100644 --- a/webapp/src/components/viewMenu.tsx +++ b/webapp/src/components/viewMenu.tsx @@ -44,7 +44,7 @@ const ViewMenu = React.memo((props: Props) => { newView, 'duplicate view', async () => { - // This delay is needed because OctoListener has a default 100 ms notification delay before updates + // This delay is needed because WSClient has a default 100 ms notification delay before updates setTimeout(() => { showView(newView.id) }, 120) @@ -91,7 +91,7 @@ const ViewMenu = React.memo((props: Props) => { view, 'add view', async () => { - // This delay is needed because OctoListener has a default 100 ms notification delay before updates + // This delay is needed because WSClient has a default 100 ms notification delay before updates setTimeout(() => { showView(view.id) }, 120) @@ -120,7 +120,7 @@ const ViewMenu = React.memo((props: Props) => { view, 'add view', async () => { - // This delay is needed because OctoListener has a default 100 ms notification delay before updates + // This delay is needed because WSClient has a default 100 ms notification delay before updates setTimeout(() => { Utils.log(`showView: ${view.id}`) showView(view.id) @@ -148,7 +148,7 @@ const ViewMenu = React.memo((props: Props) => { view, 'add view', async () => { - // This delay is needed because OctoListener has a default 100 ms notification delay before updates + // This delay is needed because WSClient has a default 100 ms notification delay before updates setTimeout(() => { Utils.log(`showView: ${view.id}`) showView(view.id) diff --git a/webapp/src/hooks/cardListener.tsx b/webapp/src/hooks/cardListener.tsx index 02b43a987..59f4e2895 100644 --- a/webapp/src/hooks/cardListener.tsx +++ b/webapp/src/hooks/cardListener.tsx @@ -3,44 +3,16 @@ import {useEffect} from 'react' import {IBlock} from '../blocks/block' -import octoClient from '../octoClient' -import {OctoListener} from '../octoListener' -import {Utils} from '../utils' - -export default function useCardListener(cardIds: string[], onChange: (blocks: IBlock[]) => void, onReconnect: () => void, initialCall = true): void { - let cardListener: OctoListener | null = null - - const deleteListener = () => { - cardListener?.close() - cardListener = null - } - - const createListener = () => { - deleteListener() - - cardListener = new OctoListener() - cardListener.open( - octoClient.workspaceId, - cardIds, - onChange, - onReconnect, - ) - } - - const createCardTreeAndSync = async () => { - if (initialCall) { - onReconnect() - } - - createListener() - } +import wsClient, {WSClient} from '../wsclient' +export default function useCardListener(onChange: (blocks: IBlock[]) => void, onReconnect: () => void): void { useEffect(() => { - Utils.log(`useCardListener.connect: ${cardIds}`) - createCardTreeAndSync() + const onChangeHandler = (_: WSClient, blocks: IBlock[]) => onChange(blocks) + wsClient.addOnChange(onChangeHandler) + wsClient.addOnReconnect(onReconnect) return () => { - Utils.log(`useCardListener.disconnect: ${cardIds}`) - deleteListener() + wsClient.removeOnChange(onChangeHandler) + wsClient.removeOnReconnect(onReconnect) } - }, [cardIds.join('-')]) + }, []) } diff --git a/webapp/src/octoListener.ts b/webapp/src/octoListener.ts deleted file mode 100644 index 741cad633..000000000 --- a/webapp/src/octoListener.ts +++ /dev/null @@ -1,248 +0,0 @@ -// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. -// See LICENSE.txt for license information. -import {IBlock} from './blocks/block' -import {Utils} from './utils' - -// These are outgoing commands to the server -type WSCommand = { - action: string - workspaceId?: string - readToken?: string - blockIds?: string[] -} - -// These are messages from the server -type WSMessage = { - action?: string - block?: IBlock - error?: string -} - -type OnChangeHandler = (blocks: IBlock[]) => void -type OnStateChange = (state: 'open' | 'close') => void - -// -// OctoListener calls a handler when a block or any of its children changes -// -class OctoListener { - get isOpen(): boolean { - return Boolean(this.ws) - } - - readonly serverUrl: string - private workspaceId?: string - private token: string - private readToken: string - private ws?: WebSocket - private blockIds: string[] = [] - private isInitialized = false - - private onChange?: OnChangeHandler - private updatedBlocks: IBlock[] = [] - private updateTimeout?: NodeJS.Timeout - - notificationDelay = 100 - reopenDelay = 3000 - - constructor(serverUrl?: string, token?: string, readToken?: string) { - this.serverUrl = serverUrl || Utils.buildURL('', true).replace(/\/$/, '') - this.token = token || localStorage.getItem('focalboardSessionId') || '' - this.readToken = readToken || OctoListener.getReadToken() - Utils.log(`OctoListener serverUrl: ${this.serverUrl}`) - } - - static getReadToken(): string { - const queryString = new URLSearchParams(window.location.search) - const readToken = queryString.get('r') || '' - return readToken - } - - open(workspaceId: string, blockIds: string[], onChange: OnChangeHandler, onReconnect: () => void, onStateChange?: OnStateChange): void { - if (this.ws) { - this.close() - } - - this.onChange = onChange - this.workspaceId = workspaceId - - const url = new URL(this.serverUrl) - const protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:' - const wsServerUrl = `${protocol}//${url.host}${url.pathname.replace(/\/$/, '')}/ws` - Utils.log(`OctoListener open: ${wsServerUrl}`) - const ws = new WebSocket(wsServerUrl) - this.ws = ws - - ws.onopen = () => { - Utils.log('OctoListener webSocket opened.') - this.authenticate(workspaceId) - this.subscribeToWorkspace(workspaceId) - this.isInitialized = true - onStateChange?.('open') - } - - ws.onerror = (e) => { - Utils.logError(`OctoListener websocket onerror. data: ${e}`) - } - - ws.onclose = (e) => { - Utils.log(`OctoListener websocket onclose, code: ${e.code}, reason: ${e.reason}`) - if (ws === this.ws) { - // Unexpected close, re-open - const reopenBlockIds = this.isInitialized ? this.blockIds.slice() : blockIds.slice() - Utils.logError(`Unexpected close, re-opening with ${reopenBlockIds.length} blocks...`) - onStateChange?.('close') - setTimeout(() => { - this.open(workspaceId, reopenBlockIds, onChange, onReconnect) - onReconnect() - }, this.reopenDelay) - } - } - - ws.onmessage = (e) => { - // Utils.log(`OctoListener websocket onmessage. data: ${e.data}`) - if (ws !== this.ws) { - Utils.log('Ignoring closed ws') - return - } - - try { - const message = JSON.parse(e.data) as WSMessage - if (message.error) { - Utils.logError(`Listener websocket error: ${message.error}`) - return - } - - switch (message.action) { - case 'UPDATE_BLOCK': - this.queueUpdateNotification(message.block!) - break - default: - Utils.logError(`Unexpected action: ${message.action}`) - } - } catch (err) { - Utils.log('message is not an object') - } - } - } - - close(): void { - if (!this.ws) { - return - } - - Utils.log(`OctoListener close: ${this.ws.url}`) - - // Use this sequence so the onclose method doesn't try to re-open - const ws = this.ws - this.ws = undefined - this.blockIds = [] - this.onChange = undefined - this.isInitialized = false - try { - ws?.close() - } catch { - try { - (ws as any)?.websocket?.close() - } catch { - Utils.log('OctoListener unable to close the websocket') - } - } - } - - private authenticate(workspaceId: string): void { - if (!this.ws) { - Utils.assertFailure('OctoListener.addBlocks: ws is not open') - return - } - - if (!this.token) { - return - } - const command = { - action: 'AUTH', - token: this.token, - workspaceId, - } - this.ws.send(JSON.stringify(command)) - } - - private subscribeToBlocks(blockIds: string[]): void { - if (!this.ws) { - Utils.assertFailure('OctoListener.subscribeToBlocks: ws is not open') - return - } - - const command: WSCommand = { - action: 'SUBSCRIBE_BLOCKS', - blockIds, - workspaceId: this.workspaceId, - readToken: this.readToken, - } - - this.ws.send(JSON.stringify(command)) - this.blockIds.push(...blockIds) - } - - private subscribeToWorkspace(workspaceId: string): void { - if (!this.ws) { - Utils.assertFailure('OctoListener.subscribeToWorkspace: ws is not open') - return - } - - const command: WSCommand = { - action: 'SUBSCRIBE_WORKSPACE', - workspaceId, - } - - this.ws.send(JSON.stringify(command)) - } - - private removeBlocks(blockIds: string[]): void { - if (!this.ws) { - Utils.assertFailure('OctoListener.removeBlocks: ws is not open') - return - } - - const command: WSCommand = { - action: 'REMOVE', - blockIds, - workspaceId: this.workspaceId, - readToken: this.readToken, - } - - this.ws.send(JSON.stringify(command)) - - // Remove registered blockIds, maintinging multiple copies (simple ref-counting) - for (let i = 0; i < this.blockIds.length; i++) { - for (let j = 0; j < blockIds.length; j++) { - if (this.blockIds[i] === blockIds[j]) { - this.blockIds.splice(i, 1) - blockIds.splice(j, 1) - } - } - } - } - - private queueUpdateNotification(block: IBlock) { - this.updatedBlocks = this.updatedBlocks.filter((o) => o.id !== block.id) // Remove existing queued update - this.updatedBlocks.push(block) - if (this.updateTimeout) { - clearTimeout(this.updateTimeout) - this.updateTimeout = undefined - } - - this.updateTimeout = setTimeout(() => { - this.flushUpdateNotifications() - }, this.notificationDelay) - } - - private flushUpdateNotifications() { - for (const block of this.updatedBlocks) { - Utils.log(`OctoListener flush update block: ${block.id}`) - } - this.onChange?.(this.updatedBlocks) - this.updatedBlocks = [] - } -} - -export {OctoListener} diff --git a/webapp/src/pages/boardPage.tsx b/webapp/src/pages/boardPage.tsx index 922c316eb..5974b11b4 100644 --- a/webapp/src/pages/boardPage.tsx +++ b/webapp/src/pages/boardPage.tsx @@ -13,8 +13,8 @@ import {sendFlashMessage} from '../components/flashMessages' import Workspace from '../components/workspace' import mutator from '../mutator' import octoClient from '../octoClient' -import {OctoListener} from '../octoListener' import {Utils} from '../utils' +import wsClient, {WSClient} from '../wsclient' import {BoardTree, MutableBoardTree} from '../viewModel/boardTree' import {MutableWorkspaceTree, WorkspaceTree} from '../viewModel/workspaceTree' import './boardPage.scss' @@ -38,8 +38,6 @@ type State = { } class BoardPage extends React.Component { - private workspaceListener = new OctoListener() - constructor(props: Props) { super(props) @@ -146,17 +144,32 @@ class BoardPage extends React.Component { } } + updateWebsocketState = (_: WSClient, newState: 'init'|'open'|'close'): void => { + if (newState === 'open') { + const token = localStorage.getItem('focalboardSessionId') || '' + wsClient.authenticate(this.props.match.params.workspaceId || '0', token) + wsClient.subscribeToWorkspace(this.props.match.params.workspaceId || '0') + } + this.setState({websocketClosed: newState === 'close'}) + } + componentDidMount(): void { if (this.props.match.params.boardId) { this.attachToBoard(this.props.match.params.boardId, this.props.match.params.viewId) } else { this.sync() } + wsClient.addOnChange(this.incrementalUpdate) + wsClient.addOnReconnect(this.sync) + wsClient.addOnStateChange(this.updateWebsocketState) } componentWillUnmount(): void { Utils.log(`boardPage.componentWillUnmount: ${this.props.match.params.boardId}`) - this.workspaceListener.close() + wsClient.unsubscribeToWorkspace(this.props.match.params.workspaceId || '0') + wsClient.removeOnChange(this.incrementalUpdate) + wsClient.removeOnReconnect(this.sync) + wsClient.removeOnStateChange(this.updateWebsocketState) } render(): JSX.Element { @@ -226,7 +239,7 @@ class BoardPage extends React.Component { } } - private async sync() { + private sync = async () => { Utils.log(`sync start: ${this.props.match.params.boardId}`) let workspace: IWorkspace | undefined @@ -239,53 +252,8 @@ class BoardPage extends React.Component { } const workspaceTree = await MutableWorkspaceTree.sync() - const boardIds = [...workspaceTree.boards.map((o) => o.id), ...workspaceTree.boardTemplates.map((o) => o.id)] this.setState({workspace, workspaceTree}) - let boardIdsToListen: string[] - if (boardIds.length > 0) { - boardIdsToListen = ['', ...boardIds] - } else { - // Read-only view - boardIdsToListen = [this.props.match.params.boardId || ''] - } - - // Listen to boards plus all blocks at root (Empty string for parentId) - this.workspaceListener.open( - octoClient.workspaceId, - boardIdsToListen, - async (blocks) => { - Utils.log(`workspaceListener.onChanged: ${blocks.length}`) - this.incrementalUpdate(blocks) - }, - () => { - Utils.log('workspaceListener.onReconnect') - this.sync() - }, - (state) => { - switch (state) { - case 'close': { - // Show error after a delay to ignore brief interruptions - if (!this.state.websocketClosed && !this.state.websocketClosedTimeOutId) { - const timeoutId = setTimeout(() => { - this.setState({websocketClosed: true, websocketClosedTimeOutId: undefined}) - }, 5000) - this.setState({websocketClosedTimeOutId: timeoutId}) - } - break - } - case 'open': { - if (this.state.websocketClosedTimeOutId) { - clearTimeout(this.state.websocketClosedTimeOutId) - } - this.setState({websocketClosed: false, websocketClosedTimeOutId: undefined}) - Utils.log('Connection established') - break - } - } - }, - ) - if (this.props.match.params.boardId) { const boardTree = await MutableBoardTree.sync(this.props.match.params.boardId || '', this.props.match.params.viewId || '', this.props.usersById) @@ -314,7 +282,7 @@ class BoardPage extends React.Component { } } - private async incrementalUpdate(blocks: IBlock[]) { + private incrementalUpdate = async (_: WSClient, blocks: IBlock[]) => { const {workspaceTree, boardTree} = this.state let newState = {workspaceTree, boardTree} diff --git a/webapp/src/wsclient.ts b/webapp/src/wsclient.ts new file mode 100644 index 000000000..7617a5af5 --- /dev/null +++ b/webapp/src/wsclient.ts @@ -0,0 +1,285 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. +import {Utils} from './utils' +import {IBlock} from './blocks/block' + +// These are outgoing commands to the server +type WSCommand = { + action: string + workspaceId?: string + readToken?: string + blockIds?: string[] +} + +// These are messages from the server +type WSMessage = { + action?: string + block?: IBlock + error?: string +} + +type OnChangeHandler = (client: WSClient, blocks: IBlock[]) => void +type OnReconnectHandler = (client: WSClient) => void +type OnStateChangeHandler = (client: WSClient, state: 'init' | 'open' | 'close') => void +type OnErrorHandler = (client: WSClient, e: Event) => void + +class WSClient { + ws: WebSocket|null = null + serverUrl: string + state: 'init'|'open'|'close' = 'init' + onStateChange: OnStateChangeHandler[] = [] + onReconnect: OnReconnectHandler[] = [] + onChange: OnChangeHandler[] = [] + onError: OnErrorHandler[] = [] + private notificationDelay = 100 + private reopenDelay = 3000 + private updatedBlocks: IBlock[] = [] + private updateTimeout?: NodeJS.Timeout + + constructor(serverUrl?: string) { + this.serverUrl = (serverUrl || Utils.getBaseURL(true)).replace(/\/$/, '') + Utils.log(`WSClient serverUrl: ${this.serverUrl}`) + } + + addOnChange(handler: OnChangeHandler): void { + this.onChange.push(handler) + } + + removeOnChange(handler: OnChangeHandler): void { + const index = this.onChange.indexOf(handler) + if (index !== -1) { + this.onChange.splice(index, 1) + } + } + + addOnReconnect(handler: OnReconnectHandler): void { + this.onReconnect.push(handler) + } + + removeOnReconnect(handler: OnReconnectHandler): void { + const index = this.onReconnect.indexOf(handler) + if (index !== -1) { + this.onReconnect.splice(index, 1) + } + } + + addOnStateChange(handler: OnStateChangeHandler): void { + this.onStateChange.push(handler) + } + + removeOnStateChange(handler: OnStateChangeHandler): void { + const index = this.onStateChange.indexOf(handler) + if (index !== -1) { + this.onStateChange.splice(index, 1) + } + } + + addOnError(handler: OnErrorHandler): void { + this.onError.push(handler) + } + + removeOnError(handler: OnErrorHandler): void { + const index = this.onError.indexOf(handler) + if (index !== -1) { + this.onError.splice(index, 1) + } + } + + open(): void { + const url = new URL(this.serverUrl) + const protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:' + const wsServerUrl = `${protocol}//${url.host}${url.pathname.replace(/\/$/, '')}/ws` + Utils.log(`WSClient open: ${wsServerUrl}`) + const ws = new WebSocket(wsServerUrl) + this.ws = ws + + ws.onopen = () => { + Utils.log('WSClient webSocket opened.') + for (const handler of this.onStateChange) { + handler(this, 'open') + } + this.state = 'open' + } + + ws.onerror = (e) => { + Utils.logError(`WSClient websocket onerror. data: ${e}`) + for (const handler of this.onError) { + handler(this, e) + } + } + + ws.onclose = (e) => { + Utils.log(`WSClient websocket onclose, code: ${e.code}, reason: ${e.reason}`) + if (ws === this.ws) { + // Unexpected close, re-open + Utils.logError('Unexpected close, re-opening websocket') + for (const handler of this.onStateChange) { + handler(this, 'close') + } + this.state = 'close' + setTimeout(() => { + this.open() + for (const handler of this.onReconnect) { + handler(this) + } + }, this.reopenDelay) + } + } + + ws.onmessage = (e) => { + // Utils.log(`WSClient websocket onmessage. data: ${e.data}`) + if (ws !== this.ws) { + Utils.log('Ignoring closed ws') + return + } + + try { + const message = JSON.parse(e.data) as WSMessage + if (message.error) { + Utils.logError(`Listener websocket error: ${message.error}`) + return + } + + switch (message.action) { + case 'UPDATE_BLOCK': + this.queueUpdateNotification(message.block!) + break + default: + Utils.logError(`Unexpected action: ${message.action}`) + } + } catch (err) { + Utils.log('message is not an object') + } + } + } + + authenticate(workspaceId: string, token: string): void { + if (!this.ws) { + Utils.assertFailure('WSClient.addBlocks: ws is not open') + return + } + + if (!token) { + return + } + const command = { + action: 'AUTH', + token, + workspaceId, + } + this.ws.send(JSON.stringify(command)) + } + + subscribeToBlocks(workspaceId: string, blockIds: string[], readToken = ''): void { + if (!this.ws) { + Utils.assertFailure('WSClient.subscribeToBlocks: ws is not open') + return + } + + const command: WSCommand = { + action: 'SUBSCRIBE_BLOCKS', + blockIds, + workspaceId, + readToken, + } + + this.ws.send(JSON.stringify(command)) + } + + unsubscribeToWorkspace(workspaceId: string): void { + if (!this.ws) { + Utils.assertFailure('WSClient.subscribeToWorkspace: ws is not open') + return + } + + const command: WSCommand = { + action: 'UNSUBSCRIBE_WORKSPACE', + workspaceId, + } + + this.ws.send(JSON.stringify(command)) + } + + subscribeToWorkspace(workspaceId: string): void { + if (!this.ws) { + Utils.assertFailure('WSClient.subscribeToWorkspace: ws is not open') + return + } + + const command: WSCommand = { + action: 'SUBSCRIBE_WORKSPACE', + workspaceId, + } + + this.ws.send(JSON.stringify(command)) + } + + unsubscribeFromBlocks(workspaceId: string, blockIds: string[], readToken = ''): void { + if (!this.ws) { + Utils.assertFailure('WSClient.removeBlocks: ws is not open') + return + } + + const command: WSCommand = { + action: 'UNSUBSCRIBE_BLOCKS', + blockIds, + workspaceId, + readToken, + } + + this.ws.send(JSON.stringify(command)) + } + + private queueUpdateNotification(block: IBlock) { + this.updatedBlocks = this.updatedBlocks.filter((o) => o.id !== block.id) // Remove existing queued update + this.updatedBlocks.push(block) + if (this.updateTimeout) { + clearTimeout(this.updateTimeout) + this.updateTimeout = undefined + } + + this.updateTimeout = setTimeout(() => { + this.flushUpdateNotifications() + }, this.notificationDelay) + } + + private flushUpdateNotifications() { + for (const block of this.updatedBlocks) { + Utils.log(`WSClient flush update block: ${block.id}`) + } + for (const handler of this.onChange) { + handler(this, this.updatedBlocks) + } + this.updatedBlocks = [] + } + + close(): void { + if (!this.ws) { + return + } + + Utils.log(`WSClient close: ${this.ws.url}`) + + // Use this sequence so the onclose method doesn't try to re-open + const ws = this.ws + this.ws = null + this.onChange = [] + this.onReconnect = [] + this.onStateChange = [] + this.onError = [] + try { + ws?.close() + } catch { + try { + (ws as any)?.websocket?.close() + } catch { + Utils.log('WSClient unable to close the websocket') + } + } + } +} + +const wsClient = new WSClient() + +export {WSClient} +export default wsClient