130 lines
2.9 KiB
Go
130 lines
2.9 KiB
Go
|
package utils
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/mattermost/mattermost-server/v6/shared/mlog"
|
||
|
)
|
||
|
|
||
|
// CallbackFunc is a func that can be enqueued in the callback queue and will be
// called when dequeued. A non-nil error returned by the callback is logged by
// the queue; it is not propagated back to the code that enqueued it.
type CallbackFunc func() error
|
||
|
|
||
|
// CallbackQueue provides a simple thread pool for processing callbacks. Callbacks will
// be executed in the order in which they are enqueued, but no guarantees are provided
// regarding the order in which they finish (unless poolSize == 1).
type CallbackQueue struct {
	// name identifies this queue in every log entry it emits.
	name string
	// poolSize is the number of worker goroutines started by NewCallbackQueue and
	// the number of exit notifications Shutdown waits for.
	poolSize int

	// queue holds callbacks waiting to be executed.
	queue chan CallbackFunc
	// done is closed by Shutdown to tell the workers to exit.
	done chan struct{}
	// alive receives a worker's id when that worker exits; it is buffered to
	// poolSize so exiting workers never block on it.
	alive chan int

	// idone is 0 while the queue accepts callbacks and is set to 1 (atomically)
	// by the first call to Shutdown.
	idone uint32

	// logger receives all diagnostics produced by the queue.
	logger *mlog.Logger
}
|
||
|
|
||
|
// NewCallbackQueue creates a new CallbackQueue and starts a thread pool to service it.
|
||
|
func NewCallbackQueue(name string, queueSize int, poolSize int, logger *mlog.Logger) *CallbackQueue {
|
||
|
cn := &CallbackQueue{
|
||
|
name: name,
|
||
|
poolSize: poolSize,
|
||
|
queue: make(chan CallbackFunc, queueSize),
|
||
|
done: make(chan struct{}),
|
||
|
alive: make(chan int, poolSize),
|
||
|
logger: logger,
|
||
|
}
|
||
|
|
||
|
for i := 0; i < poolSize; i++ {
|
||
|
go cn.loop(i)
|
||
|
}
|
||
|
|
||
|
return cn
|
||
|
}
|
||
|
|
||
|
// Shutdown stops accepting enqueues and exits all pool threads. This method waits
|
||
|
// as long as the context allows for the threads to exit.
|
||
|
// Returns true if the pool exited, false on timeout.
|
||
|
func (cn *CallbackQueue) Shutdown(context context.Context) bool {
|
||
|
if !atomic.CompareAndSwapUint32(&cn.idone, 0, 1) {
|
||
|
// already shutdown
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// signal threads to exit
|
||
|
close(cn.done)
|
||
|
|
||
|
// wait for the threads to exit or timeout
|
||
|
count := 0
|
||
|
for count < cn.poolSize {
|
||
|
select {
|
||
|
case <-cn.alive:
|
||
|
count++
|
||
|
case <-context.Done():
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// try to drain any remaining callbacks
|
||
|
for {
|
||
|
select {
|
||
|
case f := <-cn.queue:
|
||
|
cn.exec(f)
|
||
|
case <-context.Done():
|
||
|
return false
|
||
|
default:
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Enqueue adds a callback to the queue.
|
||
|
func (cn *CallbackQueue) Enqueue(f CallbackFunc) {
|
||
|
if atomic.LoadUint32(&cn.idone) != 0 {
|
||
|
cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case cn.queue <- f:
|
||
|
default:
|
||
|
start := time.Now()
|
||
|
cn.queue <- f
|
||
|
dur := time.Since(start)
|
||
|
cn.logger.Warn("CallbackQueue queue backlog", mlog.String("name", cn.name), mlog.Duration("wait_time", dur))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (cn *CallbackQueue) loop(id int) {
|
||
|
defer func() {
|
||
|
cn.logger.Trace("CallbackQueue thread exited", mlog.String("name", cn.name), mlog.Int("id", id))
|
||
|
cn.alive <- id
|
||
|
}()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case f := <-cn.queue:
|
||
|
cn.exec(f)
|
||
|
case <-cn.done:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (cn *CallbackQueue) exec(f CallbackFunc) {
|
||
|
// don't let a panic in the callback exit the thread.
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
cn.logger.Error("CallbackQueue callback panic", mlog.String("name", cn.name), mlog.Any("panic", r))
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
if err := f(); err != nil {
|
||
|
cn.logger.Error("CallbackQueue callback error", mlog.String("name", cn.name), mlog.Err(err))
|
||
|
}
|
||
|
}
|