Fetching data and packaging results can be handled at the same time to imporve performance
This commit is contained in:
parent
73ed8a66d3
commit
1c066df6ac
|
@ -20,11 +20,19 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
|
func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
|
||||||
resp := []*dataobj.TsdbQueryResponse{}
|
|
||||||
workerNum := 100
|
workerNum := 100
|
||||||
worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
|
worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
|
||||||
dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
|
dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
|
||||||
|
|
||||||
|
done := make(chan struct{}, 1)
|
||||||
|
resp := make([]*dataobj.TsdbQueryResponse, 0)
|
||||||
|
go func() {
|
||||||
|
defer func() { done <- struct{}{} }()
|
||||||
|
for d := range dataChan {
|
||||||
|
resp = append(resp, d)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for _, input := range inputs {
|
for _, input := range inputs {
|
||||||
for _, endpoint := range input.Endpoints {
|
for _, endpoint := range input.Endpoints {
|
||||||
for _, counter := range input.Counters {
|
for _, counter := range input.Counters {
|
||||||
|
@ -38,25 +46,28 @@ func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
|
||||||
for i := 0; i < workerNum; i++ {
|
for i := 0; i < workerNum; i++ {
|
||||||
worker <- struct{}{}
|
worker <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(dataChan)
|
close(dataChan)
|
||||||
for {
|
|
||||||
d, ok := <-dataChan
|
// 等待所有 dataChan 被消费完
|
||||||
if !ok {
|
<-done
|
||||||
break
|
|
||||||
}
|
|
||||||
resp = append(resp, d)
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
||||||
resp := []*dataobj.TsdbQueryResponse{}
|
|
||||||
workerNum := 100
|
workerNum := 100
|
||||||
worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
|
worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
|
||||||
dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
|
dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
|
||||||
|
|
||||||
|
done := make(chan struct{}, 1)
|
||||||
|
resp := make([]*dataobj.TsdbQueryResponse, 0)
|
||||||
|
go func() {
|
||||||
|
defer func() { done <- struct{}{} }()
|
||||||
|
for d := range dataChan {
|
||||||
|
resp = append(resp, d)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for _, endpoint := range input.Endpoints {
|
for _, endpoint := range input.Endpoints {
|
||||||
if len(input.Tags) == 0 {
|
if len(input.Tags) == 0 {
|
||||||
counter, err := GetCounter(input.Metric, "", nil)
|
counter, err := GetCounter(input.Metric, "", nil)
|
||||||
|
@ -85,16 +96,10 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
close(dataChan)
|
close(dataChan)
|
||||||
for {
|
<-done
|
||||||
d, ok := <-dataChan
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
resp = append(resp, d)
|
|
||||||
}
|
|
||||||
|
|
||||||
//进行数据计算
|
//进行数据计算
|
||||||
aggrDatas := []*dataobj.TsdbQueryResponse{}
|
aggrDatas := make([]*dataobj.TsdbQueryResponse, 0)
|
||||||
if input.AggrFunc != "" && len(resp) > 1 {
|
if input.AggrFunc != "" && len(resp) > 1 {
|
||||||
|
|
||||||
aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse)
|
aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse)
|
||||||
|
|
Loading…
Reference in New Issue