Change push index from async to sync
This commit is contained in:
parent
982fc6aaa2
commit
3c1ed52bb9
|
@ -8,20 +8,10 @@ import (
|
||||||
"github.com/didi/nightingale/src/modules/tsdb/backend/rpc"
|
"github.com/didi/nightingale/src/modules/tsdb/backend/rpc"
|
||||||
"github.com/didi/nightingale/src/toolkits/stats"
|
"github.com/didi/nightingale/src/toolkits/stats"
|
||||||
|
|
||||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
|
||||||
"github.com/toolkits/pkg/logger"
|
"github.com/toolkits/pkg/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
semaUpdateIndexAll *semaphore.Semaphore
|
|
||||||
)
|
|
||||||
|
|
||||||
func StartUpdateIndexTask() {
|
func StartUpdateIndexTask() {
|
||||||
if rpc.Config.MaxConns != 0 {
|
|
||||||
semaUpdateIndexAll = semaphore.NewSemaphore(rpc.Config.MaxConns / 2)
|
|
||||||
} else {
|
|
||||||
semaUpdateIndexAll = semaphore.NewSemaphore(10)
|
|
||||||
}
|
|
||||||
|
|
||||||
t1 := time.NewTicker(time.Duration(Config.RebuildInterval) * time.Second)
|
t1 := time.NewTicker(time.Duration(Config.RebuildInterval) * time.Second)
|
||||||
for {
|
for {
|
||||||
|
@ -73,22 +63,13 @@ func RebuildAllIndex(params ...[]string) error {
|
||||||
i = i + 1
|
i = i + 1
|
||||||
|
|
||||||
if i == aggrNum {
|
if i == aggrNum {
|
||||||
semaUpdateIndexAll.Acquire()
|
rpc.Push2Index(rpc.ALLINDEX, tmpList, addrs)
|
||||||
go func(items []*dataobj.TsdbItem) {
|
|
||||||
defer semaUpdateIndexAll.Release()
|
|
||||||
rpc.Push2Index(rpc.ALLINDEX, items, addrs)
|
|
||||||
}(tmpList)
|
|
||||||
|
|
||||||
i = 0
|
i = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if i != 0 {
|
if i != 0 {
|
||||||
semaUpdateIndexAll.Acquire()
|
rpc.Push2Index(rpc.ALLINDEX, tmpList[:i], addrs)
|
||||||
go func(items []*dataobj.TsdbItem) {
|
|
||||||
defer semaUpdateIndexAll.Release()
|
|
||||||
rpc.Push2Index(rpc.ALLINDEX, items, addrs)
|
|
||||||
}(tmpList[:i])
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue