2020-10-16 22:27:16 +02:00
package sqlstore
import (
2021-04-17 09:06:57 +02:00
"bytes"
2022-03-22 15:24:34 +01:00
"context"
2021-05-24 19:06:11 +02:00
"database/sql"
2022-03-22 15:24:34 +01:00
"embed"
2022-03-26 00:21:56 +01:00
"errors"
2021-04-17 09:06:57 +02:00
"fmt"
2022-03-26 00:21:56 +01:00
"github.com/mattermost/focalboard/server/utils"
"strconv"
2021-04-17 09:06:57 +02:00
"text/template"
2020-10-22 13:34:42 +02:00
2022-03-22 15:24:34 +01:00
"github.com/mattermost/morph/models"
"github.com/mattermost/mattermost-server/v6/shared/mlog"
"github.com/mattermost/morph"
drivers "github.com/mattermost/morph/drivers"
mysql "github.com/mattermost/morph/drivers/mysql"
postgres "github.com/mattermost/morph/drivers/postgres"
sqlite "github.com/mattermost/morph/drivers/sqlite"
2022-03-26 00:05:56 +01:00
embedded "github.com/mattermost/morph/sources/embedded"
2022-03-22 15:24:34 +01:00
2021-05-24 19:06:11 +02:00
mysqldriver "github.com/go-sql-driver/mysql"
2021-07-09 03:09:02 +02:00
_ "github.com/lib/pq" // postgres driver
2021-11-11 17:01:43 +01:00
2022-03-22 15:24:34 +01:00
sq "github.com/Masterminds/squirrel"
"github.com/mattermost/focalboard/server/model"
2021-11-11 17:01:43 +01:00
"github.com/mattermost/mattermost-plugin-api/cluster"
)
2022-03-22 15:24:34 +01:00
// assets is the embedded file system holding the contents of the
// migrations directory; Migrate reads the migration templates from it.
//
//go:embed migrations
var assets embed . FS
2021-11-11 17:01:43 +01:00
const (
	// uniqueIDsMigrationRequiredVersion is the number of schema migrations
	// that must be applied before the unique IDs data migration runs.
	uniqueIDsMigrationRequiredVersion = 14

	// teamsAndBoardsMigrationRequiredVersion is the number of schema
	// migrations that must be applied before teamless boards are migrated.
	teamsAndBoardsMigrationRequiredVersion = 17

	// teamLessBoardsMigrationKey is the system setting key used to record
	// that the teamless boards migration has completed.
	teamLessBoardsMigrationKey = "TeamLessBoardsMigrationComplete"

	// tempSchemaMigrationTableName is the unprefixed name of the temporary
	// table built while converting the legacy schema version table.
	tempSchemaMigrationTableName = "temp_schema_migration"
)
2021-04-17 09:06:57 +02:00
2022-03-26 00:21:56 +01:00
// errChannelCreatorNotInTeam is the sentinel error wrapped by
// getBestTeamForBoard when the creator of a DM channel has no entry in the
// user-to-teams map built for the board.
var errChannelCreatorNotInTeam = errors . New ( "channel creator not found in user teams" )
2021-06-11 11:18:11 +02:00
// appendMultipleStatementsFlag parses the given MySQL DSN, sets its
// multiStatements parameter to "true" and returns the re-formatted DSN.
// It returns an empty string and the parse error when the DSN is invalid.
func appendMultipleStatementsFlag(connectionString string) (string, error) {
	dsn, parseErr := mysqldriver.ParseDSN(connectionString)
	if parseErr != nil {
		return "", parseErr
	}

	// the params map is not guaranteed to be initialized by the parser
	params := dsn.Params
	if params == nil {
		params = make(map[string]string, 1)
		dsn.Params = params
	}
	params["multiStatements"] = "true"

	return dsn.FormatDSN(), nil
}
2021-05-24 19:06:11 +02:00
// migrations in MySQL need to run with the multiStatements flag
// enabled, so this method creates a new connection ensuring that it's
2021-06-21 11:21:42 +02:00
// enabled.
2021-08-02 16:48:15 +02:00
func ( s * SQLStore ) getMigrationConnection ( ) ( * sql . DB , error ) {
connectionString := s . connectionString
2022-03-22 15:24:34 +01:00
if s . dbType == model . MysqlDBType {
2021-08-02 16:48:15 +02:00
var err error
connectionString , err = appendMultipleStatementsFlag ( s . connectionString )
if err != nil {
return nil , err
}
2021-05-24 19:06:11 +02:00
}
db , err := sql . Open ( s . dbType , connectionString )
if err != nil {
return nil , err
}
if err = db . Ping ( ) ; err != nil {
return nil , err
}
return db , nil
}
2020-10-16 22:27:16 +02:00
func ( s * SQLStore ) Migrate ( ) error {
2022-03-22 15:24:34 +01:00
var driver drivers . Driver
2020-10-16 22:27:16 +02:00
var err error
2020-10-22 13:34:42 +02:00
2022-03-22 15:24:34 +01:00
migrationConfig := drivers . Config {
StatementTimeoutInSecs : 1000000 ,
MigrationsTable : fmt . Sprintf ( "%sschema_migrations" , s . tablePrefix ) ,
}
if s . dbType == model . SqliteDBType {
driver , err = sqlite . WithInstance ( s . db , & sqlite . Config { Config : migrationConfig } )
2020-10-18 02:07:35 +02:00
if err != nil {
return err
}
2020-10-16 22:27:16 +02:00
}
2020-10-22 13:34:42 +02:00
2022-03-22 15:24:34 +01:00
var db * sql . DB
if s . dbType != model . SqliteDBType {
db , err = s . getMigrationConnection ( )
if err != nil {
return err
}
defer db . Close ( )
2021-08-02 16:48:15 +02:00
}
2022-03-22 15:24:34 +01:00
if s . dbType == model . PostgresDBType {
driver , err = postgres . WithInstance ( db , & postgres . Config { Config : migrationConfig } )
2020-10-18 02:07:35 +02:00
if err != nil {
return err
}
2020-10-18 01:09:12 +02:00
}
2022-03-22 15:24:34 +01:00
if s . dbType == model . MysqlDBType {
driver , err = mysql . WithInstance ( db , & mysql . Config { Config : migrationConfig } )
2021-04-22 22:53:01 +02:00
if err != nil {
return err
}
}
2022-03-22 15:24:34 +01:00
assetsList , err := assets . ReadDir ( "migrations" )
if err != nil {
return err
}
assetNamesForDriver := make ( [ ] string , len ( assetsList ) )
for i , dirEntry := range assetsList {
assetNamesForDriver [ i ] = dirEntry . Name ( )
}
params := map [ string ] interface { } {
"prefix" : s . tablePrefix ,
"postgres" : s . dbType == model . PostgresDBType ,
"sqlite" : s . dbType == model . SqliteDBType ,
"mysql" : s . dbType == model . MysqlDBType ,
"plugin" : s . isPlugin ,
}
2022-03-26 00:05:56 +01:00
migrationAssets := & embedded . AssetSource {
2022-03-22 15:24:34 +01:00
Names : assetNamesForDriver ,
AssetFunc : func ( name string ) ( [ ] byte , error ) {
2022-04-12 21:17:58 +02:00
asset , mErr := assets . ReadFile ( "migrations/" + name )
2022-03-22 15:24:34 +01:00
if mErr != nil {
return nil , mErr
}
tmpl , pErr := template . New ( "sql" ) . Parse ( string ( asset ) )
if pErr != nil {
return nil , pErr
}
buffer := bytes . NewBufferString ( "" )
err = tmpl . Execute ( buffer , params )
if err != nil {
return nil , err
}
return buffer . Bytes ( ) , nil
} ,
}
2021-04-20 11:27:20 +02:00
2022-03-26 00:05:56 +01:00
src , err := embedded . WithInstance ( migrationAssets )
2020-10-18 01:09:12 +02:00
if err != nil {
return err
2020-10-16 22:27:16 +02:00
}
2021-09-08 06:52:03 +02:00
2022-03-22 15:24:34 +01:00
opts := [ ] morph . EngineOption {
morph . WithLock ( "mm-lock-key" ) ,
2021-04-17 09:06:57 +02:00
}
2020-10-18 01:09:12 +02:00
2022-03-22 15:24:34 +01:00
if s . dbType == model . SqliteDBType {
opts = opts [ : 0 ] // sqlite driver does not support locking, it doesn't need to anyway.
}
engine , err := morph . New ( context . Background ( ) , driver , src , opts ... )
2020-10-16 22:27:16 +02:00
if err != nil {
return err
}
2022-03-22 15:24:34 +01:00
defer engine . Close ( )
2021-11-11 17:01:43 +01:00
var mutex * cluster . Mutex
if s . isPlugin {
var mutexErr error
mutex , mutexErr = s . NewMutexFn ( "Boards_dbMutex" )
if mutexErr != nil {
return fmt . Errorf ( "error creating database mutex: %w" , mutexErr )
}
}
if s . isPlugin {
s . logger . Debug ( "Acquiring cluster lock for Unique IDs migration" )
mutex . Lock ( )
}
2022-03-22 15:24:34 +01:00
if err := s . migrateSchemaVersionTable ( src . Migrations ( ) ) ; err != nil {
return err
}
if err := ensureMigrationsAppliedUpToVersion ( engine , driver , uniqueIDsMigrationRequiredVersion ) ; err != nil {
return err
}
2021-11-11 17:01:43 +01:00
if err := s . runUniqueIDsMigration ( ) ; err != nil {
if s . isPlugin {
s . logger . Debug ( "Releasing cluster lock for Unique IDs migration" )
mutex . Unlock ( )
}
return fmt . Errorf ( "error running unique IDs migration: %w" , err )
}
2022-03-22 15:24:34 +01:00
if err := ensureMigrationsAppliedUpToVersion ( engine , driver , categoriesUUIDIDMigrationRequiredVersion ) ; err != nil {
return err
}
if err := s . runCategoryUUIDIDMigration ( ) ; err != nil {
if s . isPlugin {
s . logger . Debug ( "Releasing cluster lock for Unique IDs migration" )
mutex . Unlock ( )
}
return fmt . Errorf ( "error running categoryID migration: %w" , err )
}
if err := s . deleteOldSchemaMigrationTable ( ) ; err != nil {
2022-03-26 00:21:56 +01:00
if s . isPlugin {
mutex . Unlock ( )
}
return err
}
if err := ensureMigrationsAppliedUpToVersion ( engine , driver , teamsAndBoardsMigrationRequiredVersion ) ; err != nil {
if s . isPlugin {
mutex . Unlock ( )
}
return err
}
if err := s . migrateTeamLessBoards ( ) ; err != nil {
if s . isPlugin {
mutex . Unlock ( )
}
2022-03-22 15:24:34 +01:00
return err
}
2021-11-11 17:01:43 +01:00
if s . isPlugin {
s . logger . Debug ( "Releasing cluster lock for Unique IDs migration" )
mutex . Unlock ( )
}
2022-03-22 15:24:34 +01:00
return engine . ApplyAll ( )
}
// migrateSchemaVersionTable converts the schema version table from
// the old format used by go-migrate to the new format used by
// gomorph.
// When running the Focalboard with go-migrate's schema version table
// existing in the database, gomorph is unable to make sense of it as it's
// not in the format required by gomorph.
func ( s * SQLStore ) migrateSchemaVersionTable ( migrations [ ] * models . Migration ) error {
migrationNeeded , err := s . isSchemaMigrationNeeded ( )
if err != nil {
return err
}
if ! migrationNeeded {
return nil
}
s . logger . Info ( "Migrating schema migration to new format" )
legacySchemaVersion , err := s . getLegacySchemaVersion ( )
if err != nil {
return err
}
if err := s . createTempSchemaTable ( ) ; err != nil {
return err
}
if err := s . populateTempSchemaTable ( migrations , legacySchemaVersion ) ; err != nil {
return err
}
if err := s . useNewSchemaTable ( ) ; err != nil {
2021-11-11 17:01:43 +01:00
return err
}
return nil
}
2022-03-22 15:24:34 +01:00
// isSchemaMigrationNeeded reports whether the schema version table is
// still in the legacy format and therefore has to be converted.
//
// The check looks for a `dirty` column on the schema version table, which
// exists only in the old format. SQLite has no information_schema, so it
// is delegated to isSchemaMigrationNeededSQLite. For MySQL and PostgreSQL
// the lookup is restricted to the current database/schema so that an
// identically named table elsewhere on the same server cannot influence
// the result.
func (s *SQLStore) isSchemaMigrationNeeded() (bool, error) {
	if s.dbType == model.SqliteDBType {
		return s.isSchemaMigrationNeededSQLite()
	}

	query := s.getQueryBuilder(s.db).
		Select("count(*)").
		From("information_schema.COLUMNS").
		Where(sq.Eq{
			"TABLE_NAME":  s.tablePrefix + "schema_migrations",
			"COLUMN_NAME": "dirty",
		})

	// information_schema spans every schema visible to the connection;
	// without this filter a copy of the table in another schema would be
	// counted as well and break the `count == 1` test below.
	switch s.dbType {
	case model.MysqlDBType:
		query = query.Where("TABLE_SCHEMA = DATABASE()")
	case model.PostgresDBType:
		query = query.Where("table_schema = current_schema()")
	}

	var count int
	if err := query.QueryRow().Scan(&count); err != nil {
		s.logger.Error("failed to check for columns of schema_migrations table", mlog.Err(err))
		return false, err
	}

	return count == 1, nil
}
// isSchemaMigrationNeededSQLite is the SQLite variant of
// isSchemaMigrationNeeded. The presence of a column is checked through
// `PRAGMA table_info`, and the function reports true when the schema
// version table has a column named `dirty`.
func (s *SQLStore) isSchemaMigrationNeededSQLite() (bool, error) {
	query := fmt.Sprintf("PRAGMA table_info(\"%sschema_migrations\");", s.tablePrefix)
	rows, err := s.db.Query(query)
	if err != nil {
		s.logger.Error("SQLite - failed to check for columns in schema_migrations table", mlog.Err(err))
		return false, err
	}
	defer s.CloseRows(rows)

	dirtyColumnFound := false
	for rows.Next() {
		// PRAGMA returns 6 columns; pointers are used because any of
		// them may come back as NULL. Index 1 is compared against the
		// column name being searched for.
		var fields [6]*string
		if err := rows.Scan(
			&fields[0],
			&fields[1],
			&fields[2],
			&fields[3],
			&fields[4],
			&fields[5],
		); err != nil {
			s.logger.Error("error scanning rows from SQLite schema_migrations table definition", mlog.Err(err))
			return false, err
		}

		// guard against a NULL value before dereferencing
		if fields[1] != nil && *fields[1] == "dirty" {
			dirtyColumnFound = true
		}
	}

	// rows.Next returning false can also mean the iteration failed
	if err := rows.Err(); err != nil {
		s.logger.Error("error iterating rows from SQLite schema_migrations table definition", mlog.Err(err))
		return false, err
	}

	return dirtyColumnFound, nil
}
// getLegacySchemaVersion reads the `version` value from the legacy schema
// version table. It returns 0 and the underlying error when the value
// cannot be fetched or scanned.
func (s *SQLStore) getLegacySchemaVersion() (uint32, error) {
	query := s.getQueryBuilder(s.db).
		Select("version").
		From(s.tablePrefix + "schema_migrations")

	var version uint32
	if err := query.QueryRow().Scan(&version); err != nil {
		// logged once; the structured field already carries the error
		s.logger.Error("error fetching legacy schema version", mlog.Err(err))
		return 0, err
	}

	return version, nil
}
// createTempSchemaTable creates, if it does not exist yet, the temporary
// table that will hold the schema version records in the new format. The
// table has a Version primary key column and a Name column.
func (s *SQLStore) createTempSchemaTable() error {
	// squirrel doesn't support DDL query in query builder
	// so, we need to use a plain old string
	query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (Version bigint NOT NULL, Name varchar(64) NOT NULL, PRIMARY KEY (Version))", s.tablePrefix+tempSchemaMigrationTableName)
	if _, err := s.db.Exec(query); err != nil {
		// logged once; the structured field already carries the error
		s.logger.Error("failed to create temporary schema migration table", mlog.Err(err))
		return err
	}

	return nil
}
// populateTempSchemaTable inserts into the temporary schema table one
// (Version, Name) record for every "up" migration whose version does not
// exceed legacySchemaVersion.
//
// Iteration stops at the first "up" migration with a higher version, so
// the migrations are presumably expected in ascending version order.
// When no migration qualifies, nothing is inserted and nil is returned.
func (s *SQLStore) populateTempSchemaTable(migrations []*models.Migration, legacySchemaVersion uint32) error {
	query := s.getQueryBuilder(s.db).
		Insert(s.tablePrefix+tempSchemaMigrationTableName).
		Columns("Version", "Name")

	recordCount := 0
	for _, migration := range migrations {
		// migrations param contains both up and down variant for
		// each migration. Skipping for either one (down in this case)
		// to process a migration only a single time.
		if migration.Direction == models.Down {
			continue
		}

		if migration.Version > legacySchemaVersion {
			break
		}

		query = query.Values(migration.Version, migration.Name)
		recordCount++
	}

	// an INSERT without any value set cannot be built by squirrel, and
	// there is nothing to record anyway
	if recordCount == 0 {
		return nil
	}

	if _, err := query.Exec(); err != nil {
		s.logger.Error("failed to insert migration records into temporary schema table", mlog.Err(err))
		return err
	}

	return nil
}
// useNewSchemaTable swaps the temporary schema table in place of the
// legacy one using two renames: the legacy table is first renamed to
// `<prefix>schema_migrations_old_temp`, then the temporary table is
// renamed to `<prefix>schema_migrations`. The legacy table is kept under
// its new name rather than dropped here. MySQL uses RENAME TABLE, every
// other database type uses ALTER TABLE ... RENAME TO.
func (s *SQLStore) useNewSchemaTable() error {
	isMySQL := s.dbType == model.MysqlDBType

	// step 1: move the legacy table out of the way
	var renameLegacy string
	if isMySQL {
		renameLegacy = fmt.Sprintf("RENAME TABLE `%sschema_migrations` TO `%sschema_migrations_old_temp`", s.tablePrefix, s.tablePrefix)
	} else {
		renameLegacy = fmt.Sprintf("ALTER TABLE %sschema_migrations RENAME TO %sschema_migrations_old_temp", s.tablePrefix, s.tablePrefix)
	}

	if _, err := s.db.Exec(renameLegacy); err != nil {
		s.logger.Error("failed to rename old schema migration table", mlog.Err(err))
		return err
	}

	// step 2: give the temporary table the legacy table's name
	var renameTemp string
	if isMySQL {
		renameTemp = fmt.Sprintf("RENAME TABLE `%s%s` TO `%sschema_migrations`", s.tablePrefix, tempSchemaMigrationTableName, s.tablePrefix)
	} else {
		renameTemp = fmt.Sprintf("ALTER TABLE %s%s RENAME TO %sschema_migrations", s.tablePrefix, tempSchemaMigrationTableName, s.tablePrefix)
	}

	_, err := s.db.Exec(renameTemp)
	if err != nil {
		s.logger.Error("failed to rename temp schema table", mlog.Err(err))
		return err
	}

	return nil
}
// deleteOldSchemaMigrationTable drops the
// `<prefix>schema_migrations_old_temp` table if it exists, logging and
// returning any error raised by the statement.
func (s *SQLStore) deleteOldSchemaMigrationTable() error {
	_, err := s.db.Exec("DROP TABLE IF EXISTS " + s.tablePrefix + "schema_migrations_old_temp")
	if err == nil {
		return nil
	}

	s.logger.Error("failed to delete old temp schema migrations table", mlog.Err(err))
	return err
}
2022-03-26 00:21:56 +01:00
// migrateTeamLessBoards assigns a team to every board that belongs to a DM
// or private group message, as boards are no longer supported there. Each
// such board is moved to the best possible team and made private.
//
// The migration only runs in plugin mode and only once: completion is
// recorded in the teamLessBoardsMigrationKey system setting. All board
// updates and the completion marker are written in a single transaction,
// which is rolled back if any of those writes fails. Boards for which no
// team can be determined are skipped.
func (s *SQLStore) migrateTeamLessBoards() error {
	if !s.isPlugin {
		return nil
	}

	setting, err := s.GetSystemSetting(teamLessBoardsMigrationKey)
	if err != nil {
		return fmt.Errorf("cannot get teamless boards migration state: %w", err)
	}

	// If the migration is already completed, do not run it again.
	// A value that cannot be parsed is treated as "not completed".
	if hasAlreadyRun, _ := strconv.ParseBool(setting); hasAlreadyRun {
		return nil
	}

	boards, err := s.getDMBoards(s.db)
	if err != nil {
		return err
	}

	s.logger.Info(fmt.Sprintf("Migrating %d teamless boards to a team", len(boards)))

	// cache for best suitable team for a DM. Since a DM can
	// contain multiple boards, caching this avoids
	// duplicate queries for the same DM.
	channelToTeamCache := map[string]string{}

	tx, err := s.db.BeginTx(context.Background(), nil)
	if err != nil {
		s.logger.Error("error starting transaction in migrateTeamLessBoards", mlog.Err(err))
		return err
	}

	// rollback aborts the transaction, logging (but not returning) any
	// error raised by the rollback itself.
	rollback := func() {
		if rollbackErr := tx.Rollback(); rollbackErr != nil {
			s.logger.Error("transaction rollback error", mlog.Err(rollbackErr), mlog.String("methodName", "migrateTeamLessBoards"))
		}
	}

	for i := range boards {
		board := boards[i]

		// check the cache first
		teamID, ok := channelToTeamCache[board.ChannelID]

		// query DB if entry not found in cache
		if !ok {
			teamID, err = s.getBestTeamForBoard(s.db, board)
			if err != nil {
				// don't let one board's error spoil
				// the mood for others
				continue
			}
			channelToTeamCache[board.ChannelID] = teamID
		}

		board.TeamID = teamID

		query := s.getQueryBuilder(tx).
			Update(s.tablePrefix+"boards").
			Set("team_id", teamID).
			Set("type", model.BoardTypePrivate).
			Where(sq.Eq{"id": board.ID})

		if _, err := query.Exec(); err != nil {
			s.logger.Error("failed to set team id for board", mlog.String("board_id", board.ID), mlog.String("team_id", teamID), mlog.Err(err))
			// without this the transaction would be left open
			rollback()
			return err
		}
	}

	if err := s.setSystemSetting(tx, teamLessBoardsMigrationKey, strconv.FormatBool(true)); err != nil {
		rollback()
		return fmt.Errorf("cannot mark migration as completed: %w", err)
	}

	if err := tx.Commit(); err != nil {
		s.logger.Error("failed to commit migrateTeamLessBoards transaction", mlog.Err(err))
		return err
	}

	return nil
}
func ( s * SQLStore ) getDMBoards ( tx sq . BaseRunner ) ( [ ] * model . Board , error ) {
conditions := sq . And {
sq . Eq { "team_id" : "" } ,
sq . Or {
sq . Eq { "type" : "D" } ,
sq . Eq { "type" : "G" } ,
} ,
}
2022-03-28 10:02:01 +02:00
boards , err := s . getBoardsByCondition ( tx , conditions )
if err != nil && errors . Is ( err , sql . ErrNoRows ) {
return [ ] * model . Board { } , nil
}
return boards , err
2022-03-26 00:21:56 +01:00
}
// getBestTeamForBoard picks the team a DM or group-message board should be
// moved to.
//
// The first team that every member of the channel belongs to is preferred.
// If there is no such team, the fallback depends on the board type: for
// "D" boards the first team of the channel creator is used (an error
// wrapping errChannelCreatorNotInTeam is returned when the creator has no
// teams), and for "G" boards the team with the most channel members is
// used. Any other board type yields an empty team ID and a nil error.
func (s *SQLStore) getBestTeamForBoard(tx sq.BaseRunner, board *model.Board) (string, error) {
	userTeams, err := s.getBoardUserTeams(tx, board)
	if err != nil {
		return "", err
	}

	// utils.Intersection works on []interface{}, so convert each user's
	// team list before handing them over
	teams := make([][]interface{}, 0, len(userTeams))
	for _, memberTeams := range userTeams {
		converted := make([]interface{}, 0, len(memberTeams))
		for _, id := range memberTeams {
			converted = append(converted, id)
		}
		teams = append(teams, converted)
	}

	if commonTeams := utils.Intersection(teams...); len(commonTeams) > 0 {
		return commonTeams[0].(string), nil
	}

	// no common teams found. Let's try finding the best suitable team
	switch board.Type {
	case "D":
		// get DM's creator and pick one of their team
		channel, appErr := (*s.pluginAPI).GetChannel(board.ChannelID)
		if appErr != nil {
			s.logger.Error("failed to fetch DM channel for board", mlog.String("board_id", board.ID), mlog.String("channel_id", board.ChannelID), mlog.Err(appErr))
			return "", appErr
		}

		creatorTeams, found := userTeams[channel.CreatorId]
		if !found {
			err := fmt.Errorf("%w board_id: %s, channel_id: %s, creator_id: %s", errChannelCreatorNotInTeam, board.ID, board.ChannelID, channel.CreatorId)
			s.logger.Error(err.Error())
			return "", err
		}

		return creatorTeams[0], nil

	case "G":
		// pick the team that has the most users as members
		memberCount := map[string]int{}
		bestTeam := ""
		bestCount := -1

		for _, memberTeams := range userTeams {
			for _, id := range memberTeams {
				memberCount[id]++
				if memberCount[id] > bestCount {
					bestCount = memberCount[id]
					bestTeam = id
				}
			}
		}

		return bestTeam, nil
	}

	return "", nil
}
// getBoardUserTeams returns, for every member of the board's channel, the
// IDs of the teams that member belongs to, keyed by user ID. The data is
// read by joining the channelmembers and teammembers tables. Members
// without any team membership do not appear in the result.
func (s *SQLStore) getBoardUserTeams(tx sq.BaseRunner, board *model.Board) (map[string][]string, error) {
	query := s.getQueryBuilder(tx).
		Select("teammembers.userid", "teammembers.teamid").
		From("channelmembers").
		Join("teammembers ON channelmembers.userid = teammembers.userid").
		Where(sq.Eq{"channelid": board.ChannelID})

	rows, err := query.Query()
	if err != nil {
		s.logger.Error("failed to fetch user teams for board", mlog.String("boardID", board.ID), mlog.String("channelID", board.ChannelID), mlog.Err(err))
		return nil, err
	}
	defer s.CloseRows(rows)

	userTeams := map[string][]string{}

	for rows.Next() {
		var userID, teamID string
		if err := rows.Scan(&userID, &teamID); err != nil {
			s.logger.Error("getBoardUserTeams failed to scan SQL query result", mlog.String("boardID", board.ID), mlog.String("channelID", board.ChannelID), mlog.Err(err))
			return nil, err
		}

		userTeams[userID] = append(userTeams[userID], teamID)
	}

	// rows.Next returning false can also mean the iteration failed, in
	// which case the collected map may be incomplete
	if err := rows.Err(); err != nil {
		s.logger.Error("getBoardUserTeams failed to iterate SQL query result", mlog.String("boardID", board.ID), mlog.String("channelID", board.ChannelID), mlog.Err(err))
		return nil, err
	}

	return userTeams, nil
}
2022-03-22 15:24:34 +01:00
func ensureMigrationsAppliedUpToVersion ( engine * morph . Morph , driver drivers . Driver , version int ) error {
applied , err := driver . AppliedMigrations ( )
if err != nil {
2021-11-11 17:01:43 +01:00
return err
}
2022-03-22 15:24:34 +01:00
currentVersion := len ( applied )
2021-11-11 17:01:43 +01:00
// if the target version is below or equal to the current one, do
// not migrate either because is not needed (both are equal) or
// because it would downgrade the database (is below)
if version <= currentVersion {
return nil
}
2022-03-22 15:24:34 +01:00
if _ , err = engine . Apply ( version - currentVersion ) ; err != nil {
2020-10-16 22:27:16 +02:00
return err
}
2020-10-22 13:34:42 +02:00
2020-10-16 22:27:16 +02:00
return nil
}