Some improvements based on golangci-lint checks, and adding more rules

This commit is contained in:
Jesús Espino 2020-10-22 13:34:42 +02:00
parent 3516e6b26c
commit 607b8aa063
14 changed files with 122 additions and 90 deletions

View file

@ -25,11 +25,14 @@ server-lint:
echo "golangci-lint is not installed. Please see https://github.com/golangci/golangci-lint#install for installation instructions."; \ echo "golangci-lint is not installed. Please see https://github.com/golangci/golangci-lint#install for installation instructions."; \
exit 1; \ exit 1; \
fi; \ fi; \
cd server; golangci-lint run ./... cd server; golangci-lint run -p format -p unused -p complexity -p bugs -p performance -E asciicheck -E depguard -E dogsled -E dupl -E funlen -E gochecknoglobals -E gochecknoinits -E goconst -E gocritic -E godot -E godox -E goerr113 -E goheader -E golint -E gomnd -E gomodguard -E goprintffuncname -E gosimple -E interfacer -E lll -E misspell -E nlreturn -E nolintlint -E stylecheck -E unconvert -E whitespace -E wsl --skip-dirs services/store/sqlstore/migrations/ ./...
server-test: server-test:
cd server; go test ./... cd server; go test ./...
server-doc:
cd server; go doc ./...
watch-server: watch-server:
cd server; modd cd server; modd

View file

