From 0d0bc7a672d8bfa291a17be8bd2f1785796efb42 Mon Sep 17 00:00:00 2001 From: Chen-I Lim Date: Fri, 6 Nov 2020 10:11:01 -0800 Subject: [PATCH 1/3] Incremental updates on websocket --- server/app/blocks.go | 5 +-- server/ws/websockets.go | 58 +++++++++++++++++++++++---- webapp/src/components/cardDetail.tsx | 13 +++--- webapp/src/octoListener.ts | 36 ++++++++++++++--- webapp/src/octoUtils.tsx | 8 ++++ webapp/src/pages/boardPage.tsx | 20 +++++++-- webapp/src/viewModel/boardTree.ts | 18 ++++++++- webapp/src/viewModel/cardTree.ts | 23 +++++++++-- webapp/src/viewModel/workspaceTree.ts | 34 +++++++++++++--- 9 files changed, 178 insertions(+), 37 deletions(-) diff --git a/server/app/blocks.go b/server/app/blocks.go index 7cdddb1c5..757057d01 100644 --- a/server/app/blocks.go +++ b/server/app/blocks.go @@ -44,11 +44,10 @@ func (a *App) InsertBlocks(blocks []model.Block) error { return err } + a.wsServer.BroadcastBlockChange(block) go a.webhook.NotifyUpdate(block) } - a.wsServer.BroadcastBlockChangeToWebsocketClients(blockIDsToNotify) - return nil } @@ -76,7 +75,7 @@ func (a *App) DeleteBlock(blockID string) error { return err } - a.wsServer.BroadcastBlockChangeToWebsocketClients(blockIDsToNotify) + a.wsServer.BroadcastBlockDelete(blockID, parentID) return nil } diff --git a/server/ws/websockets.go b/server/ws/websockets.go index c68ef4d78..a32029a87 100644 --- a/server/ws/websockets.go +++ b/server/ws/websockets.go @@ -5,9 +5,11 @@ import ( "log" "net/http" "sync" + "time" "github.com/gorilla/mux" "github.com/gorilla/websocket" + "github.com/mattermost/mattermost-octo-tasks/server/model" ) // RegisterRoutes registers routes. @@ -98,10 +100,10 @@ func NewServer() *Server { } } -// WebsocketMsg is sent on block changes. -type WebsocketMsg struct { - Action string `json:"action"` - BlockID string `json:"blockId"` +// UpdateMsg is sent on block updates +type UpdateMsg struct { + Action string `json:"action"` + Block model.Block `json:"block"` } // WebsocketCommand is an incoming command from the client. @@ -166,15 +168,27 @@ func (ws *Server) handleWebSocketOnChange(w http.ResponseWriter, r *http.Request } } -// BroadcastBlockChangeToWebsocketClients broadcasts change to clients. -func (ws *Server) BroadcastBlockChangeToWebsocketClients(blockIDs []string) { +// BroadcastBlockDelete broadcasts delete messages to clients +func (ws *Server) BroadcastBlockDelete(blockID string, parentID string) { + now := time.Now().Unix() + block := model.Block{} + block.ID = blockID + block.ParentID = parentID + block.UpdateAt = now + block.DeleteAt = now + + ws.BroadcastBlockChange(block) +} + +/* +func (ws *Server) BroadcastBlockDelete1(blockIDs []string) { for _, blockID := range blockIDs { listeners := ws.GetListeners(blockID) log.Printf("%d listener(s) for blockID: %s", len(listeners), blockID) if listeners != nil { - message := WebsocketMsg{ - Action: "UPDATE_BLOCK", + message := DeleteMsg{ + Action: "DELETE_BLOCK", BlockID: blockID, } @@ -190,3 +204,31 @@ func (ws *Server) BroadcastBlockChangeToWebsocketClients(blockIDs []string) { } } } +*/ + +// BroadcastBlockChange broadcasts update messages to clients +func (ws *Server) BroadcastBlockChange(block model.Block) { + blockIDsToNotify := []string{block.ID, block.ParentID} + + for _, blockID := range blockIDsToNotify { + listeners := ws.GetListeners(blockID) + log.Printf("%d listener(s) for blockID: %s", len(listeners), blockID) + + if listeners != nil { + message := UpdateMsg{ + Action: "UPDATE_BLOCK", + Block: block, + } + + for _, listener := range listeners { + log.Printf("Broadcast change, blockID: %s, remoteAddr: %s", blockID, listener.RemoteAddr()) + + err := listener.WriteJSON(message) + if err != nil { + log.Printf("broadcast error: %v", err) + listener.Close() + } + } + } + } +} diff --git a/webapp/src/components/cardDetail.tsx b/webapp/src/components/cardDetail.tsx index d52442a51..cd4d05d5f 100644 --- a/webapp/src/components/cardDetail.tsx +++ b/webapp/src/components/cardDetail.tsx @@ -54,13 +54,14 @@ class CardDetail extends React.Component { } componentDidMount() { - this.cardListener = new OctoListener() - this.cardListener.open([this.props.cardId], async (blockId) => { - Utils.log(`cardListener.onChanged: ${blockId}`) - await cardTree.sync() - this.setState({cardTree}) - }) const cardTree = new MutableCardTree(this.props.cardId) + this.cardListener = new OctoListener() + this.cardListener.open([this.props.cardId], async (blocks) => { + Utils.log(`cardListener.onChanged: ${blocks.length}`) + const newCardTree = cardTree.mutableCopy() + newCardTree.incrementalUpdate(blocks) + this.setState({cardTree: newCardTree}) + }) cardTree.sync().then(() => { this.setState({cardTree, title: cardTree.card.title}) setTimeout(() => { diff --git a/webapp/src/octoListener.ts b/webapp/src/octoListener.ts index ecf07dd09..8e8ff136f 100644 --- a/webapp/src/octoListener.ts +++ b/webapp/src/octoListener.ts @@ -1,5 +1,6 @@ // Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. // See LICENSE.txt for license information. +import {IBlock} from './blocks/block' import {Utils} from './utils' // These are outgoing commands to the server @@ -12,8 +13,11 @@ type WSCommand = { type WSMessage = { action: string blockId: string + block: IBlock } +type OnChangeHandler = (blocks: IBlock[]) => void + // // OctoListener calls a handler when a block or any of its children changes // @@ -27,6 +31,10 @@ class OctoListener { private blockIds: string[] = [] private isInitialized = false + private onChange: OnChangeHandler + private updatedBlocks: IBlock[] = [] + private updateTimeout: NodeJS.Timeout + notificationDelay = 200 reopenDelay = 3000 @@ -35,13 +43,15 @@ class OctoListener { Utils.log(`OctoListener serverUrl: ${this.serverUrl}`) } - open(blockIds: string[], onChange: (blockId: string) => void) { + open(blockIds: string[], onChange: OnChangeHandler) { let timeoutId: NodeJS.Timeout if (this.ws) { this.close() } + this.onChange = onChange + const url = new URL(this.serverUrl) const wsServerUrl = `ws://${url.host}${url.pathname}ws/onchange` Utils.log(`OctoListener open: ${wsServerUrl}`) @@ -84,10 +94,7 @@ class OctoListener { if (timeoutId) { clearTimeout(timeoutId) } - timeoutId = setTimeout(() => { - timeoutId = undefined - onChange(message.blockId) - }, this.notificationDelay) + this.queueUpdateNotification(message.block) break default: Utils.logError(`Unexpected action: ${message.action}`) @@ -109,6 +116,7 @@ class OctoListener { const ws = this.ws this.ws = undefined this.blockIds = [] + this.onChange = undefined this.isInitialized = false ws.close() } @@ -151,6 +159,24 @@ class OctoListener { } } } + + private queueUpdateNotification(block: IBlock) { + this.updatedBlocks = this.updatedBlocks.filter((o) => o.id !== block.id) // Remove existing queued update + this.updatedBlocks.push(block) + if (this.updateTimeout) { + clearTimeout(this.updateTimeout) + this.updateTimeout = undefined + } + + this.updateTimeout = setTimeout(() => { + this.flushUpdateNotifications() + }, this.notificationDelay) + } + + private flushUpdateNotifications() { + this.onChange(this.updatedBlocks) + this.updatedBlocks = [] + } } export {OctoListener} diff --git a/webapp/src/octoUtils.tsx b/webapp/src/octoUtils.tsx index 81b0d020d..14c10e596 100644 --- a/webapp/src/octoUtils.tsx +++ b/webapp/src/octoUtils.tsx @@ -80,6 +80,14 @@ class OctoUtils { static hydrateBlocks(blocks: IBlock[]): MutableBlock[] { return blocks.map((block) => this.hydrateBlock(block)) } + + static mergeBlocks(blocks: IBlock[], updatedBlocks: IBlock[]): IBlock[] { + const updatedBlockIds = updatedBlocks.map((o) => o.id) + const newBlocks = blocks.filter((o) => !updatedBlockIds.includes(o.id)) + const updatedAndNotDeletedBlocks = updatedBlocks.filter((o) => o.deleteAt === 0) + newBlocks.push(...updatedAndNotDeletedBlocks) + return newBlocks + } } export {OctoUtils} diff --git a/webapp/src/pages/boardPage.tsx b/webapp/src/pages/boardPage.tsx index 2a32c24a5..38ead3e99 100644 --- a/webapp/src/pages/boardPage.tsx +++ b/webapp/src/pages/boardPage.tsx @@ -10,6 +10,7 @@ import mutator from '../mutator' import {OctoListener} from '../octoListener' import {Utils} from '../utils' import {MutableWorkspaceTree} from '../viewModel/workspaceTree' +import {IBlock} from '../blocks/block' type Props = { setLanguage: (lang: string) => void @@ -151,9 +152,9 @@ export default class BoardPage extends React.Component { const boardIds = workspaceTree.boards.map((o) => o.id) // Listen to boards plus all blocks at root (Empty string for parentId) - this.workspaceListener.open(['', ...boardIds], async (blockId) => { - Utils.log(`workspaceListener.onChanged: ${blockId}`) - this.sync() + this.workspaceListener.open(['', ...boardIds], async (blocks) => { + Utils.log(`workspaceListener.onChanged: ${blocks.length}`) + this.incrementalUpdate(blocks) }) if (boardId) { @@ -179,6 +180,19 @@ export default class BoardPage extends React.Component { } } + private incrementalUpdate(blocks: IBlock[]) { + const {workspaceTree, boardTree, viewId} = this.state + + const newWorkspaceTree = workspaceTree.mutableCopy() + newWorkspaceTree.incrementalUpdate(blocks) + + const newBoardTree = boardTree.mutableCopy() + newBoardTree.incrementalUpdate(blocks) + newBoardTree.setActiveView(viewId) + + this.setState({workspaceTree: newWorkspaceTree, boardTree: newBoardTree}) + } + // IPageController showBoard(boardId: string): void { const {boardTree} = this.state diff --git a/webapp/src/viewModel/boardTree.ts b/webapp/src/viewModel/boardTree.ts index c961694d1..f0902eb1b 100644 --- a/webapp/src/viewModel/boardTree.ts +++ b/webapp/src/viewModel/boardTree.ts @@ -29,6 +29,8 @@ interface BoardTree { getSearchText(): string | undefined orderedCards(): Card[] + + mutableCopy(): MutableBoardTree } class MutableBoardTree implements BoardTree { @@ -41,6 +43,7 @@ class MutableBoardTree implements BoardTree { activeView?: MutableBoardView groupByProperty?: IPropertyTemplate + private rawBlocks: IBlock[] = [] private searchText?: string allCards: MutableCard[] = [] get allBlocks(): IBlock[] { @@ -51,8 +54,13 @@ class MutableBoardTree implements BoardTree { } async sync() { - const blocks = await octoClient.getSubtree(this.boardId) - this.rebuild(OctoUtils.hydrateBlocks(blocks)) + this.rawBlocks = await octoClient.getSubtree(this.boardId) + this.rebuild(OctoUtils.hydrateBlocks(this.rawBlocks)) + } + + incrementalUpdate(updatedBlocks: IBlock[]) { + this.rawBlocks = OctoUtils.mergeBlocks(this.rawBlocks, updatedBlocks) + this.rebuild(OctoUtils.hydrateBlocks(this.rawBlocks)) } private rebuild(blocks: IMutableBlock[]) { @@ -390,6 +398,12 @@ class MutableBoardTree implements BoardTree { return cards } + + mutableCopy(): MutableBoardTree { + const boardTree = new MutableBoardTree(this.boardId) + boardTree.incrementalUpdate(this.rawBlocks) + return boardTree + } } export {MutableBoardTree, BoardTree, Group as BoardTreeGroup} diff --git a/webapp/src/viewModel/cardTree.ts b/webapp/src/viewModel/cardTree.ts index c0b7dace1..98490fc85 100644 --- a/webapp/src/viewModel/cardTree.ts +++ b/webapp/src/viewModel/cardTree.ts @@ -10,19 +10,28 @@ interface CardTree { readonly card: Card readonly comments: readonly IBlock[] readonly contents: readonly IOrderedBlock[] + + mutableCopy(): MutableCardTree } class MutableCardTree implements CardTree { card: Card - comments: IBlock[] - contents: IOrderedBlock[] + comments: IBlock[] = [] + contents: IOrderedBlock[] = [] + + private rawBlocks: IBlock[] = [] constructor(private cardId: string) { } async sync() { - const blocks = await octoClient.getSubtree(this.cardId) - this.rebuild(OctoUtils.hydrateBlocks(blocks)) + this.rawBlocks = await octoClient.getSubtree(this.cardId) + this.rebuild(OctoUtils.hydrateBlocks(this.rawBlocks)) + } + + incrementalUpdate(updatedBlocks: IBlock[]) { + this.rawBlocks = OctoUtils.mergeBlocks(this.rawBlocks, updatedBlocks) + this.rebuild(OctoUtils.hydrateBlocks(this.rawBlocks)) } private rebuild(blocks: IBlock[]) { @@ -35,6 +44,12 @@ class MutableCardTree implements CardTree { const contentBlocks = blocks.filter((block) => block.type === 'text' || block.type === 'image' || block.type === 'divider') as IOrderedBlock[] this.contents = contentBlocks.sort((a, b) => a.order - b.order) } + + mutableCopy(): MutableCardTree { + const cardTree = new MutableCardTree(this.cardId) + cardTree.incrementalUpdate(this.rawBlocks) + return cardTree + } } export {MutableCardTree, CardTree} diff --git a/webapp/src/viewModel/workspaceTree.ts b/webapp/src/viewModel/workspaceTree.ts index cb8f595a4..65b43c0c9 100644 --- a/webapp/src/viewModel/workspaceTree.ts +++ b/webapp/src/viewModel/workspaceTree.ts @@ -9,18 +9,35 @@ import {BoardView} from '../blocks/boardView' interface WorkspaceTree { readonly boards: readonly Board[] readonly views: readonly BoardView[] + + mutableCopy(): MutableWorkspaceTree } class MutableWorkspaceTree { boards: Board[] = [] views: BoardView[] = [] + private rawBoards: IBlock[] = [] + private rawViews: IBlock[] = [] + async sync() { - const boards = await octoClient.getBlocksWithType('board') - const views = await octoClient.getBlocksWithType('view') + this.rawBoards = await octoClient.getBlocksWithType('board') + this.rawViews = await octoClient.getBlocksWithType('view') this.rebuild( - OctoUtils.hydrateBlocks(boards), - OctoUtils.hydrateBlocks(views), + OctoUtils.hydrateBlocks(this.rawBoards), + OctoUtils.hydrateBlocks(this.rawViews), + ) + } + + incrementalUpdate(updatedBlocks: IBlock[]) { + const updatedBoards = updatedBlocks.filter((o) => o.type === 'board') + const updatedViews = updatedBlocks.filter((o) => o.type === 'view') + + this.rawBoards = OctoUtils.mergeBlocks(this.rawBoards, updatedBoards) + this.rawViews = OctoUtils.mergeBlocks(this.rawViews, updatedViews) + this.rebuild( + OctoUtils.hydrateBlocks(this.rawBoards), + OctoUtils.hydrateBlocks(this.rawViews), ) } @@ -28,8 +45,13 @@ class MutableWorkspaceTree { this.boards = boards.filter((block) => block.type === 'board') as Board[] this.views = views.filter((block) => block.type === 'view') as BoardView[] } + + mutableCopy(): MutableWorkspaceTree { + const workspaceTree = new MutableWorkspaceTree() + const rawBlocks = [...this.rawBoards, ...this.rawViews] + workspaceTree.incrementalUpdate(rawBlocks) + return workspaceTree + } } -// type WorkspaceTree = Readonly - export {MutableWorkspaceTree, WorkspaceTree} From 40623db4c5c93aad58f96cccd3935b46e58299dd Mon Sep 17 00:00:00 2001 From: Chen-I Lim Date: Fri, 6 Nov 2020 10:17:33 -0800 Subject: [PATCH 2/3] Sync on reconnect --- webapp/src/components/cardDetail.tsx | 20 ++++++++++++++------ webapp/src/octoListener.ts | 8 +++++--- webapp/src/pages/boardPage.tsx | 14 ++++++++++---- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/webapp/src/components/cardDetail.tsx b/webapp/src/components/cardDetail.tsx index cd4d05d5f..05ab1fcd0 100644 --- a/webapp/src/components/cardDetail.tsx +++ b/webapp/src/components/cardDetail.tsx @@ -56,12 +56,20 @@ class CardDetail extends React.Component { componentDidMount() { const cardTree = new MutableCardTree(this.props.cardId) this.cardListener = new OctoListener() - this.cardListener.open([this.props.cardId], async (blocks) => { - Utils.log(`cardListener.onChanged: ${blocks.length}`) - const newCardTree = cardTree.mutableCopy() - newCardTree.incrementalUpdate(blocks) - this.setState({cardTree: newCardTree}) - }) + this.cardListener.open( + [this.props.cardId], + async (blocks) => { + Utils.log(`cardListener.onChanged: ${blocks.length}`) + const newCardTree = cardTree.mutableCopy() + newCardTree.incrementalUpdate(blocks) + this.setState({cardTree: newCardTree, title: cardTree.card.title}) + }, + async () => { + Utils.log(`cardListener.onReconnect`) + const newCardTree = cardTree.mutableCopy() + await newCardTree.sync() + this.setState({cardTree: newCardTree, title: newCardTree.card.title}) + }) cardTree.sync().then(() => { this.setState({cardTree, title: cardTree.card.title}) setTimeout(() => { diff --git a/webapp/src/octoListener.ts b/webapp/src/octoListener.ts index 8e8ff136f..fd425111f 100644 --- a/webapp/src/octoListener.ts +++ b/webapp/src/octoListener.ts @@ -43,7 +43,7 @@ class OctoListener { Utils.log(`OctoListener serverUrl: ${this.serverUrl}`) } - open(blockIds: string[], onChange: OnChangeHandler) { + open(blockIds: string[], onChange: OnChangeHandler, onReconnect: () => void) { let timeoutId: NodeJS.Timeout if (this.ws) { @@ -75,13 +75,14 @@ class OctoListener { const reopenBlockIds = this.isInitialized ? this.blockIds.slice() : blockIds.slice() Utils.logError(`Unexpected close, re-opening with ${reopenBlockIds.length} blocks...`) setTimeout(() => { - this.open(reopenBlockIds, onChange) + this.open(reopenBlockIds, onChange, onReconnect) + onReconnect() }, this.reopenDelay) } } ws.onmessage = (e) => { - Utils.log(`OctoListener websocket onmessage. data: ${e.data}`) + // Utils.log(`OctoListener websocket onmessage. data: ${e.data}`) if (ws !== this.ws) { Utils.log('Ignoring closed ws') return @@ -94,6 +95,7 @@ class OctoListener { if (timeoutId) { clearTimeout(timeoutId) } + Utils.log(`OctoListener update block: ${message.block?.id}`) this.queueUpdateNotification(message.block) break default: diff --git a/webapp/src/pages/boardPage.tsx b/webapp/src/pages/boardPage.tsx index 38ead3e99..a74ae455a 100644 --- a/webapp/src/pages/boardPage.tsx +++ b/webapp/src/pages/boardPage.tsx @@ -152,10 +152,16 @@ export default class BoardPage extends React.Component { const boardIds = workspaceTree.boards.map((o) => o.id) // Listen to boards plus all blocks at root (Empty string for parentId) - this.workspaceListener.open(['', ...boardIds], async (blocks) => { - Utils.log(`workspaceListener.onChanged: ${blocks.length}`) - this.incrementalUpdate(blocks) - }) + this.workspaceListener.open( + ['', ...boardIds], + async (blocks) => { + Utils.log(`workspaceListener.onChanged: ${blocks.length}`) + this.incrementalUpdate(blocks) + }, + () => { + Utils.log(`workspaceListener.onReconnect`) + this.sync() + }) if (boardId) { const boardTree = new MutableBoardTree(boardId) From 4a7bf165c4a11e1a295ee0938ed390f086b79db7 Mon Sep 17 00:00:00 2001 From: Chen-I Lim Date: Fri, 6 Nov 2020 10:20:03 -0800 Subject: [PATCH 3/3] cleanup --- server/ws/websockets.go | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/server/ws/websockets.go b/server/ws/websockets.go index a32029a87..a0c665efe 100644 --- a/server/ws/websockets.go +++ b/server/ws/websockets.go @@ -180,32 +180,6 @@ func (ws *Server) BroadcastBlockDelete(blockID string, parentID string) { ws.BroadcastBlockChange(block) } -/* -func (ws *Server) BroadcastBlockDelete1(blockIDs []string) { - for _, blockID := range blockIDs { - listeners := ws.GetListeners(blockID) - log.Printf("%d listener(s) for blockID: %s", len(listeners), blockID) - - if listeners != nil { - message := DeleteMsg{ - Action: "DELETE_BLOCK", - BlockID: blockID, - } - - for _, listener := range listeners { - log.Printf("Broadcast change, blockID: %s, remoteAddr: %s", blockID, listener.RemoteAddr()) - - err := listener.WriteJSON(message) - if err != nil { - log.Printf("broadcast error: %v", err) - listener.Close() - } - } - } - } -} -*/ - // BroadcastBlockChange broadcasts update messages to clients func (ws *Server) BroadcastBlockChange(block model.Block) { blockIDsToNotify := []string{block.ID, block.ParentID}