photoprism/internal/api/websocket_writer.go

125 lines
2.9 KiB
Go
Raw Normal View History

package api
import (
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/photoprism/photoprism/internal/acl"
"github.com/photoprism/photoprism/internal/entity"
"github.com/photoprism/photoprism/internal/event"
)
// wsSendMessage sends a message to the WebSocket client.
func wsSendMessage(topic string, data interface{}, ws *websocket.Conn, writeMutex *sync.Mutex) {
if topic == "" || ws == nil || writeMutex == nil {
return
}
writeMutex.Lock()
defer writeMutex.Unlock()
if err := ws.SetWriteDeadline(time.Now().Add(30 * time.Second)); err != nil {
return
} else if err := ws.WriteJSON(gin.H{"event": topic, "data": data}); err != nil {
return
}
}
// wsWriter initializes a WebSocket writer for sending messages.
func wsWriter(ws *websocket.Conn, writeMutex *sync.Mutex, connId string) {
pingTicker := time.NewTicker(15 * time.Second)
// Subscribe to events.
e := event.Subscribe(
"user.*.*.*",
"session.*.*.*",
"log.fatal",
"log.error",
"log.warning",
"log.warn",
"log.info",
"notify.*",
"index.*",
"upload.*",
"import.*",
"config.*",
"count.*",
"photos.*",
"cameras.*",
"lenses.*",
"countries.*",
"albums.*",
"labels.*",
"subjects.*",
"people.*",
"sync.*",
)
defer func() {
pingTicker.Stop()
event.Unsubscribe(e)
_ = ws.Close()
wsAuth.mutex.Lock()
delete(wsAuth.sid, connId)
delete(wsAuth.rid, connId)
delete(wsAuth.user, connId)
wsAuth.mutex.Unlock()
}()
for {
select {
case <-pingTicker.C:
writeMutex.Lock()
if err := ws.SetWriteDeadline(time.Now().Add(30 * time.Second)); err != nil {
writeMutex.Unlock()
return
} else if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
writeMutex.Unlock()
return
}
writeMutex.Unlock()
case msg := <-e.Receiver:
wsAuth.mutex.RLock()
sid := wsAuth.sid[connId] // Session ID.
// rid := wsAuth.rid[connId] // Session RefID.
user := entity.UnknownUser // User.
if hit, ok := wsAuth.user[connId]; ok {
user = hit
}
wsAuth.mutex.RUnlock()
// Split topic into sub-channels.
ev := msg.Topic()
ch := strings.Split(ev, ".")
// Send the message only to authorized recipients.
switch len(ch) {
case 2:
// Send to everyone who is allowed to subscribe.
if res := acl.Resource(ch[0]); acl.Events.AllowAll(res, user.AclRole(), wsSubscribePerms) {
wsSendMessage(ev, msg.Fields, ws, writeMutex)
}
case 4:
ev = strings.Join(ch[2:4], ".")
if acl.ChannelUser.Equal(ch[0]) && ch[1] == user.UID() || acl.Events.AllowAll(acl.Resource(ch[2]), user.AclRole(), wsSubscribePerms) {
// Send to matching user uid.
wsSendMessage(ev, msg.Fields, ws, writeMutex)
} else if acl.ChannelSession.Equal(ch[0]) && ch[1] == sid {
// Send to matching session id.
wsSendMessage(ev, msg.Fields, ws, writeMutex)
}
}
}
}
}