369 lines
10 KiB
Go
369 lines
10 KiB
Go
|
// Copyright 2015 PingCAP, Inc.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package tidb
|
||
|
|
||
|
import (
|
||
|
"flag"
|
||
|
"fmt"
|
||
|
"os"
|
||
|
"runtime"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"github.com/opentracing/opentracing-go"
|
||
|
"github.com/pingcap/errors"
|
||
|
"github.com/pingcap/parser/mysql"
|
||
|
"github.com/pingcap/parser/terror"
|
||
|
"github.com/pingcap/pd/client"
|
||
|
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
|
||
|
"github.com/pingcap/tidb/config"
|
||
|
"github.com/pingcap/tidb/ddl"
|
||
|
"github.com/pingcap/tidb/domain"
|
||
|
"github.com/pingcap/tidb/kv"
|
||
|
"github.com/pingcap/tidb/metrics"
|
||
|
plannercore "github.com/pingcap/tidb/planner/core"
|
||
|
"github.com/pingcap/tidb/privilege/privileges"
|
||
|
"github.com/pingcap/tidb/server"
|
||
|
"github.com/pingcap/tidb/session"
|
||
|
"github.com/pingcap/tidb/sessionctx/binloginfo"
|
||
|
"github.com/pingcap/tidb/sessionctx/variable"
|
||
|
"github.com/pingcap/tidb/statistics"
|
||
|
"github.com/pingcap/tidb/store/mockstore"
|
||
|
"github.com/pingcap/tidb/store/tikv"
|
||
|
"github.com/pingcap/tidb/store/tikv/gcworker"
|
||
|
"github.com/pingcap/tidb/util/logutil"
|
||
|
"github.com/pingcap/tidb/util/printer"
|
||
|
"github.com/pingcap/tidb/util/signal"
|
||
|
"github.com/pingcap/tidb/util/systimemon"
|
||
|
"github.com/pingcap/tidb/x-server"
|
||
|
"github.com/prometheus/client_golang/prometheus"
|
||
|
"github.com/prometheus/client_golang/prometheus/push"
|
||
|
log "github.com/sirupsen/logrus"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
cfg *config.Config
|
||
|
storage kv.Storage
|
||
|
dom *domain.Domain
|
||
|
svr *server.Server
|
||
|
xsvr *xserver.Server
|
||
|
graceful bool
|
||
|
)
|
||
|
|
||
|
// Start the TiDB server using the configuration provided
|
||
|
func Start(path string, port uint, host string, debug bool) {
|
||
|
registerStores()
|
||
|
registerMetrics()
|
||
|
loadConfig()
|
||
|
|
||
|
cfg.Log.Level = "error"
|
||
|
cfg.Path = path
|
||
|
cfg.Store = "mocktikv"
|
||
|
if debug && host == "" {
|
||
|
cfg.Host = "0.0.0.0"
|
||
|
} else {
|
||
|
cfg.Host = "localhost"
|
||
|
}
|
||
|
cfg.Port = port
|
||
|
cfg.Status.ReportStatus = false
|
||
|
|
||
|
validateConfig()
|
||
|
setGlobalVars()
|
||
|
setupLog()
|
||
|
setupTracing() // Should before createServer and after setup config.
|
||
|
printInfo()
|
||
|
setupBinlogClient()
|
||
|
setupMetrics()
|
||
|
createStoreAndDomain()
|
||
|
createServer()
|
||
|
signal.SetupSignalHandler(serverShutdown)
|
||
|
runServer()
|
||
|
cleanup()
|
||
|
os.Exit(0)
|
||
|
}
|
||
|
|
||
|
func registerStores() {
|
||
|
err := session.RegisterStore("tikv", tikv.Driver{})
|
||
|
terror.MustNil(err)
|
||
|
tikv.NewGCHandlerFunc = gcworker.NewGCWorker
|
||
|
err = session.RegisterStore("mocktikv", mockstore.MockDriver{})
|
||
|
terror.MustNil(err)
|
||
|
}
|
||
|
|
||
|
func registerMetrics() {
|
||
|
metrics.RegisterMetrics()
|
||
|
}
|
||
|
|
||
|
func createStoreAndDomain() {
|
||
|
fullPath := fmt.Sprintf("%s://%s", cfg.Store, cfg.Path)
|
||
|
var err error
|
||
|
storage, err = session.NewStore(fullPath)
|
||
|
terror.MustNil(err)
|
||
|
// Bootstrap a session to load information schema.
|
||
|
dom, err = session.BootstrapSession(storage)
|
||
|
terror.MustNil(err)
|
||
|
}
|
||
|
|
||
|
func setupBinlogClient() {
|
||
|
if !cfg.Binlog.Enable {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if cfg.Binlog.IgnoreError {
|
||
|
binloginfo.SetIgnoreError(true)
|
||
|
}
|
||
|
|
||
|
client, err := pumpcli.NewPumpsClient(cfg.Path, parseDuration(cfg.Binlog.WriteTimeout), pd.SecurityOption{
|
||
|
CAPath: cfg.Security.ClusterSSLCA,
|
||
|
CertPath: cfg.Security.ClusterSSLCert,
|
||
|
KeyPath: cfg.Security.ClusterSSLKey,
|
||
|
})
|
||
|
terror.MustNil(err)
|
||
|
|
||
|
binloginfo.SetPumpsClient(client)
|
||
|
log.Infof("create pumps client success, ignore binlog error %v", cfg.Binlog.IgnoreError)
|
||
|
}
|
||
|
|
||
|
// Prometheus push.
|
||
|
const zeroDuration = time.Duration(0)
|
||
|
|
||
|
// pushMetric pushes metrics in background.
|
||
|
func pushMetric(addr string, interval time.Duration) {
|
||
|
if interval == zeroDuration || len(addr) == 0 {
|
||
|
log.Info("disable Prometheus push client")
|
||
|
return
|
||
|
}
|
||
|
log.Infof("start Prometheus push client with server addr %s and interval %s", addr, interval)
|
||
|
go prometheusPushClient(addr, interval)
|
||
|
}
|
||
|
|
||
|
// prometheusPushClient pushes metrics to Prometheus Pushgateway.
|
||
|
func prometheusPushClient(addr string, interval time.Duration) {
|
||
|
// TODO: TiDB do not have uniq name, so we use host+port to compose a name.
|
||
|
job := "tidb"
|
||
|
for {
|
||
|
err := push.AddFromGatherer(
|
||
|
job,
|
||
|
map[string]string{"instance": instanceName()},
|
||
|
addr,
|
||
|
prometheus.DefaultGatherer,
|
||
|
)
|
||
|
if err != nil {
|
||
|
log.Errorf("could not push metrics to Prometheus Pushgateway: %v", err)
|
||
|
}
|
||
|
time.Sleep(interval)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func instanceName() string {
|
||
|
hostname, err := os.Hostname()
|
||
|
if err != nil {
|
||
|
return "unknown"
|
||
|
}
|
||
|
return fmt.Sprintf("%s_%d", hostname, cfg.Port)
|
||
|
}
|
||
|
|
||
|
// parseDuration parses lease argument string.
|
||
|
func parseDuration(lease string) time.Duration {
|
||
|
dur, err := time.ParseDuration(lease)
|
||
|
if err != nil {
|
||
|
dur, err = time.ParseDuration(lease + "s")
|
||
|
}
|
||
|
if err != nil || dur < 0 {
|
||
|
log.Fatalf("invalid lease duration %s", lease)
|
||
|
}
|
||
|
return dur
|
||
|
}
|
||
|
|
||
|
func hasRootPrivilege() bool {
|
||
|
return os.Geteuid() == 0
|
||
|
}
|
||
|
|
||
|
func flagBoolean(name string, defaultVal bool, usage string) *bool {
|
||
|
if defaultVal == false {
|
||
|
// Fix #4125, golang do not print default false value in usage, so we append it.
|
||
|
usage = fmt.Sprintf("%s (default false)", usage)
|
||
|
return flag.Bool(name, defaultVal, usage)
|
||
|
}
|
||
|
return flag.Bool(name, defaultVal, usage)
|
||
|
}
|
||
|
|
||
|
func loadConfig() {
|
||
|
cfg = config.GetGlobalConfig()
|
||
|
}
|
||
|
|
||
|
func validateConfig() {
|
||
|
if cfg.Security.SkipGrantTable && !hasRootPrivilege() {
|
||
|
log.Error("TiDB run with skip-grant-table need root privilege.")
|
||
|
os.Exit(-1)
|
||
|
}
|
||
|
if _, ok := config.ValidStorage[cfg.Store]; !ok {
|
||
|
nameList := make([]string, 0, len(config.ValidStorage))
|
||
|
for k, v := range config.ValidStorage {
|
||
|
if v {
|
||
|
nameList = append(nameList, k)
|
||
|
}
|
||
|
}
|
||
|
log.Errorf("\"store\" should be in [%s] only", strings.Join(nameList, ", "))
|
||
|
os.Exit(-1)
|
||
|
}
|
||
|
if cfg.Store == "mocktikv" && cfg.RunDDL == false {
|
||
|
log.Errorf("can't disable DDL on mocktikv")
|
||
|
os.Exit(-1)
|
||
|
}
|
||
|
if cfg.Log.File.MaxSize > config.MaxLogFileSize {
|
||
|
log.Errorf("log max-size should not be larger than %d MB", config.MaxLogFileSize)
|
||
|
os.Exit(-1)
|
||
|
}
|
||
|
if cfg.XProtocol.XServer {
|
||
|
log.Error("X Server is not available")
|
||
|
os.Exit(-1)
|
||
|
}
|
||
|
cfg.OOMAction = strings.ToLower(cfg.OOMAction)
|
||
|
|
||
|
// lower_case_table_names is allowed to be 0, 1, 2
|
||
|
if cfg.LowerCaseTableNames < 0 || cfg.LowerCaseTableNames > 2 {
|
||
|
log.Errorf("lower-case-table-names should be 0 or 1 or 2.")
|
||
|
os.Exit(-1)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func setGlobalVars() {
|
||
|
ddlLeaseDuration := parseDuration(cfg.Lease)
|
||
|
session.SetSchemaLease(ddlLeaseDuration)
|
||
|
runtime.GOMAXPROCS(int(cfg.Performance.MaxProcs))
|
||
|
statsLeaseDuration := parseDuration(cfg.Performance.StatsLease)
|
||
|
session.SetStatsLease(statsLeaseDuration)
|
||
|
domain.RunAutoAnalyze = cfg.Performance.RunAutoAnalyze
|
||
|
statistics.FeedbackProbability = cfg.Performance.FeedbackProbability
|
||
|
statistics.MaxQueryFeedbackCount = int(cfg.Performance.QueryFeedbackLimit)
|
||
|
statistics.RatioOfPseudoEstimate = cfg.Performance.PseudoEstimateRatio
|
||
|
ddl.RunWorker = cfg.RunDDL
|
||
|
ddl.EnableSplitTableRegion = cfg.SplitTable
|
||
|
plannercore.AllowCartesianProduct = cfg.Performance.CrossJoin
|
||
|
privileges.SkipWithGrant = cfg.Security.SkipGrantTable
|
||
|
variable.ForcePriority = int32(mysql.Str2Priority(cfg.Performance.ForcePriority))
|
||
|
|
||
|
variable.SysVars[variable.TIDBMemQuotaQuery].Value = strconv.FormatInt(cfg.MemQuotaQuery, 10)
|
||
|
variable.SysVars["lower_case_table_names"].Value = strconv.Itoa(cfg.LowerCaseTableNames)
|
||
|
|
||
|
plannercore.SetPreparedPlanCache(cfg.PreparedPlanCache.Enabled)
|
||
|
if plannercore.PreparedPlanCacheEnabled() {
|
||
|
plannercore.PreparedPlanCacheCapacity = cfg.PreparedPlanCache.Capacity
|
||
|
}
|
||
|
|
||
|
if cfg.TiKVClient.GrpcConnectionCount > 0 {
|
||
|
tikv.MaxConnectionCount = cfg.TiKVClient.GrpcConnectionCount
|
||
|
}
|
||
|
tikv.GrpcKeepAliveTime = time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second
|
||
|
tikv.GrpcKeepAliveTimeout = time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second
|
||
|
|
||
|
tikv.CommitMaxBackoff = int(parseDuration(cfg.TiKVClient.CommitTimeout).Seconds() * 1000)
|
||
|
}
|
||
|
|
||
|
func setupLog() {
|
||
|
err := logutil.InitLogger(cfg.Log.ToLogConfig())
|
||
|
terror.MustNil(err)
|
||
|
}
|
||
|
|
||
|
func printInfo() {
|
||
|
// Make sure the TiDB info is always printed.
|
||
|
level := log.GetLevel()
|
||
|
log.SetLevel(log.InfoLevel)
|
||
|
printer.PrintTiDBInfo()
|
||
|
log.SetLevel(level)
|
||
|
}
|
||
|
|
||
|
func createServer() {
|
||
|
var driver server.IDriver
|
||
|
driver = server.NewTiDBDriver(storage)
|
||
|
var err error
|
||
|
svr, err = server.NewServer(cfg, driver)
|
||
|
// Both domain and storage have started, so we have to clean them before exiting.
|
||
|
terror.MustNil(err, closeDomainAndStorage)
|
||
|
if cfg.XProtocol.XServer {
|
||
|
xcfg := &xserver.Config{
|
||
|
Addr: fmt.Sprintf("%s:%d", cfg.XProtocol.XHost, cfg.XProtocol.XPort),
|
||
|
Socket: cfg.XProtocol.XSocket,
|
||
|
TokenLimit: cfg.TokenLimit,
|
||
|
}
|
||
|
xsvr, err = xserver.NewServer(xcfg)
|
||
|
terror.MustNil(err, closeDomainAndStorage)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func serverShutdown(isgraceful bool) {
|
||
|
if isgraceful {
|
||
|
graceful = true
|
||
|
}
|
||
|
if xsvr != nil {
|
||
|
xsvr.Close() // Should close xserver before server.
|
||
|
}
|
||
|
svr.Close()
|
||
|
}
|
||
|
|
||
|
func setupMetrics() {
|
||
|
// Enable the mutex profile, 1/10 of mutex blocking event sampling.
|
||
|
runtime.SetMutexProfileFraction(10)
|
||
|
systimeErrHandler := func() {
|
||
|
metrics.TimeJumpBackCounter.Inc()
|
||
|
}
|
||
|
callBackCount := 0
|
||
|
sucessCallBack := func() {
|
||
|
callBackCount++
|
||
|
// It is callback by monitor per second, we increase metrics.KeepAliveCounter per 5s.
|
||
|
if callBackCount >= 5 {
|
||
|
callBackCount = 0
|
||
|
metrics.KeepAliveCounter.Inc()
|
||
|
}
|
||
|
}
|
||
|
go systimemon.StartMonitor(time.Now, systimeErrHandler, sucessCallBack)
|
||
|
|
||
|
pushMetric(cfg.Status.MetricsAddr, time.Duration(cfg.Status.MetricsInterval)*time.Second)
|
||
|
}
|
||
|
|
||
|
func setupTracing() {
|
||
|
tracingCfg := cfg.OpenTracing.ToTracingConfig()
|
||
|
tracer, _, err := tracingCfg.New("TiDB")
|
||
|
if err != nil {
|
||
|
log.Fatal("cannot initialize Jaeger Tracer", err)
|
||
|
}
|
||
|
opentracing.SetGlobalTracer(tracer)
|
||
|
}
|
||
|
|
||
|
func runServer() {
|
||
|
err := svr.Run()
|
||
|
terror.MustNil(err)
|
||
|
if cfg.XProtocol.XServer {
|
||
|
err := xsvr.Run()
|
||
|
terror.MustNil(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func closeDomainAndStorage() {
|
||
|
dom.Close()
|
||
|
err := storage.Close()
|
||
|
terror.Log(errors.Trace(err))
|
||
|
}
|
||
|
|
||
|
func cleanup() {
|
||
|
if graceful {
|
||
|
svr.GracefulDown()
|
||
|
} else {
|
||
|
svr.KillAllConnections()
|
||
|
}
|
||
|
closeDomainAndStorage()
|
||
|
}
|