// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. // See LICENSE.txt for license information. package notify import ( "sync" "github.com/mattermost/focalboard/server/model" "github.com/wiggin77/merror" "github.com/mattermost/mattermost-server/v6/shared/mlog" ) type Action string const ( Add Action = "add" Update Action = "update" Delete Action = "delete" ) type BlockChangeEvent struct { Action Action Workspace string Board *model.Block Card *model.Block BlockChanged *model.Block BlockOld *model.Block ModifiedByID string } type SubscriptionChangeNotifier interface { BroadcastSubscriptionChange(workspaceID string, subscription *model.Subscription) } // Backend provides an interface for sending notifications. type Backend interface { Start() error ShutDown() error BlockChanged(evt BlockChangeEvent) error Name() string } // Service is a service that sends notifications based on block activity using one or more backends. type Service struct { mux sync.RWMutex backends []Backend logger *mlog.Logger } // New creates a notification service with one or more Backends capable of sending notifications. func New(logger *mlog.Logger, backends ...Backend) (*Service, error) { notify := &Service{ backends: make([]Backend, 0, len(backends)), logger: logger, } merr := merror.New() for _, backend := range backends { if err := notify.AddBackend(backend); err != nil { merr.Append(err) } else { logger.Info("Initialized notification backend", mlog.String("name", backend.Name())) } } return notify, merr.ErrorOrNil() } // AddBackend adds a backend to the list that will be informed of any block changes. func (s *Service) AddBackend(backend Backend) error { if err := backend.Start(); err != nil { return err } s.mux.Lock() defer s.mux.Unlock() s.backends = append(s.backends, backend) return nil } // Shutdown calls shutdown for all backends. func (s *Service) Shutdown() error { s.mux.Lock() defer s.mux.Unlock() merr := merror.New() for _, backend := range s.backends { if err := backend.ShutDown(); err != nil { merr.Append(err) } } s.backends = nil return merr.ErrorOrNil() } // BlockChanged should be called whenever a block is added/updated/deleted. // All backends are informed of the event. func (s *Service) BlockChanged(evt BlockChangeEvent) { s.mux.RLock() defer s.mux.RUnlock() for _, backend := range s.backends { if err := backend.BlockChanged(evt); err != nil { s.logger.Error("Error delivering notification", mlog.String("backend", backend.Name()), mlog.String("action", string(evt.Action)), mlog.String("block_id", evt.BlockChanged.ID), mlog.Err(err), ) } } } // BroadcastSubscriptionChange sends a websocket message with details of the changed subscription to all // connected users in the workspace. func (s *Service) BroadcastSubscriptionChange(workspaceID string, subscription *model.Subscription) { s.mux.RLock() backends := make([]Backend, len(s.backends)) copy(backends, s.backends) s.mux.RUnlock() for _, backend := range backends { if scn, ok := backend.(SubscriptionChangeNotifier); ok { s.logger.Debug("Delivering subscription change notification", mlog.String("workspace_id", workspaceID), mlog.String("block_id", subscription.BlockID), mlog.String("subscriber_id", subscription.SubscriberID), ) scn.BroadcastSubscriptionChange(workspaceID, subscription) } } }