Merge pull request #22 from mattermost/incremental-updates
Incremental updates
This commit is contained in:
commit
1acdeafc83
9 changed files with 173 additions and 42 deletions
|
@ -44,11 +44,10 @@ func (a *App) InsertBlocks(blocks []model.Block) error {
|
|||
return err
|
||||
}
|
||||
|
||||
a.wsServer.BroadcastBlockChange(block)
|
||||
go a.webhook.NotifyUpdate(block)
|
||||
}
|
||||
|
||||
a.wsServer.BroadcastBlockChangeToWebsocketClients(blockIDsToNotify)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -76,7 +75,7 @@ func (a *App) DeleteBlock(blockID string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
a.wsServer.BroadcastBlockChangeToWebsocketClients(blockIDsToNotify)
|
||||
a.wsServer.BroadcastBlockDelete(blockID, parentID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -5,9 +5,11 @@ import (
|
|||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/mattermost/mattermost-octo-tasks/server/model"
|
||||
)
|
||||
|
||||
// RegisterRoutes registers routes.
|
||||
|
@ -98,10 +100,10 @@ func NewServer() *Server {
|
|||
}
|
||||
}
|
||||
|
||||
// WebsocketMsg is sent on block changes.
|
||||
type WebsocketMsg struct {
|
||||
Action string `json:"action"`
|
||||
BlockID string `json:"blockId"`
|
||||
// UpdateMsg is sent on block updates
|
||||
type UpdateMsg struct {
|
||||
Action string `json:"action"`
|
||||
Block model.Block `json:"block"`
|
||||
}
|
||||
|
||||
// WebsocketCommand is an incoming command from the client.
|
||||
|
@ -166,16 +168,30 @@ func (ws *Server) handleWebSocketOnChange(w http.ResponseWriter, r *http.Request
|
|||
}
|
||||
}
|
||||
|
||||
// BroadcastBlockChangeToWebsocketClients broadcasts change to clients.
|
||||
func (ws *Server) BroadcastBlockChangeToWebsocketClients(blockIDs []string) {
|
||||
for _, blockID := range blockIDs {
|
||||
// BroadcastBlockDelete broadcasts delete messages to clients
|
||||
func (ws *Server) BroadcastBlockDelete(blockID string, parentID string) {
|
||||
now := time.Now().Unix()
|
||||
block := model.Block{}
|
||||
block.ID = blockID
|
||||
block.ParentID = parentID
|
||||
block.UpdateAt = now
|
||||
block.DeleteAt = now
|
||||
|
||||
ws.BroadcastBlockChange(block)
|
||||
}
|
||||
|
||||
// BroadcastBlockChange broadcasts update messages to clients
|
||||
func (ws *Server) BroadcastBlockChange(block model.Block) {
|
||||
blockIDsToNotify := []string{block.ID, block.ParentID}
|
||||
|
||||
for _, blockID := range blockIDsToNotify {
|
||||
listeners := ws.GetListeners(blockID)
|
||||
log.Printf("%d listener(s) for blockID: %s", len(listeners), blockID)
|
||||
|
||||
if listeners != nil {
|
||||
message := WebsocketMsg{
|
||||
Action: "UPDATE_BLOCK",
|
||||
BlockID: blockID,
|
||||
message := UpdateMsg{
|
||||
Action: "UPDATE_BLOCK",
|
||||
Block: block,
|
||||
}
|
||||
|
||||
for _, listener := range listeners {
|
||||
|
|
|
@ -54,13 +54,22 @@ class CardDetail extends React.Component<Props, State> {
|
|||
}
|
||||
|
||||
componentDidMount() {
|
||||
this.cardListener = new OctoListener()
|
||||
this.cardListener.open([this.props.cardId], async (blockId) => {
|
||||
Utils.log(`cardListener.onChanged: ${blockId}`)
|
||||
await cardTree.sync()
|
||||
this.setState({cardTree})
|
||||
})
|
||||
const cardTree = new MutableCardTree(this.props.cardId)
|
||||
this.cardListener = new OctoListener()
|
||||
this.cardListener.open(
|
||||
[this.props.cardId],
|
||||
async (blocks) => {
|
||||
Utils.log(`cardListener.onChanged: ${blocks.length}`)
|
||||
const newCardTree = cardTree.mutableCopy()
|
||||
newCardTree.incrementalUpdate(blocks)
|
||||
this.setState({cardTree: newCardTree, title: cardTree.card.title})
|
||||
},
|
||||
async () => {
|
||||
Utils.log(`cardListener.onReconnect`)
|
||||
const newCardTree = cardTree.mutableCopy()
|
||||
await newCardTree.sync()
|
||||
this.setState({cardTree: newCardTree, title: newCardTree.card.title})
|
||||
})
|
||||
cardTree.sync().then(() => {
|
||||
this.setState({cardTree, title: cardTree.card.title})
|
||||
setTimeout(() => {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
import {IBlock} from './blocks/block'
|
||||
import {Utils} from './utils'
|
||||
|
||||
// These are outgoing commands to the server
|
||||
|
@ -12,8 +13,11 @@ type WSCommand = {
|
|||
type WSMessage = {
|
||||
action: string
|
||||
blockId: string
|
||||
block: IBlock
|
||||
}
|
||||
|
||||
type OnChangeHandler = (blocks: IBlock[]) => void
|
||||
|
||||
//
|
||||
// OctoListener calls a handler when a block or any of its children changes
|
||||
//
|
||||
|
@ -27,6 +31,10 @@ class OctoListener {
|
|||
private blockIds: string[] = []
|
||||
private isInitialized = false
|
||||
|
||||
private onChange: OnChangeHandler
|
||||
private updatedBlocks: IBlock[] = []
|
||||
private updateTimeout: NodeJS.Timeout
|
||||
|
||||
notificationDelay = 200
|
||||
reopenDelay = 3000
|
||||
|
||||
|
@ -35,13 +43,15 @@ class OctoListener {
|
|||
Utils.log(`OctoListener serverUrl: ${this.serverUrl}`)
|
||||
}
|
||||
|
||||
open(blockIds: string[], onChange: (blockId: string) => void) {
|
||||
open(blockIds: string[], onChange: OnChangeHandler, onReconnect: () => void) {
|
||||
let timeoutId: NodeJS.Timeout
|
||||
|
||||
if (this.ws) {
|
||||
this.close()
|
||||
}
|
||||
|
||||
this.onChange = onChange
|
||||
|
||||
const url = new URL(this.serverUrl)
|
||||
const wsServerUrl = `ws://${url.host}${url.pathname}ws/onchange`
|
||||
Utils.log(`OctoListener open: ${wsServerUrl}`)
|
||||
|
@ -65,13 +75,14 @@ class OctoListener {
|
|||
const reopenBlockIds = this.isInitialized ? this.blockIds.slice() : blockIds.slice()
|
||||
Utils.logError(`Unexpected close, re-opening with ${reopenBlockIds.length} blocks...`)
|
||||
setTimeout(() => {
|
||||
this.open(reopenBlockIds, onChange)
|
||||
this.open(reopenBlockIds, onChange, onReconnect)
|
||||
onReconnect()
|
||||
}, this.reopenDelay)
|
||||
}
|
||||
}
|
||||
|
||||
ws.onmessage = (e) => {
|
||||
Utils.log(`OctoListener websocket onmessage. data: ${e.data}`)
|
||||
// Utils.log(`OctoListener websocket onmessage. data: ${e.data}`)
|
||||
if (ws !== this.ws) {
|
||||
Utils.log('Ignoring closed ws')
|
||||
return
|
||||
|
@ -84,10 +95,8 @@ class OctoListener {
|
|||
if (timeoutId) {
|
||||
clearTimeout(timeoutId)
|
||||
}
|
||||
timeoutId = setTimeout(() => {
|
||||
timeoutId = undefined
|
||||
onChange(message.blockId)
|
||||
}, this.notificationDelay)
|
||||
Utils.log(`OctoListener update block: ${message.block?.id}`)
|
||||
this.queueUpdateNotification(message.block)
|
||||
break
|
||||
default:
|
||||
Utils.logError(`Unexpected action: ${message.action}`)
|
||||
|
@ -109,6 +118,7 @@ class OctoListener {
|
|||
const ws = this.ws
|
||||
this.ws = undefined
|
||||
this.blockIds = []
|
||||
this.onChange = undefined
|
||||
this.isInitialized = false
|
||||
ws.close()
|
||||
}
|
||||
|
@ -151,6 +161,24 @@ class OctoListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private queueUpdateNotification(block: IBlock) {
|
||||
this.updatedBlocks = this.updatedBlocks.filter((o) => o.id !== block.id) // Remove existing queued update
|
||||
this.updatedBlocks.push(block)
|
||||
if (this.updateTimeout) {
|
||||
clearTimeout(this.updateTimeout)
|
||||
this.updateTimeout = undefined
|
||||
}
|
||||
|
||||
this.updateTimeout = setTimeout(() => {
|
||||
this.flushUpdateNotifications()
|
||||
}, this.notificationDelay)
|
||||
}
|
||||
|
||||
private flushUpdateNotifications() {
|
||||
this.onChange(this.updatedBlocks)
|
||||
this.updatedBlocks = []
|
||||
}
|
||||
}
|
||||
|
||||
export {OctoListener}
|
||||
|
|
|
@ -80,6 +80,14 @@ class OctoUtils {
|
|||
static hydrateBlocks(blocks: IBlock[]): MutableBlock[] {
|
||||
return blocks.map((block) => this.hydrateBlock(block))
|
||||
}
|
||||
|
||||
static mergeBlocks(blocks: IBlock[], updatedBlocks: IBlock[]): IBlock[] {
|
||||
const updatedBlockIds = updatedBlocks.map((o) => o.id)
|
||||
const newBlocks = blocks.filter((o) => !updatedBlockIds.includes(o.id))
|
||||
const updatedAndNotDeletedBlocks = updatedBlocks.filter((o) => o.deleteAt === 0)
|
||||
newBlocks.push(...updatedAndNotDeletedBlocks)
|
||||
return newBlocks
|
||||
}
|
||||
}
|
||||
|
||||
export {OctoUtils}
|
||||
|
|
|
@ -10,6 +10,7 @@ import mutator from '../mutator'
|
|||
import {OctoListener} from '../octoListener'
|
||||
import {Utils} from '../utils'
|
||||
import {MutableWorkspaceTree} from '../viewModel/workspaceTree'
|
||||
import {IBlock} from '../blocks/block'
|
||||
|
||||
type Props = {
|
||||
setLanguage: (lang: string) => void
|
||||
|
@ -151,10 +152,16 @@ export default class BoardPage extends React.Component<Props, State> {
|
|||
const boardIds = workspaceTree.boards.map((o) => o.id)
|
||||
|
||||
// Listen to boards plus all blocks at root (Empty string for parentId)
|
||||
this.workspaceListener.open(['', ...boardIds], async (blockId) => {
|
||||
Utils.log(`workspaceListener.onChanged: ${blockId}`)
|
||||
this.sync()
|
||||
})
|
||||
this.workspaceListener.open(
|
||||
['', ...boardIds],
|
||||
async (blocks) => {
|
||||
Utils.log(`workspaceListener.onChanged: ${blocks.length}`)
|
||||
this.incrementalUpdate(blocks)
|
||||
},
|
||||
() => {
|
||||
Utils.log(`workspaceListener.onReconnect`)
|
||||
this.sync()
|
||||
})
|
||||
|
||||
if (boardId) {
|
||||
const boardTree = new MutableBoardTree(boardId)
|
||||
|
@ -179,6 +186,19 @@ export default class BoardPage extends React.Component<Props, State> {
|
|||
}
|
||||
}
|
||||
|
||||
private incrementalUpdate(blocks: IBlock[]) {
|
||||
const {workspaceTree, boardTree, viewId} = this.state
|
||||
|
||||
const newWorkspaceTree = workspaceTree.mutableCopy()
|
||||
newWorkspaceTree.incrementalUpdate(blocks)
|
||||
|
||||
const newBoardTree = boardTree.mutableCopy()
|
||||
newBoardTree.incrementalUpdate(blocks)
|
||||
newBoardTree.setActiveView(viewId)
|
||||
|
||||
this.setState({workspaceTree: newWorkspaceTree, boardTree: newBoardTree})
|
||||
}
|
||||
|
||||
// IPageController
|
||||
showBoard(boardId: string): void {
|
||||
const {boardTree} = this.state
|
||||
|
|
|
@ -29,6 +29,8 @@ interface BoardTree {
|
|||
|
||||
getSearchText(): string | undefined
|
||||
orderedCards(): Card[]
|
||||
|
||||
mutableCopy(): MutableBoardTree
|
||||
}
|
||||
|
||||
class MutableBoardTree implements BoardTree {
|
||||
|
@ -41,6 +43,7 @@ class MutableBoardTree implements BoardTree {
|
|||
activeView?: MutableBoardView
|
||||
groupByProperty?: IPropertyTemplate
|
||||
|
||||
private rawBlocks: IBlock[] = []
|
||||
private searchText?: string
|
||||
allCards: MutableCard[] = []
|
||||
get allBlocks(): IBlock[] {
|
||||
|
@ -51,8 +54,13 @@ class MutableBoardTree implements BoardTree {
|
|||
}
|
||||
|
||||
async sync() {
|
||||
const blocks = await octoClient.getSubtree(this.boardId)
|
||||
this.rebuild(OctoUtils.hydrateBlocks(blocks))
|
||||
this.rawBlocks = await octoClient.getSubtree(this.boardId)
|
||||
this.rebuild(OctoUtils.hydrateBlocks(this.rawBlocks))
|
||||
}
|
||||
|
||||
incrementalUpdate(updatedBlocks: IBlock[]) {
|
||||
this.rawBlocks = OctoUtils.mergeBlocks(this.rawBlocks, updatedBlocks)
|
||||
this.rebuild(OctoUtils.hydrateBlocks(this.rawBlocks))
|
||||
}
|
||||
|
||||
private rebuild(blocks: IMutableBlock[]) {
|
||||
|
@ -390,6 +398,12 @@ class MutableBoardTree implements BoardTree {
|
|||
|
||||
return cards
|
||||
}
|
||||
|
||||
mutableCopy(): MutableBoardTree {
|
||||
const boardTree = new MutableBoardTree(this.boardId)
|
||||
boardTree.incrementalUpdate(this.rawBlocks)
|
||||
return boardTree
|
||||
}
|
||||
}
|
||||
|
||||
export {MutableBoardTree, BoardTree, Group as BoardTreeGroup}
|
||||
|
|
|
@ -10,19 +10,28 @@ interface CardTree {
|
|||
readonly card: Card
|
||||
readonly comments: readonly IBlock[]
|
||||
readonly contents: readonly IOrderedBlock[]
|
||||
|
||||
mutableCopy(): MutableCardTree
|
||||
}
|
||||
|
||||
class MutableCardTree implements CardTree {
|
||||
card: Card
|
||||
comments: IBlock[]
|
||||
contents: IOrderedBlock[]
|
||||
comments: IBlock[] = []
|
||||
contents: IOrderedBlock[] = []
|
||||
|
||||
private rawBlocks: IBlock[] = []
|
||||
|
||||
constructor(private cardId: string) {
|
||||
}
|
||||
|
||||
async sync() {
|
||||
const blocks = await octoClient.getSubtree(this.cardId)
|
||||
this.rebuild(OctoUtils.hydrateBlocks(blocks))
|
||||
this.rawBlocks = await octoClient.getSubtree(this.cardId)
|
||||
this.rebuild(OctoUtils.hydrateBlocks(this.rawBlocks))
|
||||
}
|
||||
|
||||
incrementalUpdate(updatedBlocks: IBlock[]) {
|
||||
this.rawBlocks = OctoUtils.mergeBlocks(this.rawBlocks, updatedBlocks)
|
||||
this.rebuild(OctoUtils.hydrateBlocks(this.rawBlocks))
|
||||
}
|
||||
|
||||
private rebuild(blocks: IBlock[]) {
|
||||
|
@ -35,6 +44,12 @@ class MutableCardTree implements CardTree {
|
|||
const contentBlocks = blocks.filter((block) => block.type === 'text' || block.type === 'image' || block.type === 'divider') as IOrderedBlock[]
|
||||
this.contents = contentBlocks.sort((a, b) => a.order - b.order)
|
||||
}
|
||||
|
||||
mutableCopy(): MutableCardTree {
|
||||
const cardTree = new MutableCardTree(this.cardId)
|
||||
cardTree.incrementalUpdate(this.rawBlocks)
|
||||
return cardTree
|
||||
}
|
||||
}
|
||||
|
||||
export {MutableCardTree, CardTree}
|
||||
|
|
|
@ -9,18 +9,35 @@ import {BoardView} from '../blocks/boardView'
|
|||
interface WorkspaceTree {
|
||||
readonly boards: readonly Board[]
|
||||
readonly views: readonly BoardView[]
|
||||
|
||||
mutableCopy(): MutableWorkspaceTree
|
||||
}
|
||||
|
||||
class MutableWorkspaceTree {
|
||||
boards: Board[] = []
|
||||
views: BoardView[] = []
|
||||
|
||||
private rawBoards: IBlock[] = []
|
||||
private rawViews: IBlock[] = []
|
||||
|
||||
async sync() {
|
||||
const boards = await octoClient.getBlocksWithType('board')
|
||||
const views = await octoClient.getBlocksWithType('view')
|
||||
this.rawBoards = await octoClient.getBlocksWithType('board')
|
||||
this.rawViews = await octoClient.getBlocksWithType('view')
|
||||
this.rebuild(
|
||||
OctoUtils.hydrateBlocks(boards),
|
||||
OctoUtils.hydrateBlocks(views),
|
||||
OctoUtils.hydrateBlocks(this.rawBoards),
|
||||
OctoUtils.hydrateBlocks(this.rawViews),
|
||||
)
|
||||
}
|
||||
|
||||
incrementalUpdate(updatedBlocks: IBlock[]) {
|
||||
const updatedBoards = updatedBlocks.filter((o) => o.type === 'board')
|
||||
const updatedViews = updatedBlocks.filter((o) => o.type === 'view')
|
||||
|
||||
this.rawBoards = OctoUtils.mergeBlocks(this.rawBoards, updatedBoards)
|
||||
this.rawViews = OctoUtils.mergeBlocks(this.rawViews, updatedViews)
|
||||
this.rebuild(
|
||||
OctoUtils.hydrateBlocks(this.rawBoards),
|
||||
OctoUtils.hydrateBlocks(this.rawViews),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -28,8 +45,13 @@ class MutableWorkspaceTree {
|
|||
this.boards = boards.filter((block) => block.type === 'board') as Board[]
|
||||
this.views = views.filter((block) => block.type === 'view') as BoardView[]
|
||||
}
|
||||
|
||||
mutableCopy(): MutableWorkspaceTree {
|
||||
const workspaceTree = new MutableWorkspaceTree()
|
||||
const rawBlocks = [...this.rawBoards, ...this.rawViews]
|
||||
workspaceTree.incrementalUpdate(rawBlocks)
|
||||
return workspaceTree
|
||||
}
|
||||
}
|
||||
|
||||
// type WorkspaceTree = Readonly<MutableWorkspaceTree>
|
||||
|
||||
export {MutableWorkspaceTree, WorkspaceTree}
|
||||
|
|
Loading…
Reference in a new issue