diff --git a/src/modules/transfer/backend/pool.go b/src/modules/transfer/backend/pool.go index 2b0a7b0e..10ca324e 100644 --- a/src/modules/transfer/backend/pool.go +++ b/src/modules/transfer/backend/pool.go @@ -83,7 +83,7 @@ func (cp *ConnPools) Update(cluster []string) { cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle) } - for address, _ := range cp.M { + for address := range cp.M { if _, exists := newCluster[address]; !exists { delete(cp.M, address) } @@ -91,8 +91,8 @@ func (cp *ConnPools) Update(cluster []string) { } // 同步发送, 完成发送或超时后 才能返回 -func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error { - connPool, exists := this.Get(addr) +func (cp *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error { + connPool, exists := cp.Get(addr) if !exists { return fmt.Errorf("%s has no connection pool", addr) } @@ -103,7 +103,7 @@ func (this *ConnPools) Call(addr, method string, args interface{}, resp interfac } rpcClient := conn.(RpcClient) - callTimeout := time.Duration(this.CallTimeout) * time.Millisecond + callTimeout := time.Duration(cp.CallTimeout) * time.Millisecond done := make(chan error, 1) go func() { @@ -125,10 +125,10 @@ func (this *ConnPools) Call(addr, method string, args interface{}, resp interfac } } -func (this *ConnPools) Get(address string) (*pool.ConnPool, bool) { - this.RLock() - defer this.RUnlock() - p, exists := this.M[address] +func (cp *ConnPools) Get(address string) (*pool.ConnPool, bool) { + cp.RLock() + defer cp.RUnlock() + p, exists := cp.M[address] return p, exists } @@ -138,23 +138,23 @@ type RpcClient struct { name string } -func (this RpcClient) Name() string { - return this.name +func (rc RpcClient) Name() string { + return rc.name } -func (this RpcClient) Closed() bool { - return this.cli == nil +func (rc RpcClient) Closed() bool { + return rc.cli == nil } -func (this RpcClient) Close() error { - if this.cli != nil { - err := this.cli.Close() - this.cli = nil +func (rc RpcClient) Close() error { + if rc.cli != nil { + err := rc.cli.Close() + rc.cli = nil return err } return nil } -func (this RpcClient) Call(method string, args interface{}, reply interface{}) error { - return this.cli.Call(method, args, reply) +func (rc RpcClient) Call(method string, args interface{}, reply interface{}) error { + return rc.cli.Call(method, args, reply) } diff --git a/src/modules/transfer/backend/query.go b/src/modules/transfer/backend/query.go index 25f689f3..09101980 100644 --- a/src/modules/transfer/backend/query.go +++ b/src/modules/transfer/backend/query.go @@ -20,11 +20,19 @@ import ( ) func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse { - resp := []*dataobj.TsdbQueryResponse{} workerNum := 100 - worker := make(chan struct{}, workerNum) //控制goroutine并发数 + worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数 dataChan := make(chan *dataobj.TsdbQueryResponse, 20000) + done := make(chan struct{}, 1) + resp := make([]*dataobj.TsdbQueryResponse, 0) + go func() { + defer func() { done <- struct{}{} }() + for d := range dataChan { + resp = append(resp, d) + } + }() + for _, input := range inputs { for _, endpoint := range input.Endpoints { for _, counter := range input.Counters { @@ -34,29 +42,32 @@ func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse { } } - //等待所有goroutine执行完成 + // 等待所有 goroutine 执行完成 for i := 0; i < workerNum; i++ { worker <- struct{}{} } - close(dataChan) - for { - d, ok := <-dataChan - if !ok { - break - } - resp = append(resp, d) - } + + // 等待所有 dataChan 被消费完 + <-done return resp } func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse { - resp := []*dataobj.TsdbQueryResponse{} workerNum := 100 - worker := make(chan struct{}, workerNum) //控制goroutine并发数 + worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数 dataChan := make(chan *dataobj.TsdbQueryResponse, 20000) + done := make(chan struct{}, 1) + resp := make([]*dataobj.TsdbQueryResponse, 0) + go func() { + defer func() { done <- struct{}{} }() + for d := range dataChan { + resp = append(resp, d) + } + }() + for _, endpoint := range input.Endpoints { if len(input.Tags) == 0 { counter, err := GetCounter(input.Metric, "", nil) @@ -85,16 +96,10 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse { } close(dataChan) - for { - d, ok := <-dataChan - if !ok { - break - } - resp = append(resp, d) - } + <-done //进行数据计算 - aggrDatas := []*dataobj.TsdbQueryResponse{} + aggrDatas := make([]*dataobj.TsdbQueryResponse, 0) if input.AggrFunc != "" && len(resp) > 1 { aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse) @@ -173,7 +178,6 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i stats.Counter.Set("query.data.err", 1) } dataChan <- data - return } func fetchData(start, end int64, consolFun, endpoint, counter string, step int) (*dataobj.TsdbQueryResponse, error) { diff --git a/src/modules/transfer/backend/ring.go b/src/modules/transfer/backend/ring.go index 99b183cb..8cfc1e76 100644 --- a/src/modules/transfer/backend/ring.go +++ b/src/modules/transfer/backend/ring.go @@ -22,7 +22,6 @@ func (this *ConsistentHashRing) Set(r *consistent.Consistent) { this.Lock() defer this.Unlock() this.ring = r - return } func (this *ConsistentHashRing) GetRing() *consistent.Consistent { diff --git a/src/modules/transfer/config/config.go b/src/modules/transfer/config/config.go index 86d28b15..85baddc7 100644 --- a/src/modules/transfer/config/config.go +++ b/src/modules/transfer/config/config.go @@ -46,7 +46,7 @@ var ( ) func NewClusterNode(addrs []string) *backend.ClusterNode { - return &backend.ClusterNode{addrs} + return &backend.ClusterNode{Addrs: addrs} } // map["node"]="host1,host2" --> map["node"]=["host1", "host2"] diff --git a/src/modules/transfer/cron/stra.go b/src/modules/transfer/cron/stra.go index b210447e..81c57573 100644 --- a/src/modules/transfer/cron/stra.go +++ b/src/modules/transfer/cron/stra.go @@ -73,7 +73,7 @@ func getStrategy() { //var metric string if len(stra.Exprs) < 1 { - logger.Warning("stra:%v exprs illegal", stra) + logger.Warningf("stra:%v exprs illegal", stra) continue } if stra.Exprs[0].Func == "nodata" { diff --git a/src/modules/transfer/http/routes/push_router.go b/src/modules/transfer/http/routes/push_router.go index 9a5ef222..f903eadc 100644 --- a/src/modules/transfer/http/routes/push_router.go +++ b/src/modules/transfer/http/routes/push_router.go @@ -52,5 +52,4 @@ func PushData(c *gin.Context) { } render.Data(c, "ok", nil) - return } diff --git a/src/toolkits/stats/init.go b/src/toolkits/stats/init.go index 9b74917d..29f674d1 100644 --- a/src/toolkits/stats/init.go +++ b/src/toolkits/stats/init.go @@ -76,5 +76,4 @@ func push(items []*dataobj.MetricValue) { } defer resp.Body.Close() - return }