diff --git a/mattermost-plugin/server/plugin.go b/mattermost-plugin/server/plugin.go index 53a0975de..a83e8e669 100644 --- a/mattermost-plugin/server/plugin.go +++ b/mattermost-plugin/server/plugin.go @@ -178,6 +178,10 @@ func (p *Plugin) OnDeactivate() error { return p.server.Shutdown() } +func (p *Plugin) OnPluginClusterEvent(_ *plugin.Context, ev mmModel.PluginClusterEvent) { + p.wsPluginAdapter.HandleClusterEvent(ev) +} + // ServeHTTP demonstrates a plugin that handles HTTP requests by greeting the world. func (p *Plugin) ServeHTTP(_ *plugin.Context, w http.ResponseWriter, r *http.Request) { router := p.server.GetRootRouter() diff --git a/server/ws/common.go b/server/ws/common.go new file mode 100644 index 000000000..4f3063414 --- /dev/null +++ b/server/ws/common.go @@ -0,0 +1,20 @@ +package ws + +import ( + "github.com/mattermost/focalboard/server/model" +) + +// UpdateMsg is sent on block updates. +type UpdateMsg struct { + Action string `json:"action"` + Block model.Block `json:"block"` +} + +// WebsocketCommand is an incoming command from the client. +type WebsocketCommand struct { + Action string `json:"action"` + WorkspaceID string `json:"workspaceId"` + Token string `json:"token"` + ReadToken string `json:"readToken"` + BlockIDs []string `json:"blockIds"` +} diff --git a/server/ws/plugin_adapter.go b/server/ws/plugin_adapter.go index 5e1617732..40c527abf 100644 --- a/server/ws/plugin_adapter.go +++ b/server/ws/plugin_adapter.go @@ -285,6 +285,30 @@ func (pa *PluginAdapter) getUserIDsForWorkspace(workspaceID string) []string { return userIDs } +// sendWorkspaceMessageSkipCluster sends a message to all the users +// with a websocket client connected to. +func (pa *PluginAdapter) sendWorkspaceMessageSkipCluster(workspaceID string, payload map[string]interface{}) { + userIDs := pa.getUserIDsForWorkspace(workspaceID) + for _, userID := range userIDs { + pa.api.PublishWebSocketEvent(websocketActionUpdateBlock, payload, &mmModel.WebsocketBroadcast{UserId: userID}) + } +} + +// sendWorkspaceMessage sends and propagates a message that is aimed +// for all the users that are subscribed to a given workspace. +func (pa *PluginAdapter) sendWorkspaceMessage(workspaceID string, payload map[string]interface{}) { + go func() { + clusterMessage := &ClusterMessage{ + WorkspaceID: workspaceID, + Payload: payload, + } + + pa.sendMessageToCluster("websocket_message", clusterMessage) + }() + + pa.sendWorkspaceMessageSkipCluster(workspaceID, payload) +} + func (pa *PluginAdapter) BroadcastBlockChange(workspaceID string, block model.Block) { pa.api.LogInfo("BroadcastingBlockChange", "workspaceID", workspaceID, @@ -296,10 +320,7 @@ func (pa *PluginAdapter) BroadcastBlockChange(workspaceID string, block model.Bl Block: block, } - userIDs := pa.getUserIDsForWorkspace(workspaceID) - for _, userID := range userIDs { - pa.api.PublishWebSocketEvent(websocketActionUpdateBlock, structToMap(message), &mmModel.WebsocketBroadcast{UserId: userID}) - } + pa.sendWorkspaceMessage(workspaceID, structToMap(message)) } func (pa *PluginAdapter) BroadcastBlockDelete(workspaceID, blockID, parentID string) { diff --git a/server/ws/plugin_adapter_cluster.go b/server/ws/plugin_adapter_cluster.go new file mode 100644 index 000000000..69bde6b2f --- /dev/null +++ b/server/ws/plugin_adapter_cluster.go @@ -0,0 +1,50 @@ +package ws + +import ( + "encoding/json" + + mmModel "github.com/mattermost/mattermost-server/v6/model" +) + +type ClusterMessage struct { + WorkspaceID string + Payload map[string]interface{} +} + +func (pa *PluginAdapter) sendMessageToCluster(id string, clusterMessage *ClusterMessage) { + b, err := json.Marshal(clusterMessage) + if err != nil { + pa.api.LogError("couldn't get JSON bytes from cluster message", + "id", id, + "err", err, + ) + return + } + + event := mmModel.PluginClusterEvent{Id: id, Data: b} + opts := mmModel.PluginClusterEventSendOptions{ + SendType: mmModel.PluginClusterEventSendTypeReliable, + } + + if err := pa.api.PublishPluginClusterEvent(event, opts); err != nil { + pa.api.LogError("error publishing cluster event", + "id", id, + "err", err, + ) + } +} + +func (pa *PluginAdapter) HandleClusterEvent(ev mmModel.PluginClusterEvent) { + pa.api.LogDebug("received cluster event", "id", ev.Id) + + var clusterMessage ClusterMessage + if err := json.Unmarshal(ev.Data, &clusterMessage); err != nil { + pa.api.LogError("cannot unmarshal cluster message data", + "id", ev.Id, + "err", err, + ) + return + } + + pa.sendWorkspaceMessageSkipCluster(clusterMessage.WorkspaceID, clusterMessage.Payload) +} diff --git a/server/ws/server.go b/server/ws/server.go index c3fa1896e..398a4ce47 100644 --- a/server/ws/server.go +++ b/server/ws/server.go @@ -64,21 +64,6 @@ type Server struct { logger *mlog.Logger } -// UpdateMsg is sent on block updates. -type UpdateMsg struct { - Action string `json:"action"` - Block model.Block `json:"block"` -} - -// WebsocketCommand is an incoming command from the client. -type WebsocketCommand struct { - Action string `json:"action"` - WorkspaceID string `json:"workspaceId"` - Token string `json:"token"` - ReadToken string `json:"readToken"` - BlockIDs []string `json:"blockIds"` -} - type websocketSession struct { client *wsClient userID string