// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. // See LICENSE.txt for license information. import {ClientConfig} from './config/clientConfig' import {Utils, WSMessagePayloads} from './utils' import {Block} from './blocks/block' import {Board, BoardMember} from './blocks/board' import {OctoUtils} from './octoUtils' import {BlockCategoryWebsocketData, Category} from './store/sidebar' // These are outgoing commands to the server type WSCommand = { action: string teamId?: string readToken?: string blockIds?: string[] } // These are messages from the server export type WSMessage = { action?: string block?: Block board?: Board category?: Category blockCategories?: BlockCategoryWebsocketData error?: string teamId?: string member?: BoardMember } export const ACTION_UPDATE_BOARD = 'UPDATE_BOARD' export const ACTION_UPDATE_MEMBER = 'UPDATE_MEMBER' export const ACTION_DELETE_MEMBER = 'DELETE_MEMBER' export const ACTION_UPDATE_BLOCK = 'UPDATE_BLOCK' export const ACTION_AUTH = 'AUTH' export const ACTION_SUBSCRIBE_BLOCKS = 'SUBSCRIBE_BLOCKS' export const ACTION_SUBSCRIBE_TEAM = 'SUBSCRIBE_TEAM' export const ACTION_UNSUBSCRIBE_TEAM = 'UNSUBSCRIBE_TEAM' export const ACTION_UNSUBSCRIBE_BLOCKS = 'UNSUBSCRIBE_BLOCKS' export const ACTION_UPDATE_CLIENT_CONFIG = 'UPDATE_CLIENT_CONFIG' export const ACTION_UPDATE_CATEGORY = 'UPDATE_CATEGORY' export const ACTION_UPDATE_BLOCK_CATEGORY = 'UPDATE_BLOCK_CATEGORY' export const ACTION_UPDATE_SUBSCRIPTION = 'UPDATE_SUBSCRIPTION' type WSSubscriptionMsg = { action?: string subscription?: Subscription error?: string } export interface Subscription { blockId: string subscriberId: string blockType: string subscriberType: string notifiedAt?: number createAt?: number deleteAt?: number } // The Mattermost websocket client interface export interface MMWebSocketClient { conn: WebSocket | null; sendMessage(action: string, data: any, responseCallback?: () => void): void /* eslint-disable-line @typescript-eslint/no-explicit-any */ setFirstConnectCallback(callback: () => void): void setReconnectCallback(callback: () => void): void setErrorCallback(callback: (event: Event) => void): void setCloseCallback(callback: (connectFailCount: number) => void): void } type OnChangeHandler = (client: WSClient, items: any[]) => void type OnReconnectHandler = (client: WSClient) => void type OnStateChangeHandler = (client: WSClient, state: 'init' | 'open' | 'close') => void type OnErrorHandler = (client: WSClient, e: Event) => void type OnConfigChangeHandler = (client: WSClient, clientConfig: ClientConfig) => void type FollowChangeHandler = (client: WSClient, subscription: Subscription) => void export type ChangeHandlerType = 'block' | 'category' | 'blockCategories' | 'board' | 'boardMembers' type UpdatedData = { Blocks: Block[] Categories: Category[] BlockCategories: Array Boards: Board[] BoardMembers: BoardMember[] } type ChangeHandlers = { Block: OnChangeHandler[] Category: OnChangeHandler[] BlockCategory: OnChangeHandler[] Board: OnChangeHandler[] BoardMember: OnChangeHandler[] } class WSClient { ws: WebSocket|null = null client: MMWebSocketClient|null = null onPluginReconnect: null|(() => void) = null pluginId = '' pluginVersion = '' teamId = '' onAppVersionChangeHandler: ((versionHasChanged: boolean) => void) | null = null clientPrefix = '' serverUrl: string | undefined state: 'init'|'open'|'close' = 'init' onStateChange: OnStateChangeHandler[] = [] onReconnect: OnReconnectHandler[] = [] onChange: ChangeHandlers = {Block: [], Category: [], BlockCategory: [], Board: [], BoardMember: []} onError: OnErrorHandler[] = [] onConfigChange: OnConfigChangeHandler[] = [] onFollowBlock: FollowChangeHandler = () => {} onUnfollowBlock: FollowChangeHandler = () => {} private notificationDelay = 100 private reopenDelay = 3000 private updatedData: UpdatedData = {Blocks: [], Categories: [], BlockCategories: [], Boards: [], BoardMembers: []} private updateTimeout?: NodeJS.Timeout private errorPollId?: NodeJS.Timeout private logged = false // this need to be a function rather than a const because // one of the global variable (`window.baseURL`) is set at runtime // after the first instance of OctoClient is created. // Avoiding the race condition becomes more complex than making // the base URL dynamic though a function private getBaseURL(): string { const baseURL = (this.serverUrl || Utils.getBaseURL(true)).replace(/\/$/, '') // Logging this for debugging. // Logging just once to avoid log noise. if (!this.logged) { Utils.log(`WSClient serverUrl: ${baseURL}`) this.logged = true } return baseURL } constructor(serverUrl?: string) { this.serverUrl = serverUrl } initPlugin(pluginId: string, pluginVersion: string, client: MMWebSocketClient): void { this.pluginId = pluginId this.pluginVersion = pluginVersion this.clientPrefix = `custom_${pluginId}_` this.client = client Utils.log(`WSClient initialised for plugin id "${pluginId}"`) } sendCommand(command: WSCommand): void { if (this.client !== null) { const {action, ...data} = command this.client.sendMessage(this.clientPrefix + action, data) return } this.ws?.send(JSON.stringify(command)) } addOnChange(handler: OnChangeHandler, type: ChangeHandlerType): void { switch (type) { case 'block': this.onChange.Block.push(handler) break case 'category': this.onChange.Category.push(handler) break case 'blockCategories': this.onChange.BlockCategory.push(handler) break case 'board': this.onChange.Board.push(handler) break case 'boardMembers': this.onChange.BoardMember.push(handler) break } } removeOnChange(needle: OnChangeHandler, type: ChangeHandlerType): void { let haystack = [] switch (type) { case 'block': haystack = this.onChange.Block break case 'blockCategories': haystack = this.onChange.BlockCategory break case 'board': haystack = this.onChange.Board break case 'boardMembers': haystack = this.onChange.BoardMember break case 'category': haystack = this.onChange.Category break } if (!haystack) { return } const index = haystack.indexOf(needle) if (index !== -1) { haystack.splice(index, 1) } } addOnReconnect(handler: OnReconnectHandler): void { this.onReconnect.push(handler) } removeOnReconnect(handler: OnReconnectHandler): void { const index = this.onReconnect.indexOf(handler) if (index !== -1) { this.onReconnect.splice(index, 1) } } addOnStateChange(handler: OnStateChangeHandler): void { this.onStateChange.push(handler) } removeOnStateChange(handler: OnStateChangeHandler): void { const index = this.onStateChange.indexOf(handler) if (index !== -1) { this.onStateChange.splice(index, 1) } } addOnError(handler: OnErrorHandler): void { this.onError.push(handler) } removeOnError(handler: OnErrorHandler): void { const index = this.onError.indexOf(handler) if (index !== -1) { this.onError.splice(index, 1) } } addOnConfigChange(handler: OnConfigChangeHandler): void { this.onConfigChange.push(handler) } removeOnConfigChange(handler: OnConfigChangeHandler): void { const index = this.onConfigChange.indexOf(handler) if (index !== -1) { this.onConfigChange.splice(index, 1) } } open(): void { if (this.client !== null) { // configure the Mattermost websocket client callbacks const onConnect = () => { Utils.log('WSClient in plugin mode, reusing Mattermost WS connection') for (const handler of this.onStateChange) { handler(this, 'open') } this.state = 'open' } const onReconnect = () => { Utils.logWarn('WSClient reconnected') onConnect() for (const handler of this.onReconnect) { handler(this) } } this.onPluginReconnect = onReconnect const onClose = (connectFailCount: number) => { Utils.logError(`WSClient has been closed, connect fail count: ${connectFailCount}`) for (const handler of this.onStateChange) { handler(this, 'close') } this.state = 'close' // there is no way to react to a reconnection with the // reliable websockets schema, so we poll the raw // websockets client for its state directly until it // reconnects if (!this.errorPollId) { this.errorPollId = setInterval(() => { Utils.logWarn(`Polling websockets connection for state: ${this.client?.conn?.readyState}`) if (this.client?.conn?.readyState === 1) { onReconnect() clearInterval(this.errorPollId!) this.errorPollId = undefined } }, 500) } } const onError = (event: Event) => { Utils.logError(`WSClient websocket onerror. data: ${JSON.stringify(event)}`) for (const handler of this.onError) { handler(this, event) } } this.client.setFirstConnectCallback(onConnect) this.client.setErrorCallback(onError) this.client.setCloseCallback(onClose) this.client.setReconnectCallback(onReconnect) return } const url = new URL(this.getBaseURL()) const protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:' const wsServerUrl = `${protocol}//${url.host}${url.pathname.replace(/\/$/, '')}/ws` Utils.log(`WSClient open: ${wsServerUrl}`) const ws = new WebSocket(wsServerUrl) this.ws = ws ws.onopen = () => { Utils.log('WSClient webSocket opened.') this.state = 'open' for (const handler of this.onStateChange) { handler(this, 'open') } } ws.onerror = (e) => { Utils.logError(`WSClient websocket onerror. data: ${e}`) for (const handler of this.onError) { handler(this, e) } } ws.onclose = (e) => { Utils.log(`WSClient websocket onclose, code: ${e.code}, reason: ${e.reason}`) if (ws === this.ws) { // Unexpected close, re-open Utils.logError('Unexpected close, re-opening websocket') for (const handler of this.onStateChange) { handler(this, 'close') } this.state = 'close' setTimeout(() => { this.open() for (const handler of this.onReconnect) { handler(this) } }, this.reopenDelay) } } ws.onmessage = (e) => { if (ws !== this.ws) { Utils.log('Ignoring closed ws') return } try { const message = JSON.parse(e.data) as WSMessage if (message.error) { Utils.logError(`Listener websocket error: ${message.error}`) return } switch (message.action) { case ACTION_UPDATE_BOARD: this.updateHandler(message) break case ACTION_UPDATE_MEMBER: this.updateHandler(message) break case ACTION_DELETE_MEMBER: this.updateHandler(message) break case ACTION_UPDATE_BLOCK: this.updateHandler(message) break case ACTION_UPDATE_CATEGORY: this.updateHandler(message) break case ACTION_UPDATE_BLOCK_CATEGORY: this.updateHandler(message) break case ACTION_UPDATE_SUBSCRIPTION: this.updateSubscriptionHandler(message) break default: Utils.logError(`Unexpected action: ${message.action}`) } } catch (err) { Utils.log('message is not an object') } } } hasConn(): boolean { return this.ws !== null || this.client !== null } updateHandler(message: WSMessage): void { // if messages are directed to a team, process only the ones // for the current team if (message.teamId && message.teamId !== this.teamId) { return } const [data, type] = Utils.fixWSData(message) if (data) { this.queueUpdateNotification(data, type) } } setOnFollowBlock(handler: FollowChangeHandler): void { this.onFollowBlock = handler } setOnUnfollowBlock(handler: FollowChangeHandler): void { this.onUnfollowBlock = handler } updateClientConfigHandler(config: ClientConfig): void { for (const handler of this.onConfigChange) { handler(this, config) } } updateSubscriptionHandler(message: WSSubscriptionMsg): void { Utils.log('updateSubscriptionHandler: ' + message.action + '; blockId=' + message.subscription?.blockId) if (!message.subscription) { return } const handler = message.subscription.deleteAt ? this.onUnfollowBlock : this.onFollowBlock handler(this, message.subscription) } setOnAppVersionChangeHandler(fn: (versionHasChanged: boolean) => void): void { this.onAppVersionChangeHandler = fn } pluginStatusesChangedHandler(data: any): void { if (this.pluginId === '' || !this.onAppVersionChangeHandler) { return } const focalboardStatusChange = data.plugin_statuses.find((s: any) => s.plugin_id === this.pluginId) if (focalboardStatusChange) { // if the plugin version is greater than the current one, // show the new version banner if (Utils.compareVersions(this.pluginVersion, focalboardStatusChange.version) > 0) { Utils.log('Boards plugin has been updated') this.onAppVersionChangeHandler(true) } // if the plugin version is greater or equal, trigger a // reconnect to resubscribe in case the interface hasn't // been reloaded if (Utils.compareVersions(this.pluginVersion, focalboardStatusChange.version) >= 0) { // this is a temporal solution that leaves a second // between the message and the reconnect so the server // has time to register the WS handler setTimeout(() => { if (this.onPluginReconnect) { Utils.log('Reconnecting after plugin update') this.onPluginReconnect() } }, 1000) } } } authenticate(teamId: string, token: string): void { if (!this.hasConn()) { Utils.assertFailure('WSClient.addBlocks: ws is not open') return } if (!token) { return } const command = { action: ACTION_AUTH, token, teamId, } this.sendCommand(command) } subscribeToBlocks(teamId: string, blockIds: string[], readToken = ''): void { if (!this.hasConn()) { Utils.assertFailure('WSClient.subscribeToBlocks: ws is not open') return } const command: WSCommand = { action: ACTION_SUBSCRIBE_BLOCKS, blockIds, teamId, readToken, } this.sendCommand(command) } unsubscribeToTeam(teamId: string): void { if (!this.hasConn()) { Utils.assertFailure('WSClient.subscribeToTeam: ws is not open') return } const command: WSCommand = { action: ACTION_UNSUBSCRIBE_TEAM, teamId, } this.sendCommand(command) } subscribeToTeam(teamId: string): void { if (!this.hasConn()) { Utils.assertFailure('WSClient.subscribeToTeam: ws is not open') return } const command: WSCommand = { action: ACTION_SUBSCRIBE_TEAM, teamId, } this.sendCommand(command) } unsubscribeFromBlocks(teamId: string, blockIds: string[], readToken = ''): void { if (!this.hasConn()) { Utils.assertFailure('WSClient.removeBlocks: ws is not open') return } const command: WSCommand = { action: ACTION_UNSUBSCRIBE_BLOCKS, blockIds, teamId, readToken, } this.sendCommand(command) } private queueUpdateNotification(data: WSMessagePayloads, type: ChangeHandlerType) { if (!data) { return } // Remove existing queued update if (type === 'block') { this.updatedData.Blocks = this.updatedData.Blocks.filter((o) => o.id !== (data as Block).id) this.updatedData.Blocks.push(OctoUtils.hydrateBlock(data as Block)) } else if (type === 'category') { this.updatedData.Categories = this.updatedData.Categories.filter((c) => c.id !== (data as Category).id) this.updatedData.Categories.push(data as Category) } else if (type === 'blockCategories') { this.updatedData.BlockCategories = this.updatedData.BlockCategories.filter((b) => b.blockID === (data as BlockCategoryWebsocketData).blockID) this.updatedData.BlockCategories.push(data as BlockCategoryWebsocketData) } else if (type === 'board') { this.updatedData.Boards = this.updatedData.Boards.filter((b) => b.id !== (data as Board).id) this.updatedData.Boards.push(data as Board) } else if (type === 'boardMembers') { this.updatedData.BoardMembers = this.updatedData.BoardMembers.filter((m) => m.userId !== (data as BoardMember).userId || m.boardId !== (data as BoardMember).boardId) this.updatedData.BoardMembers.push(data as BoardMember) } if (this.updateTimeout) { clearTimeout(this.updateTimeout) this.updateTimeout = undefined } this.updateTimeout = setTimeout(() => { this.flushUpdateNotifications() }, this.notificationDelay) } // private queueUpdateBoardNotification(board: Board) { // this.updatedBoards = this.updatedBoards.filter((o) => o.id !== board.id) // Remove existing queued update // // ToDo: hydrate required? // // this.updatedBoards.push(OctoUtils.hydrateBoard(board)) // this.updatedBoards.push(board) // if (this.updateTimeout) { // clearTimeout(this.updateTimeout) // this.updateTimeout = undefined // } // // this.updateTimeout = setTimeout(() => { // this.flushUpdateNotifications() // }, this.notificationDelay) // } private logUpdateNotification() { for (const block of this.updatedData.Blocks) { Utils.log(`WSClient flush update block: ${block.id}`) } for (const category of this.updatedData.Categories) { Utils.log(`WSClient flush update category: ${category.id}`) } for (const blockCategories of this.updatedData.BlockCategories) { Utils.log(`WSClient flush update blockCategory: ${blockCategories.blockID} ${blockCategories.categoryID}`) } for (const board of this.updatedData.Boards) { Utils.log(`WSClient flush update board: ${board.id}`) } for (const boardMember of this.updatedData.BoardMembers) { Utils.log(`WSClient flush update boardMember: ${boardMember.userId} ${boardMember.boardId}`) } } private flushUpdateNotifications() { this.logUpdateNotification() for (const handler of this.onChange.Block) { handler(this, this.updatedData.Blocks) } for (const handler of this.onChange.Category) { handler(this, this.updatedData.Categories) } for (const handler of this.onChange.BlockCategory) { handler(this, this.updatedData.BlockCategories) } for (const handler of this.onChange.Board) { handler(this, this.updatedData.Boards) } for (const handler of this.onChange.BoardMember) { handler(this, this.updatedData.BoardMembers) } this.updatedData = { Blocks: [], Categories: [], BlockCategories: [], Boards: [], BoardMembers: [], } } close(): void { if (!this.hasConn()) { return } Utils.log(`WSClient close: ${this.ws?.url}`) // Use this sequence so the onclose method doesn't try to re-open const ws = this.ws this.ws = null this.onChange = {Block: [], Category: [], BlockCategory: [], Board: [], BoardMember: []} this.onReconnect = [] this.onStateChange = [] this.onError = [] // if running in plugin mode, nothing else needs to be done if (this.client) { return } try { ws?.close() } catch { try { (ws as any)?.websocket?.close() } catch { Utils.log('WSClient unable to close the websocket') } } } } const wsClient = new WSClient() export {WSClient} export default wsClient