Merge pull request #64 from chenjiandongx/imporve-performance

performance improvement
This commit is contained in:
qinyening 2020-04-07 12:43:35 +08:00 committed by GitHub
commit ff209fba85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 46 additions and 45 deletions

View File

@ -83,7 +83,7 @@ func (cp *ConnPools) Update(cluster []string) {
cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle) cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle)
} }
for address, _ := range cp.M { for address := range cp.M {
if _, exists := newCluster[address]; !exists { if _, exists := newCluster[address]; !exists {
delete(cp.M, address) delete(cp.M, address)
} }
@ -91,8 +91,8 @@ func (cp *ConnPools) Update(cluster []string) {
} }
// 同步发送, 完成发送或超时后 才能返回 // 同步发送, 完成发送或超时后 才能返回
func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error { func (cp *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error {
connPool, exists := this.Get(addr) connPool, exists := cp.Get(addr)
if !exists { if !exists {
return fmt.Errorf("%s has no connection pool", addr) return fmt.Errorf("%s has no connection pool", addr)
} }
@ -103,7 +103,7 @@ func (this *ConnPools) Call(addr, method string, args interface{}, resp interfac
} }
rpcClient := conn.(RpcClient) rpcClient := conn.(RpcClient)
callTimeout := time.Duration(this.CallTimeout) * time.Millisecond callTimeout := time.Duration(cp.CallTimeout) * time.Millisecond
done := make(chan error, 1) done := make(chan error, 1)
go func() { go func() {
@ -125,10 +125,10 @@ func (this *ConnPools) Call(addr, method string, args interface{}, resp interfac
} }
} }
func (this *ConnPools) Get(address string) (*pool.ConnPool, bool) { func (cp *ConnPools) Get(address string) (*pool.ConnPool, bool) {
this.RLock() cp.RLock()
defer this.RUnlock() defer cp.RUnlock()
p, exists := this.M[address] p, exists := cp.M[address]
return p, exists return p, exists
} }
@ -138,23 +138,23 @@ type RpcClient struct {
name string name string
} }
func (this RpcClient) Name() string { func (rc RpcClient) Name() string {
return this.name return rc.name
} }
func (this RpcClient) Closed() bool { func (rc RpcClient) Closed() bool {
return this.cli == nil return rc.cli == nil
} }
func (this RpcClient) Close() error { func (rc RpcClient) Close() error {
if this.cli != nil { if rc.cli != nil {
err := this.cli.Close() err := rc.cli.Close()
this.cli = nil rc.cli = nil
return err return err
} }
return nil return nil
} }
func (this RpcClient) Call(method string, args interface{}, reply interface{}) error { func (rc RpcClient) Call(method string, args interface{}, reply interface{}) error {
return this.cli.Call(method, args, reply) return rc.cli.Call(method, args, reply)
} }

View File

@ -20,11 +20,19 @@ import (
) )
func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse { func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
resp := []*dataobj.TsdbQueryResponse{}
workerNum := 100 workerNum := 100
worker := make(chan struct{}, workerNum) //控制goroutine并发数 worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
dataChan := make(chan *dataobj.TsdbQueryResponse, 20000) dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
done := make(chan struct{}, 1)
resp := make([]*dataobj.TsdbQueryResponse, 0)
go func() {
defer func() { done <- struct{}{} }()
for d := range dataChan {
resp = append(resp, d)
}
}()
for _, input := range inputs { for _, input := range inputs {
for _, endpoint := range input.Endpoints { for _, endpoint := range input.Endpoints {
for _, counter := range input.Counters { for _, counter := range input.Counters {
@ -34,29 +42,32 @@ func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
} }
} }
//等待所有goroutine执行完成 // 等待所有 goroutine 执行完成
for i := 0; i < workerNum; i++ { for i := 0; i < workerNum; i++ {
worker <- struct{}{} worker <- struct{}{}
} }
close(dataChan) close(dataChan)
for {
d, ok := <-dataChan // 等待所有 dataChan 被消费完
if !ok { <-done
break
}
resp = append(resp, d)
}
return resp return resp
} }
func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse { func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
resp := []*dataobj.TsdbQueryResponse{}
workerNum := 100 workerNum := 100
worker := make(chan struct{}, workerNum) //控制goroutine并发数 worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
dataChan := make(chan *dataobj.TsdbQueryResponse, 20000) dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
done := make(chan struct{}, 1)
resp := make([]*dataobj.TsdbQueryResponse, 0)
go func() {
defer func() { done <- struct{}{} }()
for d := range dataChan {
resp = append(resp, d)
}
}()
for _, endpoint := range input.Endpoints { for _, endpoint := range input.Endpoints {
if len(input.Tags) == 0 { if len(input.Tags) == 0 {
counter, err := GetCounter(input.Metric, "", nil) counter, err := GetCounter(input.Metric, "", nil)
@ -85,16 +96,10 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
} }
close(dataChan) close(dataChan)
for { <-done
d, ok := <-dataChan
if !ok {
break
}
resp = append(resp, d)
}
//进行数据计算 //进行数据计算
aggrDatas := []*dataobj.TsdbQueryResponse{} aggrDatas := make([]*dataobj.TsdbQueryResponse, 0)
if input.AggrFunc != "" && len(resp) > 1 { if input.AggrFunc != "" && len(resp) > 1 {
aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse) aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse)
@ -173,7 +178,6 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i
stats.Counter.Set("query.data.err", 1) stats.Counter.Set("query.data.err", 1)
} }
dataChan <- data dataChan <- data
return
} }
func fetchData(start, end int64, consolFun, endpoint, counter string, step int) (*dataobj.TsdbQueryResponse, error) { func fetchData(start, end int64, consolFun, endpoint, counter string, step int) (*dataobj.TsdbQueryResponse, error) {

View File

@ -22,7 +22,6 @@ func (this *ConsistentHashRing) Set(r *consistent.Consistent) {
this.Lock() this.Lock()
defer this.Unlock() defer this.Unlock()
this.ring = r this.ring = r
return
} }
func (this *ConsistentHashRing) GetRing() *consistent.Consistent { func (this *ConsistentHashRing) GetRing() *consistent.Consistent {

View File

@ -46,7 +46,7 @@ var (
) )
func NewClusterNode(addrs []string) *backend.ClusterNode { func NewClusterNode(addrs []string) *backend.ClusterNode {
return &backend.ClusterNode{addrs} return &backend.ClusterNode{Addrs: addrs}
} }
// map["node"]="host1,host2" --> map["node"]=["host1", "host2"] // map["node"]="host1,host2" --> map["node"]=["host1", "host2"]

View File

@ -73,7 +73,7 @@ func getStrategy() {
//var metric string //var metric string
if len(stra.Exprs) < 1 { if len(stra.Exprs) < 1 {
logger.Warning("stra:%v exprs illegal", stra) logger.Warningf("stra:%v exprs illegal", stra)
continue continue
} }
if stra.Exprs[0].Func == "nodata" { if stra.Exprs[0].Func == "nodata" {

View File

@ -52,5 +52,4 @@ func PushData(c *gin.Context) {
} }
render.Data(c, "ok", nil) render.Data(c, "ok", nil)
return
} }

View File

@ -76,5 +76,4 @@ func push(items []*dataobj.MetricValue) {
} }
defer resp.Body.Close() defer resp.Body.Close()
return
} }