@ -83,7 +83,7 @@ func (a *API) handlePostBlocks(w http.ResponseWriter, r *http.Request) {
}() }()
var blocks []model.Block var blocks []model.Block
err = json.Unmarshal([]byte(requestBody), &blocks) err = json.Unmarshal(requestBody, &blocks)
if err != nil { if err != nil {
errorResponse(w, http.StatusInternalServerError, ``) errorResponse(w, http.StatusInternalServerError, ``)
return return
@ -95,15 +95,16 @@ func (a *API) handlePostBlocks(w http.ResponseWriter, r *http.Request) {
errorResponse(w, http.StatusInternalServerError, fmt.Sprintf(`{"description": "missing type", "id": "%s"}`, block.ID)) errorResponse(w, http.StatusInternalServerError, fmt.Sprintf(`{"description": "missing type", "id": "%s"}`, block.ID))
return return
} }
if block.CreateAt < 1 { if block.CreateAt < 1 {
errorResponse(w, http.StatusInternalServerError, fmt.Sprintf(`{"description": "invalid createAt", "id": "%s"}`, block.ID)) errorResponse(w, http.StatusInternalServerError, fmt.Sprintf(`{"description": "invalid createAt", "id": "%s"}`, block.ID))
return return
} }
if block.UpdateAt < 1 { if block.UpdateAt < 1 {
errorResponse(w, http.StatusInternalServerError, fmt.Sprintf(`{"description": "invalid updateAt", "id": "%s"}`, block.ID)) errorResponse(w, http.StatusInternalServerError, fmt.Sprintf(`{"description": "invalid updateAt", "id": "%s"}`, block.ID))
return return
} }
} }
err = a.app().InsertBlocks(blocks) err = a.app().InsertBlocks(blocks)
@ -190,7 +191,7 @@ func (a *API) handleImport(w http.ResponseWriter, r *http.Request) {
}() }()
var blocks []model.Block var blocks []model.Block
err = json.Unmarshal([]byte(requestBody), &blocks) err = json.Unmarshal(requestBody, &blocks)
if err != nil { if err != nil {
errorResponse(w, http.StatusInternalServerError, ``) errorResponse(w, http.StatusInternalServerError, ``)
return return
@ -229,6 +230,7 @@ func (a *API) handleServeFile(w http.ResponseWriter, r *http.Request) {
func (a *API) handleUploadFile(w http.ResponseWriter, r *http.Request) { func (a *API) handleUploadFile(w http.ResponseWriter, r *http.Request) {
fmt.Println(`handleUploadFile`) fmt.Println(`handleUploadFile`)
file, handle, err := r.FormFile("file") file, handle, err := r.FormFile("file")
if err != nil { if err != nil {
fmt.Fprintf(w, "%v", err) fmt.Fprintf(w, "%v", err)
@ -243,6 +245,7 @@ func (a *API) handleUploadFile(w http.ResponseWriter, r *http.Request) {
jsonStringResponse(w, http.StatusInternalServerError, `{}`) jsonStringResponse(w, http.StatusInternalServerError, `{}`)
return return
} }
log.Printf(`saveFile, url: %s`, url) log.Printf(`saveFile, url: %s`, url)
json := fmt.Sprintf(`{ "url": "%s" }`, url) json := fmt.Sprintf(`{ "url": "%s" }`, url)
jsonStringResponse(w, http.StatusOK, json) jsonStringResponse(w, http.StatusOK, json)

View file

@ -10,10 +10,10 @@ import (
type App struct { type App struct {
config *config.Configuration config *config.Configuration
store store.Store store store.Store
wsServer *ws.WSServer wsServer *ws.Server
filesBackend filesstore.FileBackend filesBackend filesstore.FileBackend
} }
func New(config *config.Configuration, store store.Store, wsServer *ws.WSServer, filesBackend filesstore.FileBackend) *App { func New(config *config.Configuration, store store.Store, wsServer *ws.Server, filesBackend filesstore.FileBackend) *App {
return &App{config: config, store: store, wsServer: wsServer, filesBackend: filesBackend} return &App{config: config, store: store, wsServer: wsServer, filesBackend: filesBackend}
} }

View file

@ -24,12 +24,14 @@ func (a *App) InsertBlock(block model.Block) error {
func (a *App) InsertBlocks(blocks []model.Block) error { func (a *App) InsertBlocks(blocks []model.Block) error {
var blockIDsToNotify = []string{} var blockIDsToNotify = []string{}
uniqueBlockIDs := make(map[string]bool) uniqueBlockIDs := make(map[string]bool)
for _, block := range blocks { for _, block := range blocks {
if !uniqueBlockIDs[block.ID] { if !uniqueBlockIDs[block.ID] {
blockIDsToNotify = append(blockIDsToNotify, block.ID) blockIDsToNotify = append(blockIDsToNotify, block.ID)
} }
if len(block.ParentID) > 0 && !uniqueBlockIDs[block.ParentID] { if len(block.ParentID) > 0 && !uniqueBlockIDs[block.ParentID] {
blockIDsToNotify = append(blockIDsToNotify, block.ParentID) blockIDsToNotify = append(blockIDsToNotify, block.ParentID)
} }

View file

@ -16,14 +16,16 @@ func TestGetParentID(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
store := mockstore.NewMockStore(ctrl) store := mockstore.NewMockStore(ctrl)
wsserver := ws.NewWSServer() wsserver := ws.NewServer()
app := New(&config.Configuration{}, store, wsserver, &mocks.FileBackend{}) app := New(&config.Configuration{}, store, wsserver, &mocks.FileBackend{})
t.Run("success query", func(t *testing.T) { t.Run("success query", func(t *testing.T) {
store.EXPECT().GetParentID(gomock.Eq("test-id")).Return("test-parent-id", nil) store.EXPECT().GetParentID(gomock.Eq("test-id")).Return("test-parent-id", nil)
result, err := app.GetParentID("test-id") result, err := app.GetParentID("test-id")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "test-parent-id", result) require.Equal(t, "test-parent-id", result)
}) })
t.Run("fail query", func(t *testing.T) { t.Run("fail query", func(t *testing.T) {
store.EXPECT().GetParentID(gomock.Eq("test-id")).Return("", errors.New("block-not-found")) store.EXPECT().GetParentID(gomock.Eq("test-id")).Return("", errors.New("block-not-found"))
_, err := app.GetParentID("test-id") _, err := app.GetParentID("test-id")

View file

@ -15,6 +15,10 @@ import (
// ---------------------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------------------
// WebSocket OnChange listener // WebSocket OnChange listener
const (
timeBetweenPidMonitoringChecks = 2 * time.Second
)
func isProcessRunning(pid int) bool { func isProcessRunning(pid int) bool {
process, err := os.FindProcess(pid) process, err := os.FindProcess(pid)
if err != nil { if err != nil {
@ -27,13 +31,15 @@ func isProcessRunning(pid int) bool {
func monitorPid(pid int) { func monitorPid(pid int) {
log.Printf("Monitoring PID: %d", pid) log.Printf("Monitoring PID: %d", pid)
go func() { go func() {
for { for {
if !isProcessRunning(pid) { if !isProcessRunning(pid) {
log.Printf("Monitored process not found, exiting.") log.Printf("Monitored process not found, exiting.")
os.Exit(1) os.Exit(1)
} }
time.Sleep(2 * time.Second)
time.Sleep(timeBetweenPidMonitoringChecks)
} }
}() }()
} }

View file

@ -26,11 +26,11 @@ const CurrentVersion = "0.0.1"
type Server struct { type Server struct {
config *config.Configuration config *config.Configuration
wsServer *ws.WSServer wsServer *ws.Server
webServer *web.WebServer webServer *web.WebServer
store store.Store store store.Store
filesBackend filesstore.FileBackend filesBackend filesstore.FileBackend
telemetry *telemetry.TelemetryService telemetry *telemetry.Service
logger *zap.Logger logger *zap.Logger
} }
@ -46,7 +46,7 @@ func New(cfg *config.Configuration) (*Server, error) {
return nil, err return nil, err
} }
wsServer := ws.NewWSServer() wsServer := ws.NewServer()
filesBackendSettings := model.FileSettings{} filesBackendSettings := model.FileSettings{}
filesBackendSettings.SetDefaults(false) filesBackendSettings.SetDefaults(false)
@ -67,11 +67,13 @@ func New(cfg *config.Configuration) (*Server, error) {
// Ctrl+C handling // Ctrl+C handling
handler := make(chan os.Signal, 1) handler := make(chan os.Signal, 1)
signal.Notify(handler, os.Interrupt) signal.Notify(handler, os.Interrupt)
go func() { go func() {
for sig := range handler { for sig := range handler {
// sig is a ^C, handle it // sig is a ^C, handle it
if sig == os.Interrupt { if sig == os.Interrupt {
os.Exit(1) os.Exit(1)
break break
} }
} }

View file

@ -41,9 +41,7 @@ func createTask(name string, function TaskFunc, interval time.Duration, recurrin
defer close(task.cancelled) defer close(task.cancelled)
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer func() { defer ticker.Stop()
ticker.Stop()
}()
for { for {
select { select {

View file

@ -12,67 +12,67 @@ import (
) )
func TestCreateTask(t *testing.T) { func TestCreateTask(t *testing.T) {
TASK_NAME := "Test Task" taskName := "Test Task"
TASK_TIME := time.Millisecond * 200 taskTime := time.Millisecond * 200
TASK_WAIT := time.Millisecond * 100 taskWait := time.Millisecond * 100
executionCount := new(int32) executionCount := new(int32)
testFunc := func() { testFunc := func() {
atomic.AddInt32(executionCount, 1) atomic.AddInt32(executionCount, 1)
} }
task := CreateTask(TASK_NAME, testFunc, TASK_TIME) task := CreateTask(taskName, testFunc, taskTime)
assert.EqualValues(t, 0, atomic.LoadInt32(executionCount)) assert.EqualValues(t, 0, atomic.LoadInt32(executionCount))
time.Sleep(TASK_TIME + TASK_WAIT) time.Sleep(taskTime + taskWait)
assert.EqualValues(t, 1, atomic.LoadInt32(executionCount)) assert.EqualValues(t, 1, atomic.LoadInt32(executionCount))
assert.Equal(t, TASK_NAME, task.Name) assert.Equal(t, taskName, task.Name)
assert.Equal(t, TASK_TIME, task.Interval) assert.Equal(t, taskTime, task.Interval)
assert.False(t, task.Recurring) assert.False(t, task.Recurring)
} }
func TestCreateRecurringTask(t *testing.T) { func TestCreateRecurringTask(t *testing.T) {
TASK_NAME := "Test Recurring Task" taskName := "Test Recurring Task"
TASK_TIME := time.Millisecond * 200 taskTime := time.Millisecond * 200
TASK_WAIT := time.Millisecond * 100 taskWait := time.Millisecond * 100
executionCount := new(int32) executionCount := new(int32)
testFunc := func() { testFunc := func() {
atomic.AddInt32(executionCount, 1) atomic.AddInt32(executionCount, 1)
} }
task := CreateRecurringTask(TASK_NAME, testFunc, TASK_TIME) task := CreateRecurringTask(taskName, testFunc, taskTime)
assert.EqualValues(t, 0, atomic.LoadInt32(executionCount)) assert.EqualValues(t, 0, atomic.LoadInt32(executionCount))
time.Sleep(TASK_TIME + TASK_WAIT) time.Sleep(taskTime + taskWait)
assert.EqualValues(t, 1, atomic.LoadInt32(executionCount)) assert.EqualValues(t, 1, atomic.LoadInt32(executionCount))
time.Sleep(TASK_TIME) time.Sleep(taskTime)
assert.EqualValues(t, 2, atomic.LoadInt32(executionCount)) assert.EqualValues(t, 2, atomic.LoadInt32(executionCount))
assert.Equal(t, TASK_NAME, task.Name) assert.Equal(t, taskName, task.Name)
assert.Equal(t, TASK_TIME, task.Interval) assert.Equal(t, taskTime, task.Interval)
assert.True(t, task.Recurring) assert.True(t, task.Recurring)
task.Cancel() task.Cancel()
} }
func TestCancelTask(t *testing.T) { func TestCancelTask(t *testing.T) {
TASK_NAME := "Test Task" taskName := "Test Task"
TASK_TIME := time.Millisecond * 100 taskTime := time.Millisecond * 100
TASK_WAIT := time.Millisecond * 100 taskWait := time.Millisecond * 100
executionCount := new(int32) executionCount := new(int32)
testFunc := func() { testFunc := func() {
atomic.AddInt32(executionCount, 1) atomic.AddInt32(executionCount, 1)
} }
task := CreateTask(TASK_NAME, testFunc, TASK_TIME) task := CreateTask(taskName, testFunc, taskTime)
assert.EqualValues(t, 0, atomic.LoadInt32(executionCount)) assert.EqualValues(t, 0, atomic.LoadInt32(executionCount))
task.Cancel() task.Cancel()
time.Sleep(TASK_TIME + TASK_WAIT) time.Sleep(taskTime + taskWait)
assert.EqualValues(t, 0, atomic.LoadInt32(executionCount)) assert.EqualValues(t, 0, atomic.LoadInt32(executionCount))
} }

View file

@ -17,7 +17,8 @@ func (s *SQLStore) latestsBlocksSubquery() sq.SelectBuilder {
} }
func (s *SQLStore) GetBlocksWithParentAndType(parentID string, blockType string) ([]model.Block, error) { func (s *SQLStore) GetBlocksWithParentAndType(parentID string, blockType string) ([]model.Block, error) {
query := s.getQueryBuilder().Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at"). query := s.getQueryBuilder().
Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at").
FromSelect(s.latestsBlocksSubquery(), "latest"). FromSelect(s.latestsBlocksSubquery(), "latest").
Where(sq.Eq{"delete_at": 0}). Where(sq.Eq{"delete_at": 0}).
Where(sq.Eq{"parent_id": parentID}). Where(sq.Eq{"parent_id": parentID}).
@ -25,6 +26,7 @@ func (s *SQLStore) GetBlocksWithParentAndType(parentID string, blockType string)
rows, err := query.Query() rows, err := query.Query()
if err != nil { if err != nil {
log.Printf(`getBlocksWithParentAndType ERROR: %v`, err) log.Printf(`getBlocksWithParentAndType ERROR: %v`, err)
return nil, err return nil, err
} }
@ -32,7 +34,8 @@ func (s *SQLStore) GetBlocksWithParentAndType(parentID string, blockType string)
} }
func (s *SQLStore) GetBlocksWithParent(parentID string) ([]model.Block, error) { func (s *SQLStore) GetBlocksWithParent(parentID string) ([]model.Block, error) {
query := s.getQueryBuilder().Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at"). query := s.getQueryBuilder().
Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at").
FromSelect(s.latestsBlocksSubquery(), "latest"). FromSelect(s.latestsBlocksSubquery(), "latest").
Where(sq.Eq{"delete_at": 0}). Where(sq.Eq{"delete_at": 0}).
Where(sq.Eq{"parent_id": parentID}) Where(sq.Eq{"parent_id": parentID})
@ -47,7 +50,8 @@ func (s *SQLStore) GetBlocksWithParent(parentID string) ([]model.Block, error) {
} }
func (s *SQLStore) GetBlocksWithType(blockType string) ([]model.Block, error) { func (s *SQLStore) GetBlocksWithType(blockType string) ([]model.Block, error) {
query := s.getQueryBuilder().Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at"). query := s.getQueryBuilder().
Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at").
FromSelect(s.latestsBlocksSubquery(), "latest"). FromSelect(s.latestsBlocksSubquery(), "latest").
Where(sq.Eq{"delete_at": 0}). Where(sq.Eq{"delete_at": 0}).
Where(sq.Eq{"type": blockType}) Where(sq.Eq{"type": blockType})
@ -61,7 +65,8 @@ func (s *SQLStore) GetBlocksWithType(blockType string) ([]model.Block, error) {
} }
func (s *SQLStore) GetSubTree(blockID string) ([]model.Block, error) { func (s *SQLStore) GetSubTree(blockID string) ([]model.Block, error) {
query := s.getQueryBuilder().Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at"). query := s.getQueryBuilder().
Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at").
FromSelect(s.latestsBlocksSubquery(), "latest"). FromSelect(s.latestsBlocksSubquery(), "latest").
Where(sq.Eq{"delete_at": 0}). Where(sq.Eq{"delete_at": 0}).
Where(sq.Or{sq.Eq{"id": blockID}, sq.Eq{"parent_id": blockID}}) Where(sq.Or{sq.Eq{"id": blockID}, sq.Eq{"parent_id": blockID}})
@ -76,7 +81,8 @@ func (s *SQLStore) GetSubTree(blockID string) ([]model.Block, error) {
} }
func (s *SQLStore) GetAllBlocks() ([]model.Block, error) { func (s *SQLStore) GetAllBlocks() ([]model.Block, error) {
query := s.getQueryBuilder().Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at"). query := s.getQueryBuilder().
Select("id", "parent_id", "schema", "type", "title", "COALESCE(\"fields\", '{}')", "create_at", "update_at", "delete_at").
FromSelect(s.latestsBlocksSubquery(), "latest"). FromSelect(s.latestsBlocksSubquery(), "latest").
Where(sq.Eq{"delete_at": 0}) Where(sq.Eq{"delete_at": 0})
@ -97,6 +103,7 @@ func blocksFromRows(rows *sql.Rows) ([]model.Block, error) {
for rows.Next() { for rows.Next() {
var block model.Block var block model.Block
var fieldsJSON string var fieldsJSON string
err := rows.Scan( err := rows.Scan(
&block.ID, &block.ID,
&block.ParentID, &block.ParentID,

View file

@ -1,6 +1,8 @@
package sqlstore package sqlstore
import ( import (
"errors"
"github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database" "github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/postgres" "github.com/golang-migrate/migrate/v4/database/postgres"
@ -13,28 +15,24 @@ import (
) )
func (s *SQLStore) Migrate() error { func (s *SQLStore) Migrate() error {
var bresource *bindata.AssetSource
var driver database.Driver var driver database.Driver
var err error var err error
var bresource *bindata.AssetSource
if s.dbType == "sqlite3" { if s.dbType == "sqlite3" {
driver, err = sqlite3.WithInstance(s.db, &sqlite3.Config{}) driver, err = sqlite3.WithInstance(s.db, &sqlite3.Config{})
if err != nil { if err != nil {
return err return err
} }
bresource = bindata.Resource(sqlite.AssetNames(), bresource = bindata.Resource(sqlite.AssetNames(), sqlite.Asset)
func(name string) ([]byte, error) {
return sqlite.Asset(name)
})
} }
if s.dbType == "postgres" { if s.dbType == "postgres" {
driver, err = postgres.WithInstance(s.db, &postgres.Config{}) driver, err = postgres.WithInstance(s.db, &postgres.Config{})
if err != nil { if err != nil {
return err return err
} }
bresource = bindata.Resource(pgmigrations.AssetNames(), bresource = bindata.Resource(pgmigrations.AssetNames(), pgmigrations.Asset)
func(name string) ([]byte, error) {
return pgmigrations.Asset(name)
})
} }
d, err := bindata.WithInstance(bresource) d, err := bindata.WithInstance(bresource)
@ -48,8 +46,9 @@ func (s *SQLStore) Migrate() error {
} }
err = m.Up() err = m.Up()
if err != nil && err != migrate.ErrNoChange { if err != nil && errors.Is(err, migrate.ErrNoChange) {
return err return err
} }
return nil return nil
} }

View file

@ -14,19 +14,15 @@ import (
) )
const ( const (
DAY_MILLISECONDS = 24 * 60 * 60 * 1000 rudderKey = "placeholder_rudder_key"
MONTH_MILLISECONDS = 31 * DAY_MILLISECONDS rudderDataplaneURL = "placeholder_rudder_dataplane_url"
timeBetweenTelemetryChecks = 10 * time.Minute
RUDDER_KEY = "placeholder_rudder_key"
RUDDER_DATAPLANE_URL = "placeholder_rudder_dataplane_url"
TRACK_CONFIG = "config"
) )
type telemetryTracker func() map[string]interface{} type Tracker func() map[string]interface{}
type TelemetryService struct { type Service struct {
trackers map[string]telemetryTracker trackers map[string]Tracker
log *log.Logger log *log.Logger
rudderClient rudder.Client rudderClient rudder.Client
telemetryID string telemetryID string
@ -35,25 +31,25 @@ type TelemetryService struct {
type RudderConfig struct { type RudderConfig struct {
RudderKey string RudderKey string
DataplaneUrl string DataplaneURL string
} }
func New(telemetryID string, log *log.Logger) *TelemetryService { func New(telemetryID string, log *log.Logger) *Service {
service := &TelemetryService{ service := &Service{
log: log, log: log,
telemetryID: telemetryID, telemetryID: telemetryID,
trackers: map[string]telemetryTracker{}, trackers: map[string]Tracker{},
} }
return service return service
} }
func (ts *TelemetryService) RegisterTracker(name string, tracker telemetryTracker) { func (ts *Service) RegisterTracker(name string, tracker Tracker) {
ts.trackers[name] = tracker ts.trackers[name] = tracker
} }
func (ts *TelemetryService) getRudderConfig() RudderConfig { func (ts *Service) getRudderConfig() RudderConfig {
if !strings.Contains(RUDDER_KEY, "placeholder") && !strings.Contains(RUDDER_DATAPLANE_URL, "placeholder") { if !strings.Contains(rudderKey, "placeholder") && !strings.Contains(rudderDataplaneURL, "placeholder") {
return RudderConfig{RUDDER_KEY, RUDDER_DATAPLANE_URL} return RudderConfig{rudderKey, rudderDataplaneURL}
} else if os.Getenv("RUDDER_KEY") != "" && os.Getenv("RUDDER_DATAPLANE_URL") != "" { } else if os.Getenv("RUDDER_KEY") != "" && os.Getenv("RUDDER_DATAPLANE_URL") != "" {
return RudderConfig{os.Getenv("RUDDER_KEY"), os.Getenv("RUDDER_DATAPLANE_URL")} return RudderConfig{os.Getenv("RUDDER_KEY"), os.Getenv("RUDDER_DATAPLANE_URL")}
} else { } else {
@ -61,17 +57,18 @@ func (ts *TelemetryService) getRudderConfig() RudderConfig {
} }
} }
func (ts *TelemetryService) sendDailyTelemetry(override bool) { func (ts *Service) sendDailyTelemetry(override bool) {
config := ts.getRudderConfig() config := ts.getRudderConfig()
if (config.DataplaneUrl != "" && config.RudderKey != "") || override { if (config.DataplaneURL != "" && config.RudderKey != "") || override {
ts.initRudder(config.DataplaneUrl, config.RudderKey) ts.initRudder(config.DataplaneURL, config.RudderKey)
for name, tracker := range ts.trackers { for name, tracker := range ts.trackers {
ts.sendTelemetry(name, tracker()) ts.sendTelemetry(name, tracker())
} }
} }
} }
func (ts *TelemetryService) sendTelemetry(event string, properties map[string]interface{}) { func (ts *Service) sendTelemetry(event string, properties map[string]interface{}) {
if ts.rudderClient != nil { if ts.rudderClient != nil {
var context *rudder.Context var context *rudder.Context
ts.rudderClient.Enqueue(rudder.Track{ ts.rudderClient.Enqueue(rudder.Track{
@ -83,13 +80,13 @@ func (ts *TelemetryService) sendTelemetry(event string, properties map[string]in
} }
} }
func (ts *TelemetryService) initRudder(endpoint string, rudderKey string) { func (ts *Service) initRudder(endpoint string, rudderKey string) {
if ts.rudderClient == nil { if ts.rudderClient == nil {
config := rudder.Config{} config := rudder.Config{}
config.Logger = rudder.StdLogger(ts.log) config.Logger = rudder.StdLogger(ts.log)
config.Endpoint = endpoint config.Endpoint = endpoint
// For testing // For testing
if endpoint != RUDDER_DATAPLANE_URL { if endpoint != rudderDataplaneURL {
config.Verbose = true config.Verbose = true
config.BatchSize = 1 config.BatchSize = 1
} }
@ -106,7 +103,7 @@ func (ts *TelemetryService) initRudder(endpoint string, rudderKey string) {
} }
} }
func (ts *TelemetryService) doTelemetryIfNeeded(firstRun time.Time) { func (ts *Service) doTelemetryIfNeeded(firstRun time.Time) {
hoursSinceFirstServerRun := time.Since(firstRun).Hours() hoursSinceFirstServerRun := time.Since(firstRun).Hours()
// Send once every 10 minutes for the first hour // Send once every 10 minutes for the first hour
// Send once every hour thereafter for the first 12 hours // Send once every hour thereafter for the first 12 hours
@ -120,21 +117,21 @@ func (ts *TelemetryService) doTelemetryIfNeeded(firstRun time.Time) {
} }
} }
func (ts *TelemetryService) RunTelemetryJob(firstRun int64) { func (ts *Service) RunTelemetryJob(firstRun int64) {
// Send on boot // Send on boot
ts.doTelemetry() ts.doTelemetry()
scheduler.CreateRecurringTask("Telemetry", func() { scheduler.CreateRecurringTask("Telemetry", func() {
ts.doTelemetryIfNeeded(time.Unix(0, firstRun*int64(time.Millisecond))) ts.doTelemetryIfNeeded(time.Unix(0, firstRun*int64(time.Millisecond)))
}, time.Minute*10) }, timeBetweenTelemetryChecks)
} }
func (ts *TelemetryService) doTelemetry() { func (ts *Service) doTelemetry() {
ts.timestampLastTelemetrySent = time.Now() ts.timestampLastTelemetrySent = time.Now()
ts.sendDailyTelemetry(false) ts.sendDailyTelemetry(false)
} }
// Shutdown closes the telemetry client. // Shutdown closes the telemetry client.
func (ts *TelemetryService) Shutdown() error { func (ts *Service) Shutdown() error {
if ts.rudderClient != nil { if ts.rudderClient != nil {
return ts.rudderClient.Close() return ts.rudderClient.Close()
} }

View file

@ -53,19 +53,23 @@ func (ws *WebServer) Start() error {
urlPort := fmt.Sprintf(`:%d`, ws.port) urlPort := fmt.Sprintf(`:%d`, ws.port)
var isSSL = ws.ssl && fileExists("./cert/cert.pem") && fileExists("./cert/key.pem") var isSSL = ws.ssl && fileExists("./cert/cert.pem") && fileExists("./cert/key.pem")
if isSSL { if isSSL {
log.Println("https server started on ", urlPort) log.Println("https server started on ", urlPort)
err := http.ListenAndServeTLS(urlPort, "./cert/cert.pem", "./cert/key.pem", nil) err := http.ListenAndServeTLS(urlPort, "./cert/cert.pem", "./cert/key.pem", nil)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
log.Println("http server started on ", urlPort) log.Println("http server started on ", urlPort)
err := http.ListenAndServe(urlPort, nil) err := http.ListenAndServe(urlPort, nil)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
@ -75,5 +79,6 @@ func fileExists(path string) bool {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return false return false
} }
return err == nil return err == nil
} }

View file

@ -11,27 +11,29 @@ import (
) )
// RegisterRoutes registeres routes // RegisterRoutes registeres routes
func (ws *WSServer) RegisterRoutes(r *mux.Router) { func (ws *Server) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/ws/onchange", ws.handleWebSocketOnChange) r.HandleFunc("/ws/onchange", ws.handleWebSocketOnChange)
} }
// AddListener adds a listener for a block's change // AddListener adds a listener for a block's change
func (ws *WSServer) AddListener(client *websocket.Conn, blockIDs []string) { func (ws *Server) AddListener(client *websocket.Conn, blockIDs []string) {
ws.mu.Lock() ws.mu.Lock()
for _, blockID := range blockIDs { for _, blockID := range blockIDs {
if ws.listeners[blockID] == nil { if ws.listeners[blockID] == nil {
ws.listeners[blockID] = []*websocket.Conn{} ws.listeners[blockID] = []*websocket.Conn{}
} }
ws.listeners[blockID] = append(ws.listeners[blockID], client) ws.listeners[blockID] = append(ws.listeners[blockID], client)
} }
ws.mu.Unlock() ws.mu.Unlock()
} }
// RemoveListener removes a webSocket listener from all blocks // RemoveListener removes a webSocket listener from all blocks
func (ws *WSServer) RemoveListener(client *websocket.Conn) { func (ws *Server) RemoveListener(client *websocket.Conn) {
ws.mu.Lock() ws.mu.Lock()
for key, clients := range ws.listeners { for key, clients := range ws.listeners {
var listeners = []*websocket.Conn{} var listeners = []*websocket.Conn{}
for _, existingClient := range clients { for _, existingClient := range clients {
if client != existingClient { if client != existingClient {
listeners = append(listeners, existingClient) listeners = append(listeners, existingClient)
@ -43,7 +45,7 @@ func (ws *WSServer) RemoveListener(client *websocket.Conn) {
} }
// RemoveListenerFromBlocks removes a webSocket listener from a set of block // RemoveListenerFromBlocks removes a webSocket listener from a set of block
func (ws *WSServer) RemoveListenerFromBlocks(client *websocket.Conn, blockIDs []string) { func (ws *Server) RemoveListenerFromBlocks(client *websocket.Conn, blockIDs []string) {
ws.mu.Lock() ws.mu.Lock()
for _, blockID := range blockIDs { for _, blockID := range blockIDs {
@ -58,6 +60,7 @@ func (ws *WSServer) RemoveListenerFromBlocks(client *websocket.Conn, blockIDs []
if client == listener { if client == listener {
newListeners := append(listeners[:index], listeners[index+1:]...) newListeners := append(listeners[:index], listeners[index+1:]...)
ws.listeners[blockID] = newListeners ws.listeners[blockID] = newListeners
break break
} }
} }
@ -67,7 +70,7 @@ func (ws *WSServer) RemoveListenerFromBlocks(client *websocket.Conn, blockIDs []
} }
// GetListeners returns the listeners to a blockID's changes // GetListeners returns the listeners to a blockID's changes
func (ws *WSServer) GetListeners(blockID string) []*websocket.Conn { func (ws *Server) GetListeners(blockID string) []*websocket.Conn {
ws.mu.Lock() ws.mu.Lock()
listeners := ws.listeners[blockID] listeners := ws.listeners[blockID]
ws.mu.Unlock() ws.mu.Unlock()
@ -75,16 +78,16 @@ func (ws *WSServer) GetListeners(blockID string) []*websocket.Conn {
return listeners return listeners
} }
// WSServer is a WebSocket server // Server is a WebSocket server
type WSServer struct { type Server struct {
upgrader websocket.Upgrader upgrader websocket.Upgrader
listeners map[string][]*websocket.Conn listeners map[string][]*websocket.Conn
mu sync.RWMutex mu sync.RWMutex
} }
// NewWSServer creates a new WSServer // NewServer creates a new Server
func NewWSServer() *WSServer { func NewServer() *Server {
return &WSServer{ return &Server{
listeners: make(map[string][]*websocket.Conn), listeners: make(map[string][]*websocket.Conn),
upgrader: websocket.Upgrader{ upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { CheckOrigin: func(r *http.Request) bool {
@ -106,7 +109,7 @@ type WebsocketCommand struct {
BlockIDs []string `json:"blockIds"` BlockIDs []string `json:"blockIds"`
} }
func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Request) { func (ws *Server) handleWebSocketOnChange(w http.ResponseWriter, r *http.Request) {
// Upgrade initial GET request to a websocket // Upgrade initial GET request to a websocket
client, err := ws.upgrader.Upgrade(w, r, nil) client, err := ws.upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
@ -133,6 +136,7 @@ func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Reque
if err != nil { if err != nil {
log.Printf("ERROR WebSocket onChange, client: %s, err: %v", client.RemoteAddr(), err) log.Printf("ERROR WebSocket onChange, client: %s, err: %v", client.RemoteAddr(), err)
ws.RemoveListener(client) ws.RemoveListener(client)
break break
} }
@ -141,6 +145,7 @@ func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Reque
if err != nil { if err != nil {
// handle this error // handle this error
log.Printf(`ERROR webSocket parsing command JSON: %v`, string(p)) log.Printf(`ERROR webSocket parsing command JSON: %v`, string(p))
continue continue
} }
@ -148,9 +153,11 @@ func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Reque
case "ADD": case "ADD":
log.Printf(`Command: Add blockID: %v, client: %s`, command.BlockIDs, client.RemoteAddr()) log.Printf(`Command: Add blockID: %v, client: %s`, command.BlockIDs, client.RemoteAddr())
ws.AddListener(client, command.BlockIDs) ws.AddListener(client, command.BlockIDs)
case "REMOVE": case "REMOVE":
log.Printf(`Command: Remove blockID: %v, client: %s`, command.BlockIDs, client.RemoteAddr()) log.Printf(`Command: Remove blockID: %v, client: %s`, command.BlockIDs, client.RemoteAddr())
ws.RemoveListenerFromBlocks(client, command.BlockIDs) ws.RemoveListenerFromBlocks(client, command.BlockIDs)
default: default:
log.Printf(`ERROR webSocket command, invalid action: %v`, command.Action) log.Printf(`ERROR webSocket command, invalid action: %v`, command.Action)
} }
@ -158,7 +165,7 @@ func (ws *WSServer) handleWebSocketOnChange(w http.ResponseWriter, r *http.Reque
} }
// BroadcastBlockChangeToWebsocketClients broadcasts change to clients // BroadcastBlockChangeToWebsocketClients broadcasts change to clients
func (ws *WSServer) BroadcastBlockChangeToWebsocketClients(blockIDs []string) { func (ws *Server) BroadcastBlockChangeToWebsocketClients(blockIDs []string) {
for _, blockID := range blockIDs { for _, blockID := range blockIDs {
listeners := ws.GetListeners(blockID) listeners := ws.GetListeners(blockID)
log.Printf("%d listener(s) for blockID: %s", len(listeners), blockID) log.Printf("%d listener(s) for blockID: %s", len(listeners), blockID)
@ -168,6 +175,7 @@ func (ws *WSServer) BroadcastBlockChangeToWebsocketClients(blockIDs []string) {
Action: "UPDATE_BLOCK", Action: "UPDATE_BLOCK",
BlockID: blockID, BlockID: blockID,
} }
for _, listener := range listeners { for _, listener := range listeners {
log.Printf("Broadcast change, blockID: %s, remoteAddr: %s", blockID, listener.RemoteAddr()) log.Printf("Broadcast change, blockID: %s, remoteAddr: %s", blockID, listener.RemoteAddr())
err := listener.WriteJSON(message) err := listener.WriteJSON(message)