71 lines
1.8 KiB
Go
71 lines
1.8 KiB
Go
package trans
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"os"
|
||
"sort"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/didi/nightingale/v5/config"
|
||
"github.com/didi/nightingale/v5/models"
|
||
"github.com/didi/nightingale/v5/naming"
|
||
"github.com/didi/nightingale/v5/pkg/ipool"
|
||
|
||
"github.com/toolkits/pkg/logger"
|
||
)
|
||
|
||
var connPools *ipool.ConnPools
|
||
var svcsCache string
|
||
|
||
func Start(ctx context.Context) {
|
||
// 初始化本包的数据结构,然后启动一个goroutine,周期性获取活着的judge实例,更新相应的pool、queue等
|
||
judgeConf := config.Config.Judge
|
||
connPools = ipool.NewConnPools(judgeConf.ConnMax, judgeConf.ConnIdle, judgeConf.ConnTimeout, judgeConf.CallTimeout, []string{})
|
||
|
||
if err := syncInstances(); err != nil {
|
||
fmt.Println("syncInstances fail:", err)
|
||
logger.Close()
|
||
os.Exit(1)
|
||
}
|
||
|
||
go loopSyncInstances()
|
||
}
|
||
|
||
func loopSyncInstances() {
|
||
interval := time.Duration(config.Config.Heartbeat.Interval) * time.Millisecond
|
||
for {
|
||
time.Sleep(interval)
|
||
if err := syncInstances(); err != nil {
|
||
logger.Warning("syncInstances fail:", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
func syncInstances() error {
|
||
// 获取当前活着的所有实例
|
||
instances, err := models.InstanceGetAlive(config.EndpointName)
|
||
if err != nil {
|
||
logger.Warningf("mysql.error: get alive server instances fail: %v", err)
|
||
return err
|
||
}
|
||
|
||
// 排序,便于与内存中的实例列表做差别判断
|
||
sort.Strings(instances)
|
||
|
||
// 如果列表变化,就去处理,并且要更新内存变量serverStr
|
||
newSvcs := strings.Join(instances, ",")
|
||
if newSvcs != svcsCache {
|
||
// 如果有新实例,创建对应的连接池,如果实例少了,删掉没用的连接池
|
||
connPools.UpdatePools(instances)
|
||
// 如果有新实例,创建对应的Queue,如果实例少了,删掉对应的Queue
|
||
queues.Update(instances)
|
||
// 重建哈希环
|
||
naming.RebuildConsistentHashRing(instances)
|
||
svcsCache = newSvcs
|
||
}
|
||
|
||
return nil
|
||
}
|