From 968cd71f3422f36be351b1488036edfe25503f21 Mon Sep 17 00:00:00 2001 From: Michael Mayer Date: Tue, 26 May 2020 15:15:14 +0200 Subject: [PATCH] Backend: Add groom worker and test stubs #154 Signed-off-by: Michael Mayer --- internal/api/photo.go | 4 +-- internal/config/config.go | 7 ++--- internal/mutex/mutex.go | 14 +++++++--- internal/photoprism/convert.go | 6 ++--- internal/photoprism/import.go | 8 +++--- internal/photoprism/index.go | 8 +++--- internal/photoprism/purge.go | 14 +++++----- internal/photoprism/resample.go | 6 ++--- internal/video/video.go | 5 +--- internal/video/video_test.go | 13 ++++++++++ internal/workers/groom.go | 42 ++++++++++++++++++++++++++++++ internal/workers/groom_test.go | 31 ++++++++++++++++++++++ internal/workers/share.go | 43 +++++++++++++++++-------------- internal/workers/share_test.go | 16 ++++++++++++ internal/workers/sync.go | 37 +++++++++++++++----------- internal/workers/sync_download.go | 34 ++++++++++++------------ internal/workers/sync_refresh.go | 10 +++---- internal/workers/sync_test.go | 16 ++++++++++++ internal/workers/sync_upload.go | 14 +++++----- internal/workers/workers.go | 30 +++++++++++++++------ internal/workers/workers_test.go | 22 ++++++++++++++++ 21 files changed, 273 insertions(+), 107 deletions(-) create mode 100644 internal/video/video_test.go create mode 100644 internal/workers/groom.go create mode 100644 internal/workers/groom_test.go create mode 100644 internal/workers/share_test.go create mode 100644 internal/workers/sync_test.go create mode 100644 internal/workers/workers_test.go diff --git a/internal/api/photo.go b/internal/api/photo.go index 952a7fd0b..f122e9d8e 100644 --- a/internal/api/photo.go +++ b/internal/api/photo.go @@ -193,7 +193,7 @@ func LikePhoto(router *gin.RouterGroup, conf *config.Config) { return } - if err := m.SetFavorite(true); err != nil{ + if err := m.SetFavorite(true); err != nil { log.Errorf("photo: %s", err.Error()) c.AbortWithStatusJSON(http.StatusInternalServerError, ErrSaveFailed) return @@ -226,7 +226,7 @@ func DislikePhoto(router *gin.RouterGroup, conf *config.Config) { return } - if err := m.SetFavorite(false); err != nil{ + if err := m.SetFavorite(false); err != nil { log.Errorf("photo: %s", err.Error()) c.AbortWithStatusJSON(http.StatusInternalServerError, ErrSaveFailed) return diff --git a/internal/config/config.go b/internal/config/config.go index 9cc94d2ab..c27a3f272 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -201,9 +201,10 @@ func (c *Config) Cache() *gc.Cache { // Shutdown services and workers. func (c *Config) Shutdown() { - mutex.Worker.Cancel() - mutex.Share.Cancel() - mutex.Sync.Cancel() + mutex.MainWorker.Cancel() + mutex.ShareWorker.Cancel() + mutex.SyncWorker.Cancel() + mutex.GroomWorker.Cancel() if err := c.CloseDb(); err != nil { log.Errorf("could not close database connection: %s", err) diff --git a/internal/mutex/mutex.go b/internal/mutex/mutex.go index 363b22f08..ebea1e7a2 100644 --- a/internal/mutex/mutex.go +++ b/internal/mutex/mutex.go @@ -5,8 +5,14 @@ import ( ) var ( - Db = sync.Mutex{} - Worker = Busy{} - Sync = Busy{} - Share = Busy{} + Db = sync.Mutex{} + MainWorker = Busy{} + SyncWorker = Busy{} + ShareWorker = Busy{} + GroomWorker = Busy{} ) + +// WorkersBusy returns true if any worker is busy. +func WorkersBusy() bool { + return MainWorker.Busy() || SyncWorker.Busy() || ShareWorker.Busy() || GroomWorker.Busy() +} diff --git a/internal/photoprism/convert.go b/internal/photoprism/convert.go index e0c02d55b..36017912b 100644 --- a/internal/photoprism/convert.go +++ b/internal/photoprism/convert.go @@ -32,11 +32,11 @@ func NewConvert(conf *config.Config) *Convert { // Start converts all files in a directory to JPEG if possible. func (c *Convert) Start(path string) error { - if err := mutex.Worker.Start(); err != nil { + if err := mutex.MainWorker.Start(); err != nil { return err } - defer mutex.Worker.Stop() + defer mutex.MainWorker.Stop() jobs := make(chan ConvertJob) @@ -70,7 +70,7 @@ func (c *Convert) Start(path string) error { } }() - if mutex.Worker.Canceled() { + if mutex.MainWorker.Canceled() { return errors.New("convert: canceled") } diff --git a/internal/photoprism/import.go b/internal/photoprism/import.go index 482e19b90..592f4d9d3 100644 --- a/internal/photoprism/import.go +++ b/internal/photoprism/import.go @@ -58,12 +58,12 @@ func (imp *Import) Start(opt ImportOptions) map[string]bool { return done } - if err := mutex.Worker.Start(); err != nil { + if err := mutex.MainWorker.Start(); err != nil { event.Error(fmt.Sprintf("import: %s", err.Error())) return done } - defer mutex.Worker.Stop() + defer mutex.MainWorker.Stop() if err := ind.tensorFlow.Init(); err != nil { log.Errorf("import: %s", err.Error()) @@ -102,7 +102,7 @@ func (imp *Import) Start(opt ImportOptions) map[string]bool { } }() - if mutex.Worker.Canceled() { + if mutex.MainWorker.Canceled() { return errors.New("import canceled") } @@ -220,7 +220,7 @@ func (imp *Import) Start(opt ImportOptions) map[string]bool { // Cancel stops the current import operation. func (imp *Import) Cancel() { - mutex.Worker.Cancel() + mutex.MainWorker.Cancel() } // DestinationFilename returns the destination filename of a MediaFile to be imported. diff --git a/internal/photoprism/index.go b/internal/photoprism/index.go index 7325b0bec..e2c6aa00d 100644 --- a/internal/photoprism/index.go +++ b/internal/photoprism/index.go @@ -54,7 +54,7 @@ func (ind *Index) thumbPath() string { // Cancel stops the current indexing operation. func (ind *Index) Cancel() { - mutex.Worker.Cancel() + mutex.MainWorker.Cancel() } // Start indexes media files in the originals directory. @@ -68,12 +68,12 @@ func (ind *Index) Start(opt IndexOptions) map[string]bool { return done } - if err := mutex.Worker.Start(); err != nil { + if err := mutex.MainWorker.Start(); err != nil { event.Error(fmt.Sprintf("index: %s", err.Error())) return done } - defer mutex.Worker.Stop() + defer mutex.MainWorker.Stop() if err := ind.tensorFlow.Init(); err != nil { log.Errorf("index: %s", err.Error()) @@ -112,7 +112,7 @@ func (ind *Index) Start(opt IndexOptions) map[string]bool { } }() - if mutex.Worker.Canceled() { + if mutex.MainWorker.Canceled() { return errors.New("indexing canceled") } diff --git a/internal/photoprism/purge.go b/internal/photoprism/purge.go index 17d7acfd9..621ad7c50 100644 --- a/internal/photoprism/purge.go +++ b/internal/photoprism/purge.go @@ -55,14 +55,14 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh purgedPhotos = make(map[string]bool) originalsPath := prg.originalsPath() - if err := mutex.Worker.Start(); err != nil { + if err := mutex.MainWorker.Start(); err != nil { err = fmt.Errorf("purge: %s", err.Error()) event.Error(err.Error()) return purgedFiles, purgedPhotos, err } defer func() { - mutex.Worker.Stop() + mutex.MainWorker.Stop() runtime.GC() }() @@ -81,7 +81,7 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh } for _, file := range files { - if mutex.Worker.Canceled() { + if mutex.MainWorker.Canceled() { return purgedFiles, purgedPhotos, errors.New("purge canceled") } @@ -107,7 +107,7 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh } } - if mutex.Worker.Canceled() { + if mutex.MainWorker.Canceled() { return purgedFiles, purgedPhotos, errors.New("purge canceled") } @@ -131,7 +131,7 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh } for _, photo := range photos { - if mutex.Worker.Canceled() { + if mutex.MainWorker.Canceled() { return purgedFiles, purgedPhotos, errors.New("purge canceled") } @@ -158,7 +158,7 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh } } - if mutex.Worker.Canceled() { + if mutex.MainWorker.Canceled() { return purgedFiles, purgedPhotos, errors.New("purge canceled") } @@ -182,5 +182,5 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh // Cancel stops the current purge operation. func (prg *Purge) Cancel() { - mutex.Worker.Cancel() + mutex.MainWorker.Cancel() } diff --git a/internal/photoprism/resample.go b/internal/photoprism/resample.go index d42b7c8e9..08aabe1b4 100644 --- a/internal/photoprism/resample.go +++ b/internal/photoprism/resample.go @@ -24,11 +24,11 @@ func NewResample(conf *config.Config) *Resample { // Start creates default thumbnails for all files in originalsPath. func (rs *Resample) Start(force bool) error { - if err := mutex.Worker.Start(); err != nil { + if err := mutex.MainWorker.Start(); err != nil { return err } - defer mutex.Worker.Stop() + defer mutex.MainWorker.Stop() originalsPath := rs.conf.OriginalsPath() thumbnailsPath := rs.conf.ThumbPath() @@ -65,7 +65,7 @@ func (rs *Resample) Start(force bool) error { } }() - if mutex.Worker.Canceled() { + if mutex.MainWorker.Canceled() { return errors.New("resample: canceled") } diff --git a/internal/video/video.go b/internal/video/video.go index db5f50c9e..419c57033 100644 --- a/internal/video/video.go +++ b/internal/video/video.go @@ -1,5 +1,5 @@ /* -This package encapsulates JPEG thumbnail generation. +This package contains video related types and functions. Additional information can be found in our Developer Guide: @@ -8,12 +8,9 @@ https://github.com/photoprism/photoprism/wiki package video import ( - "github.com/photoprism/photoprism/internal/event" "github.com/photoprism/photoprism/pkg/fs" ) -var log = event.Log - type Type struct { Format fs.FileType Width int diff --git a/internal/video/video_test.go b/internal/video/video_test.go new file mode 100644 index 000000000..60d6e1968 --- /dev/null +++ b/internal/video/video_test.go @@ -0,0 +1,13 @@ +package video + +import "testing" + +func TestTypes(t *testing.T) { + if val := Types[""]; val != TypeMP4 { + t.Fatal("default type should be TypeMP4") + } + + if val := Types["mp4"]; val != TypeMP4 { + t.Fatal("mp4 type should be TypeMP4") + } +} diff --git a/internal/workers/groom.go b/internal/workers/groom.go new file mode 100644 index 000000000..cd47cf4f4 --- /dev/null +++ b/internal/workers/groom.go @@ -0,0 +1,42 @@ +package workers + +import ( + "github.com/photoprism/photoprism/internal/config" + "github.com/photoprism/photoprism/internal/mutex" +) + +// Groom represents a groom worker. +type Groom struct { + conf *config.Config +} + +// NewGroom returns a new groom worker. +func NewGroom(conf *config.Config) *Groom { + return &Groom{conf: conf} +} + +// logError logs an error message if err is not nil. +func (worker *Groom) logError(err error) { + if err != nil { + log.Errorf("groom: %s", err.Error()) + } +} + +// logWarn logs a warning message if err is not nil. +func (worker *Groom) logWarn(err error) { + if err != nil { + log.Warnf("groom: %s", err.Error()) + } +} + +// Start starts the groom worker. +func (worker *Groom) Start() (err error) { + if err := mutex.GroomWorker.Start(); err != nil { + worker.logWarn(err) + return err + } + + defer mutex.GroomWorker.Stop() + + return err +} diff --git a/internal/workers/groom_test.go b/internal/workers/groom_test.go new file mode 100644 index 000000000..8623a8b02 --- /dev/null +++ b/internal/workers/groom_test.go @@ -0,0 +1,31 @@ +package workers + +import ( + "testing" + + "github.com/photoprism/photoprism/internal/config" + "github.com/photoprism/photoprism/internal/mutex" + "github.com/stretchr/testify/assert" +) + +func TestGroom_Start(t *testing.T) { + conf := config.TestConfig() + + worker := NewGroom(conf) + + assert.IsType(t, &Groom{}, worker) + + if err := mutex.GroomWorker.Start(); err != nil { + t.Fatal(err) + } + + if err := worker.Start(); err == nil { + t.Fatal("error expected") + } + + mutex.GroomWorker.Stop() + + if err := worker.Start(); err != nil { + t.Fatal(err) + } +} diff --git a/internal/workers/share.go b/internal/workers/share.go index eb63b8cc9..c6fb0510e 100644 --- a/internal/workers/share.go +++ b/internal/workers/share.go @@ -21,19 +21,26 @@ type Share struct { conf *config.Config } -// NewShare returns a new service share worker. +// NewShare returns a new share worker. func NewShare(conf *config.Config) *Share { return &Share{conf: conf} } +// logError logs an error message if err is not nil. +func (worker *Share) logError(err error) { + if err != nil { + log.Errorf("share: %s", err.Error()) + } +} + // Start starts the share worker. -func (s *Share) Start() (err error) { - if err := mutex.Share.Start(); err != nil { +func (worker *Share) Start() (err error) { + if err := mutex.ShareWorker.Start(); err != nil { event.Error(fmt.Sprintf("share: %s", err.Error())) return err } - defer mutex.Share.Stop() + defer mutex.ShareWorker.Stop() f := form.AccountSearch{ Share: true, @@ -44,7 +51,7 @@ func (s *Share) Start() (err error) { // Upload newly shared files for _, a := range accounts { - if mutex.Share.Canceled() { + if mutex.ShareWorker.Canceled() { return nil } @@ -55,7 +62,7 @@ func (s *Share) Start() (err error) { files, err := query.FileShares(a.ID, entity.FileShareNew) if err != nil { - log.Errorf("share: %s", err.Error()) + worker.logError(err) continue } @@ -68,7 +75,7 @@ func (s *Share) Start() (err error) { existingDirs := make(map[string]string) for _, file := range files { - if mutex.Share.Canceled() { + if mutex.ShareWorker.Canceled() { return nil } @@ -81,7 +88,7 @@ func (s *Share) Start() (err error) { } } - srcFileName := path.Join(s.conf.OriginalsPath(), file.File.FileName) + srcFileName := path.Join(worker.conf.OriginalsPath(), file.File.FileName) if a.ShareSize != "" { thumbType, ok := thumb.Types[a.ShareSize] @@ -91,16 +98,16 @@ func (s *Share) Start() (err error) { continue } - srcFileName, err = thumb.FromFile(srcFileName, file.File.FileHash, s.conf.ThumbPath(), thumbType.Width, thumbType.Height, thumbType.Options...) + srcFileName, err = thumb.FromFile(srcFileName, file.File.FileHash, worker.conf.ThumbPath(), thumbType.Width, thumbType.Height, thumbType.Options...) if err != nil { - log.Errorf("share: %s", err) + worker.logError(err) continue } } if err := client.Upload(srcFileName, file.RemoteName); err != nil { - log.Errorf("share: %s", err.Error()) + worker.logError(err) file.Errors++ file.Error = err.Error() } else { @@ -114,19 +121,17 @@ func (s *Share) Start() (err error) { file.Status = entity.FileShareError } - if mutex.Share.Canceled() { + if mutex.ShareWorker.Canceled() { return nil } - if err := entity.Db().Save(&file).Error; err != nil { - log.Errorf("share: %s", err.Error()) - } + worker.logError(entity.Db().Save(&file).Error) } } // Remove previously shared files if expired for _, a := range accounts { - if mutex.Share.Canceled() { + if mutex.ShareWorker.Canceled() { return nil } @@ -137,7 +142,7 @@ func (s *Share) Start() (err error) { files, err := query.ExpiredFileShares(a) if err != nil { - log.Errorf("share: %s", err.Error()) + worker.logError(err) continue } @@ -149,7 +154,7 @@ func (s *Share) Start() (err error) { client := webdav.New(a.AccURL, a.AccUser, a.AccPass) for _, file := range files { - if mutex.Share.Canceled() { + if mutex.ShareWorker.Canceled() { return nil } @@ -164,7 +169,7 @@ func (s *Share) Start() (err error) { } if err := entity.Db().Save(&file).Error; err != nil { - log.Errorf("share: %s", err.Error()) + worker.logError(err) } } } diff --git a/internal/workers/share_test.go b/internal/workers/share_test.go new file mode 100644 index 000000000..1ecc236c0 --- /dev/null +++ b/internal/workers/share_test.go @@ -0,0 +1,16 @@ +package workers + +import ( + "testing" + + "github.com/photoprism/photoprism/internal/config" + "github.com/stretchr/testify/assert" +) + +func TestNewShare(t *testing.T) { + conf := config.TestConfig() + + worker := NewShare(conf) + + assert.IsType(t, &Share{}, worker) +} diff --git a/internal/workers/sync.go b/internal/workers/sync.go index e1e147ec4..fd50c90b8 100644 --- a/internal/workers/sync.go +++ b/internal/workers/sync.go @@ -18,28 +18,35 @@ type Sync struct { conf *config.Config } -// NewSync returns a new service sync worker. +// NewSync returns a new sync worker. func NewSync(conf *config.Config) *Sync { return &Sync{ conf: conf, } } -// Report logs an error message if err is not nil. -func (s *Sync) report(err error) { +// logError logs an error message if err is not nil. +func (worker *Sync) logError(err error) { if err != nil { log.Errorf("sync: %s", err.Error()) } } +// logWarn logs a warning message if err is not nil. +func (worker *Sync) logWarn(err error) { + if err != nil { + log.Warnf("sync: %s", err.Error()) + } +} + // Start starts the sync worker. -func (s *Sync) Start() (err error) { - if err := mutex.Sync.Start(); err != nil { +func (worker *Sync) Start() (err error) { + if err := mutex.SyncWorker.Start(); err != nil { event.Error(fmt.Sprintf("sync: %s", err.Error())) return err } - defer mutex.Sync.Stop() + defer mutex.SyncWorker.Stop() f := form.AccountSearch{ Sync: true, @@ -57,7 +64,7 @@ func (s *Sync) Start() (err error) { a.AccSync = false if err := entity.Db().Save(&a).Error; err != nil { - log.Errorf("sync: %s", err.Error()) + worker.logError(err) } else { log.Warnf("sync: disabled sync, %s failed more than %d times", a.AccName, a.RetryLimit) } @@ -74,7 +81,7 @@ func (s *Sync) Start() (err error) { switch a.SyncStatus { case entity.AccountSyncStatusRefresh: - if complete, err := s.refresh(a); err != nil { + if complete, err := worker.refresh(a); err != nil { accErrors++ accError = err.Error() } else if complete { @@ -92,7 +99,7 @@ func (s *Sync) Start() (err error) { } } case entity.AccountSyncStatusDownload: - if complete, err := s.download(a); err != nil { + if complete, err := worker.download(a); err != nil { accErrors++ accError = err.Error() } else if complete { @@ -106,7 +113,7 @@ func (s *Sync) Start() (err error) { } } case entity.AccountSyncStatusUpload: - if complete, err := s.upload(a); err != nil { + if complete, err := worker.upload(a); err != nil { accErrors++ accError = err.Error() } else if complete { @@ -123,17 +130,17 @@ func (s *Sync) Start() (err error) { syncStatus = entity.AccountSyncStatusRefresh } - if mutex.Sync.Canceled() { + if mutex.SyncWorker.Canceled() { return nil } // Only update the following fields to avoid overwriting other settings if err := a.Updates(map[string]interface{}{ - "AccError": accError, - "AccErrors": accErrors, + "AccError": accError, + "AccErrors": accErrors, "SyncStatus": syncStatus, - "SyncDate": syncDate}); err != nil { - log.Errorf("sync: %s", err.Error()) + "SyncDate": syncDate}); err != nil { + worker.logError(err) } else if synced { event.Publish("sync.synced", event.Data{"account": a}) } diff --git a/internal/workers/sync_download.go b/internal/workers/sync_download.go index 798f04996..2e15cc859 100644 --- a/internal/workers/sync_download.go +++ b/internal/workers/sync_download.go @@ -17,12 +17,12 @@ import ( type Downloads map[string][]entity.FileSync // downloadPath returns a temporary download path. -func (s *Sync) downloadPath() string { - return s.conf.TempPath() + "/sync" +func (worker *Sync) downloadPath() string { + return worker.conf.TempPath() + "/sync" } // relatedDownloads returns files to be downloaded grouped by prefix. -func (s *Sync) relatedDownloads(a entity.Account) (result Downloads, err error) { +func (worker *Sync) relatedDownloads(a entity.Account) (result Downloads, err error) { result = make(Downloads) maxResults := 1000 @@ -35,7 +35,7 @@ func (s *Sync) relatedDownloads(a entity.Account) (result Downloads, err error) // Group results by directory and base name for i, file := range files { - k := fs.AbsBase(file.RemoteName, s.conf.Settings().Index.Group) + k := fs.AbsBase(file.RemoteName, worker.conf.Settings().Index.Group) result[k] = append(result[k], file) @@ -49,7 +49,7 @@ func (s *Sync) relatedDownloads(a entity.Account) (result Downloads, err error) } // Downloads remote files in batches and imports / indexes them -func (s *Sync) download(a entity.Account) (complete bool, err error) { +func (worker *Sync) download(a entity.Account) (complete bool, err error) { // Set up index worker indexJobs := make(chan photoprism.IndexJob) @@ -61,10 +61,10 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) { go photoprism.ImportWorker(importJobs) defer close(importJobs) - relatedFiles, err := s.relatedDownloads(a) + relatedFiles, err := worker.relatedDownloads(a) if err != nil { - log.Errorf("sync: %s", err.Error()) + worker.logError(err) return false, err } @@ -81,16 +81,16 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) { var baseDir string if a.SyncFilenames { - baseDir = s.conf.OriginalsPath() + baseDir = worker.conf.OriginalsPath() } else { - baseDir = fmt.Sprintf("%s/%d", s.downloadPath(), a.ID) + baseDir = fmt.Sprintf("%s/%d", worker.downloadPath(), a.ID) } done := make(map[string]bool) for _, files := range relatedFiles { for i, file := range files { - if mutex.Sync.Canceled() { + if mutex.SyncWorker.Canceled() { return false, nil } @@ -106,7 +106,7 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) { file.Status = entity.FileSyncExists } else { if err := client.Download(file.RemoteName, localName, false); err != nil { - log.Errorf("sync: %s", err.Error()) + worker.logError(err) file.Errors++ file.Error = err.Error() } else { @@ -114,13 +114,13 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) { file.Status = entity.FileSyncDownloaded } - if mutex.Sync.Canceled() { + if mutex.SyncWorker.Canceled() { return false, nil } } if err := entity.Db().Save(&file).Error; err != nil { - log.Errorf("sync: %s", err.Error()) + worker.logError(err) } else { files[i] = file } @@ -137,10 +137,10 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) { continue } - related, err := mf.RelatedFiles(s.conf.Settings().Index.Group) + related, err := mf.RelatedFiles(worker.conf.Settings().Index.Group) if err != nil { - log.Warnf("sync: %s", err.Error()) + worker.logWarn(err) continue } @@ -180,9 +180,7 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) { } if len(done) > 0 { - if err := entity.UpdatePhotoCounts(); err != nil { - log.Errorf("sync: %s", err) - } + worker.logError(entity.UpdatePhotoCounts()) } return false, nil diff --git a/internal/workers/sync_refresh.go b/internal/workers/sync_refresh.go index ba0864c3c..a7ffbe17d 100644 --- a/internal/workers/sync_refresh.go +++ b/internal/workers/sync_refresh.go @@ -9,7 +9,7 @@ import ( ) // Updates the local list of remote files so that they can be downloaded in batches -func (s *Sync) refresh(a entity.Account) (complete bool, err error) { +func (worker *Sync) refresh(a entity.Account) (complete bool, err error) { if a.AccType != remote.ServiceWebDAV { return false, nil } @@ -26,7 +26,7 @@ func (s *Sync) refresh(a entity.Account) (complete bool, err error) { dirs := append(subDirs.Abs(), a.SyncPath) for _, dir := range dirs { - if mutex.Sync.Canceled() { + if mutex.SyncWorker.Canceled() { return false, nil } @@ -38,7 +38,7 @@ func (s *Sync) refresh(a entity.Account) (complete bool, err error) { } for _, file := range files { - if mutex.Sync.Canceled() { + if mutex.SyncWorker.Canceled() { return false, nil } @@ -69,11 +69,11 @@ func (s *Sync) refresh(a entity.Account) (complete bool, err error) { } if f.Status == entity.FileSyncIgnore && mediaType == fs.MediaRaw && a.SyncRaw { - s.report(f.Update("Status", entity.FileSyncNew)) + worker.logError(f.Update("Status", entity.FileSyncNew)) } if f.Status == entity.FileSyncDownloaded && !f.RemoteDate.Equal(file.Date) { - s.report(f.Updates(map[string]interface{}{ + worker.logError(f.Updates(map[string]interface{}{ "Status": entity.FileSyncNew, "RemoteDate": file.Date, "RemoteSize": file.Size, diff --git a/internal/workers/sync_test.go b/internal/workers/sync_test.go new file mode 100644 index 000000000..3163e9711 --- /dev/null +++ b/internal/workers/sync_test.go @@ -0,0 +1,16 @@ +package workers + +import ( + "testing" + + "github.com/photoprism/photoprism/internal/config" + "github.com/stretchr/testify/assert" +) + +func TestNewSync(t *testing.T) { + conf := config.TestConfig() + + worker := NewSync(conf) + + assert.IsType(t, &Sync{}, worker) +} diff --git a/internal/workers/sync_upload.go b/internal/workers/sync_upload.go index 7804b45b0..405049253 100644 --- a/internal/workers/sync_upload.go +++ b/internal/workers/sync_upload.go @@ -13,7 +13,7 @@ import ( ) // Uploads local files to a remote account -func (s *Sync) upload(a entity.Account) (complete bool, err error) { +func (worker *Sync) upload(a entity.Account) (complete bool, err error) { maxResults := 250 // Get upload file list from database @@ -33,11 +33,11 @@ func (s *Sync) upload(a entity.Account) (complete bool, err error) { existingDirs := make(map[string]string) for _, file := range files { - if mutex.Sync.Canceled() { + if mutex.SyncWorker.Canceled() { return false, nil } - fileName := path.Join(s.conf.OriginalsPath(), file.FileName) + fileName := path.Join(worker.conf.OriginalsPath(), file.FileName) remoteName := path.Join(a.SyncPath, file.FileName) remoteDir := filepath.Dir(remoteName) @@ -49,7 +49,7 @@ func (s *Sync) upload(a entity.Account) (complete bool, err error) { } if err := client.Upload(fileName, remoteName); err != nil { - log.Errorf("sync: %s", err.Error()) + worker.logError(err) continue // try again next time } @@ -63,13 +63,11 @@ func (s *Sync) upload(a entity.Account) (complete bool, err error) { fileSync.Error = "" fileSync.Errors = 0 - if mutex.Sync.Canceled() { + if mutex.SyncWorker.Canceled() { return false, nil } - if err := entity.Db().Save(&fileSync).Error; err != nil { - log.Errorf("sync: %s", err.Error()) - } + worker.logError(entity.Db().Save(&fileSync).Error) } return false, nil diff --git a/internal/workers/workers.go b/internal/workers/workers.go index 7416844fd..3d7d5d46f 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -21,10 +21,12 @@ func Start(conf *config.Config) { case <-stop: log.Info("shutting down workers") ticker.Stop() - mutex.Share.Cancel() - mutex.Sync.Cancel() + mutex.GroomWorker.Cancel() + mutex.ShareWorker.Cancel() + mutex.SyncWorker.Cancel() return case <-ticker.C: + StartGroom(conf) StartShare(conf) StartSync(conf) } @@ -37,12 +39,24 @@ func Stop() { stop <- true } +// StartGroom runs the groom worker once. +func StartGroom(conf *config.Config) { + if !mutex.WorkersBusy() { + go func() { + worker := NewGroom(conf) + if err := worker.Start(); err != nil { + log.Error(err) + } + }() + } +} + // StartShare runs the share worker once. func StartShare(conf *config.Config) { - if !mutex.Share.Busy() { + if !mutex.ShareWorker.Busy() { go func() { - s := NewShare(conf) - if err := s.Start(); err != nil { + worker := NewShare(conf) + if err := worker.Start(); err != nil { log.Error(err) } }() @@ -51,10 +65,10 @@ func StartShare(conf *config.Config) { // StartShare runs the sync worker once. func StartSync(conf *config.Config) { - if !mutex.Sync.Busy() { + if !mutex.SyncWorker.Busy() { go func() { - s := NewSync(conf) - if err := s.Start(); err != nil { + worker := NewSync(conf) + if err := worker.Start(); err != nil { log.Error(err) } }() diff --git a/internal/workers/workers_test.go b/internal/workers/workers_test.go new file mode 100644 index 000000000..2f6faab74 --- /dev/null +++ b/internal/workers/workers_test.go @@ -0,0 +1,22 @@ +package workers + +import ( + "os" + "testing" + + "github.com/photoprism/photoprism/internal/config" + "github.com/sirupsen/logrus" +) + +func TestMain(m *testing.M) { + log = logrus.StandardLogger() + log.SetLevel(logrus.DebugLevel) + + c := config.TestConfig() + + code := m.Run() + + _ = c.CloseDb() + + os.Exit(code) +}