parent
b50b85f367
commit
2d8ffb43e6
|
@ -44,8 +44,8 @@ var (
|
|||
JudgeQueues = cache.SafeJudgeQueue{}
|
||||
|
||||
// 连接池 node_address -> connection_pool
|
||||
TsdbConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
|
||||
JudgeConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
|
||||
TsdbConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
|
||||
JudgeConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
|
||||
|
||||
connTimeout int32
|
||||
callTimeout int32
|
||||
|
@ -80,7 +80,6 @@ func initConnPools() {
|
|||
|
||||
JudgeConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle,
|
||||
Config.ConnTimeout, Config.CallTimeout, GetJudges())
|
||||
|
||||
}
|
||||
|
||||
func initSendQueues() {
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
// 每个后端backend对应一个ConnPool
|
||||
// backend -> ConnPool
|
||||
type ConnPools struct {
|
||||
sync.RWMutex
|
||||
M map[string]*pool.ConnPool
|
||||
|
@ -39,10 +39,10 @@ func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []
|
|||
return cp
|
||||
}
|
||||
|
||||
func createOnePool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *pool.ConnPool {
|
||||
func createOnePool(name, address string, connTimeout time.Duration, maxConns, maxIdle int) *pool.ConnPool {
|
||||
p := pool.NewConnPool(name, address, maxConns, maxIdle)
|
||||
p.New = func(connName string) (pool.NConn, error) {
|
||||
//校验地址是否正确
|
||||
// check address
|
||||
_, err := net.ResolveTCPAddr("tcp", p.Address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -83,6 +83,7 @@ func (cp *ConnPools) Update(cluster []string) {
|
|||
cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle)
|
||||
}
|
||||
|
||||
// delete invalid address from cp.M
|
||||
for address := range cp.M {
|
||||
if _, exists := newCluster[address]; !exists {
|
||||
delete(cp.M, address)
|
||||
|
@ -90,8 +91,8 @@ func (cp *ConnPools) Update(cluster []string) {
|
|||
}
|
||||
}
|
||||
|
||||
// 同步发送, 完成发送或超时后 才能返回
|
||||
func (cp *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error {
|
||||
// Call will block until the request failed or timeout
|
||||
func (cp *ConnPools) Call(addr, method string, args, resp interface{}) error {
|
||||
connPool, exists := cp.Get(addr)
|
||||
if !exists {
|
||||
return fmt.Errorf("%s has no connection pool", addr)
|
||||
|
@ -132,7 +133,7 @@ func (cp *ConnPools) Get(address string) (*pool.ConnPool, bool) {
|
|||
return p, exists
|
||||
}
|
||||
|
||||
// RpcCient, 要实现io.Closer接口
|
||||
// RpcClient implements the io.Closer interface
|
||||
type RpcClient struct {
|
||||
cli *rpc.Client
|
||||
name string
|
||||
|
@ -155,6 +156,6 @@ func (rc RpcClient) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (rc RpcClient) Call(method string, args interface{}, reply interface{}) error {
|
||||
func (rc RpcClient) Call(method string, args, reply interface{}) error {
|
||||
return rc.cli.Call(method, args, reply)
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
|||
if len(input.Tags) == 0 {
|
||||
counter, err := GetCounter(input.Metric, "", nil)
|
||||
if err != nil {
|
||||
logger.Warning(err)
|
||||
logger.Warningf("get counter error: %+v", err)
|
||||
continue
|
||||
}
|
||||
worker <- struct{}{}
|
||||
|
@ -81,7 +81,7 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
|||
for _, tag := range input.Tags {
|
||||
counter, err := GetCounter(input.Metric, tag, nil)
|
||||
if err != nil {
|
||||
logger.Warning(err)
|
||||
logger.Warningf("get counter error: %+v", err)
|
||||
continue
|
||||
}
|
||||
worker <- struct{}{}
|
||||
|
@ -90,7 +90,7 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
|||
}
|
||||
}
|
||||
|
||||
//等待所有goroutine执行完成
|
||||
// 等待所有 goroutine 执行完成
|
||||
for i := 0; i < workerNum; i++ {
|
||||
worker <- struct{}{}
|
||||
}
|
||||
|
@ -101,15 +101,15 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
|||
//进行数据计算
|
||||
aggrDatas := make([]*dataobj.TsdbQueryResponse, 0)
|
||||
if input.AggrFunc != "" && len(resp) > 1 {
|
||||
|
||||
aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse)
|
||||
|
||||
// 没有聚合 tag, 或者曲线没有其他 tags, 直接所有曲线进行计算
|
||||
if len(input.GroupKey) == 0 || getTags(resp[0].Counter) == "" {
|
||||
aggrData := &dataobj.TsdbQueryResponse{
|
||||
Start: input.Start,
|
||||
End: input.End,
|
||||
Values: calc.Compute(input.AggrFunc, resp),
|
||||
}
|
||||
//没有聚合 tag, 或者曲线没有其他 tags, 直接所有曲线进行计算
|
||||
aggrDatas = append(aggrDatas, aggrData)
|
||||
} else {
|
||||
for _, data := range resp {
|
||||
|
@ -117,14 +117,14 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
|||
|
||||
tagsMap, err := dataobj.SplitTagsString(getTags(data.Counter))
|
||||
if err != nil {
|
||||
logger.Warning(err)
|
||||
logger.Warningf("split tag string error: %+v", err)
|
||||
continue
|
||||
}
|
||||
tagsMap["endpoint"] = data.Endpoint
|
||||
|
||||
// 校验 GroupKey 是否在 tags 中
|
||||
for _, key := range input.GroupKey {
|
||||
value, exists := tagsMap[key]
|
||||
if exists {
|
||||
if value, exists := tagsMap[key]; exists {
|
||||
counterMap[key] = value
|
||||
}
|
||||
}
|
||||
|
@ -137,6 +137,7 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
|||
}
|
||||
}
|
||||
|
||||
// 有需要聚合的 tag 需要将 counter 带上
|
||||
for counter, datas := range aggrCounter {
|
||||
aggrData := &dataobj.TsdbQueryResponse{
|
||||
Start: input.Start,
|
||||
|
@ -156,7 +157,7 @@ func GetCounter(metric, tag string, tagMap map[string]string) (counter string, e
|
|||
if tagMap == nil {
|
||||
tagMap, err = dataobj.SplitTagsString(tag)
|
||||
if err != nil {
|
||||
logger.Warning(err, tag)
|
||||
logger.Warningf("split tag string error: %+v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -174,7 +175,7 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i
|
|||
|
||||
data, err := fetchData(start, end, consolFun, endpoint, counter, step)
|
||||
if err != nil {
|
||||
logger.Warning(err)
|
||||
logger.Warningf("fetch tsdb data error: %+v", err)
|
||||
stats.Counter.Set("query.data.err", 1)
|
||||
}
|
||||
dataChan <- data
|
||||
|
@ -189,6 +190,7 @@ func fetchData(start, end int64, consolFun, endpoint, counter string, step int)
|
|||
return resp, err
|
||||
}
|
||||
|
||||
// 没有获取到数据 做标记处理 math.NaN()
|
||||
if len(resp.Values) < 1 {
|
||||
ts := start - start%int64(60)
|
||||
count := (end - start) / 60
|
||||
|
@ -200,10 +202,10 @@ func fetchData(start, end int64, consolFun, endpoint, counter string, step int)
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
step := (end - start) / count // integer divide by zero
|
||||
step := (end - start) / count
|
||||
for i := 0; i < int(count); i++ {
|
||||
resp.Values = append(resp.Values, &dataobj.RRDData{Timestamp: ts, Value: dataobj.JsonFloat(math.NaN())})
|
||||
ts += int64(step)
|
||||
ts += step
|
||||
}
|
||||
}
|
||||
resp.Start = start
|
||||
|
@ -240,18 +242,18 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err
|
|||
|
||||
count := len(pools)
|
||||
for _, i := range rand.Perm(count) {
|
||||
pool := pools[i].Pool
|
||||
onePool := pools[i].Pool
|
||||
addr := pools[i].Addr
|
||||
|
||||
conn, err := pool.Fetch()
|
||||
conn, err := onePool.Fetch()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
logger.Errorf("fetch pool error: %+v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
rpcConn := conn.(RpcClient)
|
||||
if rpcConn.Closed() {
|
||||
pool.ForceClose(conn)
|
||||
onePool.ForceClose(conn)
|
||||
|
||||
err = errors.New("conn closed")
|
||||
logger.Error(err)
|
||||
|
@ -272,28 +274,26 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err
|
|||
|
||||
select {
|
||||
case <-time.After(time.Duration(callTimeout) * time.Millisecond):
|
||||
pool.ForceClose(conn)
|
||||
logger.Errorf("%s, call timeout. proc: %s", addr, pool.Proc())
|
||||
onePool.ForceClose(conn)
|
||||
logger.Errorf("%s, call timeout. proc: %s", addr, onePool.Proc())
|
||||
break
|
||||
case r := <-ch:
|
||||
if r.Err != nil {
|
||||
pool.ForceClose(conn)
|
||||
logger.Errorf("%s, call failed, err %v. proc: %s", addr, r.Err, pool.Proc())
|
||||
onePool.ForceClose(conn)
|
||||
logger.Errorf("%s, call failed, err %v. proc: %s", addr, r.Err, onePool.Proc())
|
||||
break
|
||||
|
||||
} else {
|
||||
pool.Release(conn)
|
||||
onePool.Release(conn)
|
||||
if len(r.Resp.Values) < 1 {
|
||||
r.Resp.Values = []*dataobj.RRDData{}
|
||||
return r.Resp, nil
|
||||
}
|
||||
|
||||
fixed := []*dataobj.RRDData{}
|
||||
fixed := make([]*dataobj.RRDData, 0)
|
||||
for _, v := range r.Resp.Values {
|
||||
if v == nil || !(v.Timestamp >= start && v.Timestamp <= end) {
|
||||
continue
|
||||
}
|
||||
|
||||
fixed = append(fixed, v)
|
||||
}
|
||||
r.Resp.Values = fixed
|
||||
|
@ -324,16 +324,12 @@ func SelectPoolByPK(pk string) ([]Pool, error) {
|
|||
|
||||
var pools []Pool
|
||||
for _, addr := range nodeAddrs.Addrs {
|
||||
pool, found := TsdbConnPools.Get(addr)
|
||||
onePool, found := TsdbConnPools.Get(addr)
|
||||
if !found {
|
||||
logger.Errorf("addr %s not found", addr)
|
||||
continue
|
||||
}
|
||||
p := Pool{
|
||||
Pool: pool,
|
||||
Addr: addr,
|
||||
}
|
||||
pools = append(pools, p)
|
||||
pools = append(pools, Pool{Pool: onePool, Addr: addr})
|
||||
}
|
||||
|
||||
if len(pools) < 1 {
|
||||
|
@ -341,7 +337,6 @@ func SelectPoolByPK(pk string) ([]Pool, error) {
|
|||
}
|
||||
|
||||
return pools, nil
|
||||
|
||||
}
|
||||
|
||||
func getTags(counter string) (tags string) {
|
||||
|
@ -381,7 +376,7 @@ func GetSeries(start, end int64, req []SeriesReq) ([]dataobj.QueryData, error) {
|
|||
var queryDatas []dataobj.QueryData
|
||||
|
||||
if len(req) < 1 {
|
||||
return queryDatas, fmt.Errorf("req err")
|
||||
return queryDatas, fmt.Errorf("req length < 1")
|
||||
}
|
||||
|
||||
addrs := address.GetHTTPAddresses("index")
|
||||
|
@ -402,14 +397,13 @@ func GetSeries(start, end int64, req []SeriesReq) ([]dataobj.QueryData, error) {
|
|||
return nil, fmt.Errorf("index response status code != 200")
|
||||
}
|
||||
|
||||
err = json.Unmarshal(resp, &res)
|
||||
if err != nil {
|
||||
if err = json.Unmarshal(resp, &res); err != nil {
|
||||
logger.Error(string(resp))
|
||||
return queryDatas, err
|
||||
}
|
||||
|
||||
for _, item := range res.Dat {
|
||||
counters := []string{}
|
||||
counters := make([]string, 0)
|
||||
if len(item.Tags) == 0 {
|
||||
counters = append(counters, item.Metric)
|
||||
} else {
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
// send
|
||||
const (
|
||||
DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms
|
||||
MAX_SEND_RETRY = 10
|
||||
MaxSendRetry = 10
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -53,7 +53,7 @@ func startSendTasks() {
|
|||
}
|
||||
}
|
||||
|
||||
func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {
|
||||
func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) {
|
||||
batch := Config.Batch // 一次发送,最多batch条数据
|
||||
Q = TsdbQueues[node+addr]
|
||||
|
||||
|
@ -101,7 +101,7 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent
|
|||
}
|
||||
}
|
||||
|
||||
// 将数据 打入 某个Tsdb的发送缓存队列, 具体是哪一个Tsdb 由一致性哈希 决定
|
||||
// Push2TsdbSendQueue pushes data to a TSDB instance which depends on the consistent ring.
|
||||
func Push2TsdbSendQueue(items []*dataobj.MetricValue) {
|
||||
errCnt := 0
|
||||
for _, item := range items {
|
||||
|
@ -110,13 +110,14 @@ func Push2TsdbSendQueue(items []*dataobj.MetricValue) {
|
|||
|
||||
node, err := TsdbNodeRing.GetNode(item.PK())
|
||||
if err != nil {
|
||||
logger.Warning("E:", err)
|
||||
logger.Warningf("get tsdb node error: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
cnode := Config.ClusterList[node]
|
||||
for _, addr := range cnode.Addrs {
|
||||
Q := TsdbQueues[node+addr]
|
||||
// 队列已满
|
||||
if !Q.PushFront(tsdbItem) {
|
||||
errCnt += 1
|
||||
}
|
||||
|
@ -155,7 +156,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
|
|||
resp := &dataobj.SimpleRpcResponse{}
|
||||
var err error
|
||||
sendOk := false
|
||||
for i := 0; i < MAX_SEND_RETRY; i++ {
|
||||
for i := 0; i < MaxSendRetry; i++ {
|
||||
err = JudgeConnPools.Call(addr, "Judge.Send", judgeItems, resp)
|
||||
if err == nil {
|
||||
sendOk = true
|
||||
|
@ -210,7 +211,7 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
|
|||
stats.Counter.Set("judge.queue.err", errCnt)
|
||||
}
|
||||
|
||||
// 打到Tsdb的数据,要根据rrdtool的特定 来限制 step、counterType、timestamp
|
||||
// 打到 Tsdb 的数据,要根据 rrdtool 的特定 来限制 step、counterType、timestamp
|
||||
func convert2TsdbItem(d *dataobj.MetricValue) *dataobj.TsdbItem {
|
||||
item := &dataobj.TsdbItem{
|
||||
Endpoint: d.Endpoint,
|
||||
|
@ -241,7 +242,7 @@ func TagMatch(straTags []model.Tag, tag map[string]string) bool {
|
|||
return false
|
||||
}
|
||||
var match bool
|
||||
if stag.Topt == "=" { //当前策略tagkey对应的tagv
|
||||
if stag.Topt == "=" { //当前策略 tagkey 对应的 tagv
|
||||
for _, v := range stag.Tval {
|
||||
if tag[stag.Tkey] == v {
|
||||
match = true
|
||||
|
|
|
@ -35,7 +35,7 @@ func (s *SafeStraMap) GetByKey(key string) []*model.Stra {
|
|||
func (s *SafeStraMap) GetAll() []*model.Stra {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
stras := []*model.Stra{}
|
||||
stras := make([]*model.Stra, 0)
|
||||
for _, m := range s.M {
|
||||
for _, stra := range m {
|
||||
stras = append(stras, stra...)
|
||||
|
|
|
@ -7,15 +7,6 @@ import (
|
|||
"github.com/didi/nightingale/src/dataobj"
|
||||
)
|
||||
|
||||
var (
|
||||
validFuncName = map[string]struct{}{
|
||||
"sum": {},
|
||||
"avg": {},
|
||||
"max": {},
|
||||
"min": {},
|
||||
}
|
||||
)
|
||||
|
||||
type AggrTsValue struct {
|
||||
Value dataobj.JsonFloat
|
||||
Count int
|
||||
|
@ -65,10 +56,7 @@ func sum(datas []*dataobj.TsdbQueryResponse) map[int64]*AggrTsValue {
|
|||
if _, exists := dataMap[datas[i].Values[j].Timestamp]; exists {
|
||||
dataMap[datas[i].Values[j].Timestamp].Value += value
|
||||
} else {
|
||||
v := AggrTsValue{
|
||||
Value: value,
|
||||
}
|
||||
dataMap[datas[i].Values[j].Timestamp] = &v
|
||||
dataMap[datas[i].Values[j].Timestamp] = &AggrTsValue{Value: value}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -87,13 +75,32 @@ func avg(datas []*dataobj.TsdbQueryResponse) map[int64]*AggrTsValue {
|
|||
|
||||
if _, exists := dataMap[datas[i].Values[j].Timestamp]; exists {
|
||||
dataMap[datas[i].Values[j].Timestamp].Count += 1
|
||||
dataMap[datas[i].Values[j].Timestamp].Value += (datas[i].Values[j].Value - dataMap[datas[i].Values[j].Timestamp].Value) / dataobj.JsonFloat(dataMap[datas[i].Values[j].Timestamp].Count)
|
||||
dataMap[datas[i].Values[j].Timestamp].Value += (datas[i].Values[j].Value - dataMap[datas[i].Values[j].Timestamp].Value) /
|
||||
dataobj.JsonFloat(dataMap[datas[i].Values[j].Timestamp].Count)
|
||||
} else {
|
||||
v := AggrTsValue{
|
||||
Value: value,
|
||||
Count: 1,
|
||||
dataMap[datas[i].Values[j].Timestamp] = &AggrTsValue{Value: value, Count: 1}
|
||||
}
|
||||
}
|
||||
}
|
||||
return dataMap
|
||||
}
|
||||
|
||||
func minOrMax(datas []*dataobj.TsdbQueryResponse, fn func(a, b dataobj.JsonFloat) bool) map[int64]*AggrTsValue {
|
||||
dataMap := make(map[int64]*AggrTsValue)
|
||||
datasLen := len(datas)
|
||||
for i := 0; i < datasLen; i++ {
|
||||
for j := 0; j < len(datas[i].Values); j++ {
|
||||
value := datas[i].Values[j].Value
|
||||
if math.IsNaN(float64(value)) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := dataMap[datas[i].Values[j].Timestamp]; exists {
|
||||
if fn(value, dataMap[datas[i].Values[j].Timestamp].Value) {
|
||||
dataMap[datas[i].Values[j].Timestamp].Value = value
|
||||
}
|
||||
dataMap[datas[i].Values[j].Timestamp] = &v
|
||||
} else {
|
||||
dataMap[datas[i].Values[j].Timestamp] = &AggrTsValue{Value: value}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -101,51 +108,9 @@ func avg(datas []*dataobj.TsdbQueryResponse) map[int64]*AggrTsValue {
|
|||
}
|
||||
|
||||
func max(datas []*dataobj.TsdbQueryResponse) map[int64]*AggrTsValue {
|
||||
dataMap := make(map[int64]*AggrTsValue)
|
||||
datasLen := len(datas)
|
||||
for i := 0; i < datasLen; i++ {
|
||||
for j := 0; j < len(datas[i].Values); j++ {
|
||||
value := datas[i].Values[j].Value
|
||||
if math.IsNaN(float64(value)) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := dataMap[datas[i].Values[j].Timestamp]; exists {
|
||||
if value > dataMap[datas[i].Values[j].Timestamp].Value {
|
||||
dataMap[datas[i].Values[j].Timestamp].Value = value
|
||||
}
|
||||
} else {
|
||||
v := AggrTsValue{
|
||||
Value: value,
|
||||
}
|
||||
dataMap[datas[i].Values[j].Timestamp] = &v
|
||||
}
|
||||
}
|
||||
}
|
||||
return dataMap
|
||||
return minOrMax(datas, func(a, b dataobj.JsonFloat) bool { return a > b })
|
||||
}
|
||||
|
||||
func min(datas []*dataobj.TsdbQueryResponse) map[int64]*AggrTsValue {
|
||||
dataMap := make(map[int64]*AggrTsValue)
|
||||
datasLen := len(datas)
|
||||
for i := 0; i < datasLen; i++ {
|
||||
for j := 0; j < len(datas[i].Values); j++ {
|
||||
value := datas[i].Values[j].Value
|
||||
if math.IsNaN(float64(value)) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := dataMap[datas[i].Values[j].Timestamp]; exists {
|
||||
if value < dataMap[datas[i].Values[j].Timestamp].Value {
|
||||
dataMap[datas[i].Values[j].Timestamp].Value = value
|
||||
}
|
||||
} else {
|
||||
v := AggrTsValue{
|
||||
Value: value,
|
||||
}
|
||||
dataMap[datas[i].Values[j].Timestamp] = &v
|
||||
}
|
||||
}
|
||||
}
|
||||
return dataMap
|
||||
return minOrMax(datas, func(a, b dataobj.JsonFloat) bool { return a < b })
|
||||
}
|
||||
|
|
|
@ -95,7 +95,7 @@ func Parse(conf string) error {
|
|||
|
||||
err = viper.Unmarshal(&Config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read yml[%s]: %v\n", conf, err)
|
||||
return fmt.Errorf("cannot read yml[%s]: %v", conf, err)
|
||||
}
|
||||
|
||||
Config.Backend.ClusterList = formatClusterItems(Config.Backend.Cluster)
|
||||
|
|
|
@ -7,12 +7,12 @@ import (
|
|||
)
|
||||
|
||||
func RebuildJudgePool() {
|
||||
t1 := time.NewTicker(time.Duration(8) * time.Second)
|
||||
ticker := time.NewTicker(time.Duration(8) * time.Second)
|
||||
for {
|
||||
<-t1.C
|
||||
<-ticker.C
|
||||
judges := backend.GetJudges()
|
||||
if len(judges) == 0 {
|
||||
//防止心跳服务故障导致judge不可用,如果judges个数为0,先不更新judge连接池
|
||||
//防止心跳服务故障导致 judge 不可用,如果 judges 个数为 0,先不更新 judge 连接池
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -9,9 +9,9 @@ import (
|
|||
)
|
||||
|
||||
func UpdateJudgeQueue() {
|
||||
t1 := time.NewTicker(time.Duration(8) * time.Second)
|
||||
ticker := time.NewTicker(time.Duration(8) * time.Second)
|
||||
for {
|
||||
<-t1.C
|
||||
<-ticker.C
|
||||
updateJudgeQueue()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,10 +22,10 @@ type StraResp struct {
|
|||
}
|
||||
|
||||
func GetStrategy() {
|
||||
t1 := time.NewTicker(time.Duration(8) * time.Second)
|
||||
ticker := time.NewTicker(time.Duration(8) * time.Second)
|
||||
getStrategy()
|
||||
for {
|
||||
<-t1.C
|
||||
<-ticker.C
|
||||
getStrategy()
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ func GetStrategy() {
|
|||
func getStrategy() {
|
||||
addrs := address.GetHTTPAddresses("monapi")
|
||||
if len(addrs) == 0 {
|
||||
logger.Error("empty addr")
|
||||
logger.Error("find no monapi address")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -59,7 +59,7 @@ func getStrategy() {
|
|||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Error("get stra err:", err)
|
||||
logger.Errorf("get stra err: %v", err)
|
||||
stats.Counter.Set("stra.err", 1)
|
||||
}
|
||||
|
||||
|
@ -71,20 +71,19 @@ func getStrategy() {
|
|||
for _, stra := range stras.Data {
|
||||
stats.Counter.Set("stra.count", 1)
|
||||
|
||||
//var metric string
|
||||
if len(stra.Exprs) < 1 {
|
||||
logger.Warningf("stra:%v exprs illegal", stra)
|
||||
logger.Warningf("illegal stra:%v exprs", stra)
|
||||
continue
|
||||
}
|
||||
// nodata 策略不使用 push 模式
|
||||
if stra.Exprs[0].Func == "nodata" {
|
||||
//nodata策略 不使用push模式
|
||||
continue
|
||||
}
|
||||
|
||||
metric := stra.Exprs[0].Metric
|
||||
for _, endpoint := range stra.Endpoints {
|
||||
key := str.PK(metric, endpoint) //TODO get straMap key, 此处需要优化
|
||||
k1 := key[0:2] //为了加快查找,增加一层map,key为计算出来的hash的前2位
|
||||
k1 := key[0:2] //为了加快查找,增加一层 map,key 为计算出来的 hash 的前 2 位
|
||||
|
||||
if _, exists := straMap[k1]; !exists {
|
||||
straMap[k1] = make(map[string][]*model.Stra)
|
||||
|
|
|
@ -55,9 +55,9 @@ func tsdbInstance(c *gin.Context) {
|
|||
|
||||
pk := dataobj.PKWithCounter(input.Endpoint, counter)
|
||||
pools, err := backend.SelectPoolByPK(pk)
|
||||
addrs := []string{}
|
||||
for _, pool := range pools {
|
||||
addrs = append(addrs, pool.Addr)
|
||||
addrs := make([]string, len(pools))
|
||||
for i, pool := range pools {
|
||||
addrs[i] = pool.Addr
|
||||
}
|
||||
|
||||
render.Data(c, addrs, nil)
|
||||
|
|
|
@ -19,8 +19,8 @@ func PushData(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
recvMetricValues := []*dataobj.MetricValue{}
|
||||
metricValues := []*dataobj.MetricValue{}
|
||||
recvMetricValues := make([]*dataobj.MetricValue, 0)
|
||||
metricValues := make([]*dataobj.MetricValue, 0)
|
||||
errors.Dangerous(c.ShouldBindJSON(&recvMetricValues))
|
||||
|
||||
var msg string
|
||||
|
|
|
@ -20,14 +20,13 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
|
|||
start := time.Now()
|
||||
reply.Invalid = 0
|
||||
|
||||
items := []*dataobj.MetricValue{}
|
||||
items := make([]*dataobj.MetricValue, 0)
|
||||
for _, v := range args {
|
||||
logger.Debug("->recv: ", v)
|
||||
stats.Counter.Set("points.in", 1)
|
||||
err := v.CheckValidity()
|
||||
if err != nil {
|
||||
if err := v.CheckValidity(); err != nil {
|
||||
stats.Counter.Set("points.in.err", 1)
|
||||
msg := fmt.Sprintf("item is illegal item:%s err:%v", v, err)
|
||||
msg := fmt.Sprintf("illegal item:%s err:%v", v, err)
|
||||
logger.Warningf(msg)
|
||||
reply.Invalid += 1
|
||||
reply.Msg += msg
|
||||
|
@ -44,10 +43,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
|
|||
if backend.Config.Enabled {
|
||||
backend.Push2JudgeSendQueue(items)
|
||||
}
|
||||
|
||||
if reply.Invalid == 0 {
|
||||
reply.Msg = "ok"
|
||||
}
|
||||
|
||||
reply.Total = len(args)
|
||||
reply.Latency = (time.Now().UnixNano() - start.UnixNano()) / 1000000
|
||||
return nil
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
)
|
||||
|
||||
func (t *Transfer) Query(args []dataobj.QueryData, reply *dataobj.QueryDataResp) error {
|
||||
//start := time.Now()
|
||||
reply.Data = backend.FetchData(args)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -23,12 +23,12 @@ func Start() {
|
|||
server := rpc.NewServer()
|
||||
server.Register(new(Transfer))
|
||||
|
||||
l, e := net.Listen("tcp", addr)
|
||||
if e != nil {
|
||||
logger.Fatal("cannot listen ", addr, e)
|
||||
l, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
logger.Fatalf("fail to connect address: [%s], error: %v", addr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
logger.Info("listening ", addr)
|
||||
logger.Infof("server is available at:[%s]", addr)
|
||||
|
||||
var mh codec.MsgpackHandle
|
||||
mh.MapType = reflect.TypeOf(map[string]interface{}(nil))
|
||||
|
@ -36,7 +36,7 @@ func Start() {
|
|||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
logger.Warning("listener accept error: ", err)
|
||||
logger.Warningf("listener accept error: %v", err)
|
||||
time.Sleep(time.Duration(100) * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ func main() {
|
|||
routes.Config(r)
|
||||
go http.Start(r, "transfer", cfg.Logger.Level)
|
||||
|
||||
ending()
|
||||
cleanup()
|
||||
}
|
||||
|
||||
// auto detect configuration file
|
||||
|
@ -97,12 +97,12 @@ func pconf() {
|
|||
}
|
||||
}
|
||||
|
||||
func ending() {
|
||||
func cleanup() {
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
select {
|
||||
case <-c:
|
||||
fmt.Printf("stop signal caught, stopping... pid=%d\n", os.Getpid())
|
||||
fmt.Println("stop signal caught, stopping... pid=", os.Getpid())
|
||||
}
|
||||
|
||||
logger.Close()
|
||||
|
@ -112,7 +112,7 @@ func ending() {
|
|||
|
||||
func start() {
|
||||
runner.Init()
|
||||
fmt.Println("transfer start, use configuration file:", *conf)
|
||||
fmt.Println("transfer started, use configuration file:", *conf)
|
||||
fmt.Println("runner.Cwd:", runner.Cwd)
|
||||
fmt.Println("runner.Hostname:", runner.Hostname)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue