focalboard/server/services/notify/service.go
Doug Lauder 75bd409ba0
Notifications phase 1 (#1851)
Backend support for subscribing/unsubscribing to blocks, typically cards and boards. Notifies subscribers when changes are made to cards they are subscribed to.
2021-12-10 10:46:37 -05:00

132 lines
3.4 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package notify
import (
"sync"
"github.com/mattermost/focalboard/server/model"
"github.com/wiggin77/merror"
"github.com/mattermost/mattermost-server/v6/shared/mlog"
)
// Action identifies the kind of change that was made to a block.
type Action string
// Possible Action values carried by a BlockChangeEvent.
const (
// Add is the action value "add".
Add Action = "add"
// Update is the action value "update".
Update Action = "update"
// Delete is the action value "delete".
Delete Action = "delete"
)
// BlockChangeEvent describes a single block change. It is the payload handed to
// every Backend by Service.BlockChanged.
type BlockChangeEvent struct {
// Action is the kind of change (Add, Update or Delete).
Action Action
// Workspace is presumably the ID of the workspace the block belongs to — TODO confirm.
Workspace string
// Board is presumably the board containing the changed block — TODO confirm.
Board *model.Block
// Card is presumably the card containing the changed block — TODO confirm.
Card *model.Block
// BlockChanged is the block the event is about; Service.BlockChanged reads its
// ID when logging a delivery error.
BlockChanged *model.Block
// BlockOld is presumably the block's state before the change — TODO confirm.
BlockOld *model.Block
// ModifiedByID is presumably the ID of the user who made the change — TODO confirm.
ModifiedByID string
}
// SubscriptionChangeNotifier is an optional interface a Backend may implement.
// Service.BroadcastSubscriptionChange type-asserts each registered backend
// against it and forwards the call only to those that implement it.
type SubscriptionChangeNotifier interface {
// BroadcastSubscriptionChange is invoked with the workspace ID and the
// subscription that changed.
BroadcastSubscriptionChange(workspaceID string, subscription *model.Subscription)
}
// Backend provides an interface for sending notifications.
type Backend interface {
// Start is called by Service.AddBackend before the backend is registered;
// a non-nil error prevents registration.
Start() error
// ShutDown is called by Service.Shutdown for every registered backend.
ShutDown() error
// BlockChanged is called by Service.BlockChanged for every block change event.
// A returned error is logged by the service.
BlockChanged(evt BlockChangeEvent) error
// Name returns the name used to identify this backend in log messages.
Name() string
}
// Service is a service that sends notifications based on block activity using one or more backends.
type Service struct {
// mux guards backends: writers (AddBackend, Shutdown) take the write lock,
// readers (BlockChanged, BroadcastSubscriptionChange) take the read lock.
mux sync.RWMutex
// backends holds the backends that started successfully and receive events.
backends []Backend
// logger is used for reporting delivery errors and debug output.
logger *mlog.Logger
}
// New builds a notification Service and registers each of the supplied backends
// through AddBackend (which starts them). Backends that fail to start are left
// out and their errors are collected; successfully registered backends are
// logged at info level.
//
// The Service is always returned, together with the combined start errors or
// nil when every backend started.
func New(logger *mlog.Logger, backends ...Backend) (*Service, error) {
	svc := &Service{
		logger:   logger,
		backends: make([]Backend, 0, len(backends)),
	}

	errs := merror.New()
	for i := range backends {
		b := backends[i]
		if err := svc.AddBackend(b); err != nil {
			errs.Append(err)
			continue
		}
		logger.Info("Initialized notification backend", mlog.String("name", b.Name()))
	}

	return svc, errs.ErrorOrNil()
}
// AddBackend starts the given backend and, if it starts successfully, appends it
// to the list of backends that are informed of block changes. When Start fails
// its error is returned and the backend is not registered.
func (s *Service) AddBackend(backend Backend) error {
	startErr := backend.Start()
	if startErr != nil {
		return startErr
	}

	// Start runs outside the lock; only the slice mutation is guarded.
	s.mux.Lock()
	s.backends = append(s.backends, backend)
	s.mux.Unlock()

	return nil
}
// Shutdown calls ShutDown on every registered backend while holding the write
// lock, then clears the backend list. Every backend is attempted even if an
// earlier one fails; the collected errors are returned, or nil if there were none.
func (s *Service) Shutdown() error {
	s.mux.Lock()
	defer s.mux.Unlock()

	errs := merror.New()
	registered := s.backends
	for i := range registered {
		err := registered[i].ShutDown()
		if err == nil {
			continue
		}
		errs.Append(err)
	}
	s.backends = nil

	return errs.ErrorOrNil()
}
// BlockChanged should be called whenever a block is added/updated/deleted.
// All backends are informed of the event. A delivery failure in one backend is
// logged and does not stop the remaining backends from being notified.
//
// The read lock is held while the backends are invoked, so Shutdown (which
// takes the write lock) waits for in-flight deliveries to complete.
func (s *Service) BlockChanged(evt BlockChangeEvent) {
	s.mux.RLock()
	defer s.mux.RUnlock()

	for _, backend := range s.backends {
		err := backend.BlockChanged(evt)
		if err == nil {
			continue
		}

		// evt.BlockChanged is a pointer and may be nil; guard the dereference so
		// that reporting a delivery error can never itself panic.
		var blockID string
		if evt.BlockChanged != nil {
			blockID = evt.BlockChanged.ID
		}

		s.logger.Error("Error delivering notification",
			mlog.String("backend", backend.Name()),
			mlog.String("action", string(evt.Action)),
			mlog.String("block_id", blockID),
			mlog.Err(err),
		)
	}
}
// BroadcastSubscriptionChange forwards the changed subscription to every
// registered backend that implements SubscriptionChangeNotifier; backends that
// do not implement it are skipped. Presumably the implementing backend relays
// the change to connected users of the workspace (e.g. over websocket) — that
// happens outside this file.
//
// The backend list is snapshotted under the read lock and the lock is released
// before any backend is called.
func (s *Service) BroadcastSubscriptionChange(workspaceID string, subscription *model.Subscription) {
	s.mux.RLock()
	snapshot := append([]Backend(nil), s.backends...)
	s.mux.RUnlock()

	for i := range snapshot {
		notifier, ok := snapshot[i].(SubscriptionChangeNotifier)
		if !ok {
			continue
		}

		s.logger.Debug("Delivering subscription change notification",
			mlog.String("workspace_id", workspaceID),
			mlog.String("block_id", subscription.BlockID),
			mlog.String("subscriber_id", subscription.SubscriberID),
		)
		notifier.BroadcastSubscriptionChange(workspaceID, subscription)
	}
}