standardizing code (#81)
* Review index module * Update debug info * Update according to the reivew feedback * Remove newline * Micro update * Micro fix * Fix grammer error
This commit is contained in:
parent
69702e5fd6
commit
0604c690d4
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
// Counter: sorted tags
|
||||
type CounterTsMap struct {
|
||||
sync.RWMutex
|
||||
M map[string]int64 `json:"counters"` // map[counter]ts
|
||||
|
@ -31,7 +32,7 @@ func (c *CounterTsMap) Clean(now, timeDuration int64, endpoint, metric string) {
|
|||
delete(c.M, counter)
|
||||
stats.Counter.Set("counter.clean", 1)
|
||||
|
||||
logger.Debugf("clean index endpoint:%s metric:%s counter:%s", endpoint, metric, counter)
|
||||
logger.Debugf("clean endpoint index:%s metric:%s counter:%s", endpoint, metric, counter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,27 +16,29 @@ type EndpointIndexMap struct {
|
|||
M map[string]*MetricIndexMap `json:"endpoint_index"` //map[endpoint]metricMap{map[metric]Index}
|
||||
}
|
||||
|
||||
//push 索引数据
|
||||
// Push 索引数据
|
||||
func (e *EndpointIndexMap) Push(item dataobj.IndexModel, now int64) {
|
||||
counter := dataobj.SortedTags(item.Tags)
|
||||
tags := dataobj.SortedTags(item.Tags)
|
||||
metric := item.Metric
|
||||
|
||||
// 先判断 endpoint 是否已经被记录,不存在则直接初始化
|
||||
metricIndexMap, exists := e.GetMetricIndexMap(item.Endpoint)
|
||||
if !exists {
|
||||
metricIndexMap = &MetricIndexMap{Data: make(map[string]*MetricIndex)}
|
||||
metricIndexMap.SetMetricIndex(metric, NewMetricIndex(item, counter, now))
|
||||
metricIndexMap.SetMetricIndex(metric, NewMetricIndex(item, tags, now))
|
||||
e.SetMetricIndexMap(item.Endpoint, metricIndexMap)
|
||||
|
||||
NewEndpoints.PushFront(item.Endpoint) //必须在metricIndexMap成功之后在push
|
||||
NewEndpoints.PushFront(item.Endpoint) //必须在 metricIndexMap 成功之后再 push
|
||||
return
|
||||
}
|
||||
|
||||
// 再判断该 endpoint 下的具体某个 metric 是否存在
|
||||
metricIndex, exists := metricIndexMap.GetMetricIndex(metric)
|
||||
if !exists {
|
||||
metricIndexMap.SetMetricIndex(metric, NewMetricIndex(item, counter, now))
|
||||
metricIndexMap.SetMetricIndex(metric, NewMetricIndex(item, tags, now))
|
||||
return
|
||||
}
|
||||
metricIndex.Set(item, counter, now)
|
||||
metricIndex.Set(item, tags, now)
|
||||
}
|
||||
|
||||
func (e *EndpointIndexMap) Clean(timeDuration int64) {
|
||||
|
@ -49,13 +51,12 @@ func (e *EndpointIndexMap) Clean(timeDuration int64) {
|
|||
}
|
||||
|
||||
metricIndexMap.Clean(now, timeDuration, endpoint)
|
||||
|
||||
if metricIndexMap.Len() < 1 {
|
||||
e.Lock()
|
||||
delete(e.M, endpoint)
|
||||
stats.Counter.Set("endpoint.clean", 1)
|
||||
e.Unlock()
|
||||
logger.Debug("clean index endpoint: ", endpoint)
|
||||
logger.Debug("clean index endpoint:", endpoint)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -63,6 +64,7 @@ func (e *EndpointIndexMap) Clean(timeDuration int64) {
|
|||
func (e *EndpointIndexMap) GetMetricIndex(endpoint, metric string) (*MetricIndex, bool) {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
metricIndexMap, exists := e.M[endpoint]
|
||||
if !exists {
|
||||
return nil, false
|
||||
|
@ -73,6 +75,7 @@ func (e *EndpointIndexMap) GetMetricIndex(endpoint, metric string) (*MetricIndex
|
|||
func (e *EndpointIndexMap) GetMetricIndexMap(endpoint string) (*MetricIndexMap, bool) {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
metricIndexMap, exists := e.M[endpoint]
|
||||
return metricIndexMap, exists
|
||||
}
|
||||
|
@ -80,12 +83,14 @@ func (e *EndpointIndexMap) GetMetricIndexMap(endpoint string) (*MetricIndexMap,
|
|||
func (e *EndpointIndexMap) SetMetricIndexMap(endpoint string, metricIndex *MetricIndexMap) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
|
||||
e.M[endpoint] = metricIndex
|
||||
}
|
||||
|
||||
func (e *EndpointIndexMap) GetMetricsBy(endpoint string) []string {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
if _, exists := e.M[endpoint]; !exists {
|
||||
return []string{}
|
||||
}
|
||||
|
@ -97,29 +102,28 @@ func (e *EndpointIndexMap) GetIndexByClude(endpoint, metric string, include, exc
|
|||
if !exists {
|
||||
return []string{}, nil
|
||||
}
|
||||
tagkvs := metricIndex.TagkvMap.GetTagkvMap()
|
||||
|
||||
fullmatch := getMatchedTags(tagkvs, include, exclude)
|
||||
// 部分tagk的tagv全部被exclude 或者 完全没有匹配的
|
||||
if len(fullmatch) != len(tagkvs) || len(fullmatch) == 0 {
|
||||
tagkvs := metricIndex.TagkvMap.GetTagkvMap()
|
||||
tags := getMatchedTags(tagkvs, include, exclude)
|
||||
// 部分 tagk 的 tagv 全部被 exclude 或者 完全没有匹配的
|
||||
if len(tags) != len(tagkvs) || len(tags) == 0 {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
if OverMaxLimit(fullmatch, Config.MaxQueryCount) {
|
||||
err := fmt.Errorf("xclude fullmatch get too much counters, endpoint:%s metric:%s, "+
|
||||
if OverMaxLimit(tags, Config.MaxQueryCount) {
|
||||
err := fmt.Errorf("xclude fullmatch get too much counters, endpoint:%s metric:%s, "+
|
||||
"include:%v, exclude:%v\n", endpoint, metric, include, exclude)
|
||||
return []string{}, err
|
||||
}
|
||||
|
||||
return GetAllCounter(GetSortTags(fullmatch)), nil
|
||||
return GetAllCounter(GetSortTags(tags)), nil
|
||||
}
|
||||
|
||||
func (e *EndpointIndexMap) GetEndpoints() []string {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
length := len(e.M)
|
||||
ret := make([]string, length)
|
||||
ret := make([]string, len(e.M))
|
||||
i := 0
|
||||
for endpoint := range e.M {
|
||||
ret[i] = endpoint
|
||||
|
|
|
@ -41,25 +41,30 @@ func reportEndpoint(endpoints []interface{}) {
|
|||
perm := rand.Perm(len(addrs))
|
||||
for i := range perm {
|
||||
url := fmt.Sprintf("http://%s/v1/portal/endpoint", addrs[perm[i]])
|
||||
|
||||
m := map[string][]interface{}{
|
||||
"endpoints": endpoints,
|
||||
}
|
||||
|
||||
var body reportRes
|
||||
err := httplib.Post(url).JSONBodyQuiet(m).SetTimeout(3*time.Second).Header("x-srv-token", "monapi-builtin-token").ToJSON(&body)
|
||||
err := httplib.Post(url).
|
||||
JSONBodyQuiet(m).
|
||||
SetTimeout(3*time.Second).
|
||||
Header("x-srv-token", "monapi-builtin-token").
|
||||
ToJSON(&body)
|
||||
if err != nil {
|
||||
logger.Warningf("curl %s fail: %v. retry", url, err)
|
||||
stats.Counter.Set("report.endpoint.err", 1)
|
||||
continue
|
||||
}
|
||||
if body.Err != "" { //数据库连接出错会出现此情况
|
||||
logger.Warningf("curl %s fail: %s. retry", url, body.Err)
|
||||
logger.Warningf("curl [%s] fail: %v. retry", url, err)
|
||||
stats.Counter.Set("report.endpoint.err", 1)
|
||||
continue
|
||||
}
|
||||
|
||||
//推送成功,将endpoint状态标记为已上报,避免下次index重启时再重新上报
|
||||
// 数据库连接出错会出现此情况
|
||||
if body.Err != "" {
|
||||
logger.Warningf("curl [%s] fail: %v. retry", url, body.Err)
|
||||
stats.Counter.Set("report.endpoint.err", 1)
|
||||
continue
|
||||
}
|
||||
|
||||
// 推送成功,将 endpoint 状态标记为已上报,避免下次 index 重启时再重新上报
|
||||
for _, endpoint := range endpoints {
|
||||
metricIndexMap, _ := IndexDB.GetMetricIndexMap(endpoint.(string))
|
||||
metricIndexMap.SetReported()
|
||||
|
|
|
@ -55,9 +55,9 @@ func InitDB(cfg CacheSection) {
|
|||
}
|
||||
|
||||
func StartCleaner(interval int, cacheDuration int) {
|
||||
t1 := time.NewTicker(time.Duration(interval) * time.Second)
|
||||
ticker := time.NewTicker(time.Duration(interval) * time.Second)
|
||||
for {
|
||||
<-t1.C
|
||||
<-ticker.C
|
||||
|
||||
start := time.Now()
|
||||
IndexDB.Clean(int64(cacheDuration))
|
||||
|
@ -66,13 +66,12 @@ func StartCleaner(interval int, cacheDuration int) {
|
|||
}
|
||||
|
||||
func StartPersist(interval int) {
|
||||
t1 := time.NewTicker(time.Duration(interval) * time.Second)
|
||||
ticker := time.NewTicker(time.Duration(interval) * time.Second)
|
||||
for {
|
||||
<-t1.C
|
||||
<-ticker.C
|
||||
|
||||
err := Persist("normal")
|
||||
if err != nil {
|
||||
logger.Error("Persist err:", err)
|
||||
if err := Persist("normal"); err != nil {
|
||||
logger.Errorf("persist error:%+v", err)
|
||||
stats.Counter.Set("persist.err", 1)
|
||||
}
|
||||
}
|
||||
|
@ -88,32 +87,33 @@ func Rebuild(persistenceDir string, concurrency int) {
|
|||
}
|
||||
}
|
||||
|
||||
if dbDir == "" { //dbDir为空说明从远端下载索引失败,从本地读取
|
||||
logger.Debug("rebuild from local")
|
||||
// dbDir 为空说明从远端下载索引失败,从本地读取
|
||||
if dbDir == "" {
|
||||
logger.Debug("rebuild index from local disk")
|
||||
dbDir = fmt.Sprintf("%s/%s", persistenceDir, "db")
|
||||
}
|
||||
|
||||
err := RebuildFromDisk(dbDir, concurrency)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
if err := RebuildFromDisk(dbDir, concurrency); err != nil {
|
||||
logger.Errorf("rebuild index from local disk error:%+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func RebuildFromDisk(indexFileDir string, concurrency int) error {
|
||||
logger.Info("Try to rebuild index from disk")
|
||||
if !file.IsExist(indexFileDir) {
|
||||
return fmt.Errorf("index persistence dir %s not exists.", indexFileDir)
|
||||
return fmt.Errorf("index persistence dir [%s] don't exist", indexFileDir)
|
||||
}
|
||||
|
||||
//遍历目录
|
||||
// 遍历目录
|
||||
files, err := ioutil.ReadDir(indexFileDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Infof("There're [%d] endpoints need rebuild", len(files))
|
||||
logger.Infof("There're [%d] endpoints need to rebuild", len(files))
|
||||
|
||||
sema := semaphore.NewSemaphore(concurrency)
|
||||
for _, fileObj := range files {
|
||||
// 只处理文件
|
||||
if fileObj.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
@ -128,9 +128,9 @@ func RebuildFromDisk(indexFileDir string, concurrency int) error {
|
|||
logger.Errorf("read file error, [endpoint:%s][reason:%v]", endpoint, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 没有标记上报过的 endpoint 需要重新上报给 monapi
|
||||
if !metricIndexMap.IsReported() {
|
||||
NewEndpoints.PushFront(endpoint) //没有标记上报过的endpoint,重新上报给monapi
|
||||
NewEndpoints.PushFront(endpoint)
|
||||
}
|
||||
|
||||
IndexDB.Lock()
|
||||
|
@ -145,17 +145,18 @@ func RebuildFromDisk(indexFileDir string, concurrency int) error {
|
|||
|
||||
func Persist(mode string) error {
|
||||
indexFileDir := Config.PersistDir
|
||||
if mode == "end" {
|
||||
|
||||
switch mode {
|
||||
case "end":
|
||||
semaPermanence.Acquire()
|
||||
defer semaPermanence.Release()
|
||||
|
||||
} else if mode == "normal" || mode == "download" {
|
||||
case "normal", "download":
|
||||
if !semaPermanence.TryAcquire() {
|
||||
return fmt.Errorf("permanence operate is Already running...")
|
||||
return fmt.Errorf("permanence operate is already running")
|
||||
}
|
||||
defer semaPermanence.Release()
|
||||
} else {
|
||||
return fmt.Errorf("wrong mode:%v", mode)
|
||||
default:
|
||||
return fmt.Errorf("wrong mode:%s", mode)
|
||||
}
|
||||
|
||||
var tmpDir string
|
||||
|
@ -167,36 +168,40 @@ func Persist(mode string) error {
|
|||
if err := os.RemoveAll(tmpDir); err != nil {
|
||||
return err
|
||||
}
|
||||
//创建tmp目录
|
||||
// create tmp directory
|
||||
if err := os.MkdirAll(tmpDir, 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//填充tmp目录
|
||||
// write index data to disk
|
||||
endpoints := IndexDB.GetEndpoints()
|
||||
logger.Infof("save index data to disk[num:%d][mode:%s]\n", len(endpoints), mode)
|
||||
epLength := len(endpoints)
|
||||
logger.Infof("save index data to disk[num:%d][mode:%s]\n", epLength, mode)
|
||||
|
||||
for i, endpoint := range endpoints {
|
||||
logger.Infof("sync [%s] to disk, [%d%%] complete\n", endpoint, int((float64(i)/float64(len(endpoints)))*100))
|
||||
logger.Infof("sync [%s] to disk, [%d%%] complete\n", endpoint, int((float64(i)/float64(epLength))*100))
|
||||
|
||||
err := WriteIndexToFile(tmpDir, endpoint)
|
||||
if err != nil {
|
||||
logger.Errorf("write %s index to file err:%v", endpoint, err)
|
||||
if err := WriteIndexToFile(tmpDir, endpoint); err != nil {
|
||||
logger.Errorf("write %s index to file err:%v\n", endpoint, err)
|
||||
}
|
||||
}
|
||||
|
||||
logger.Infof("sync to disk , [%d%%] complete\n", 100)
|
||||
logger.Info("finish syncing index data")
|
||||
|
||||
if mode == "download" {
|
||||
compress.TarGz(fmt.Sprintf("%s/%s", indexFileDir, "db.tar.gz"), tmpDir)
|
||||
idxPath := fmt.Sprintf("%s/%s", indexFileDir, "db.tar.gz")
|
||||
if err := compress.TarGz(idxPath, tmpDir); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//清空旧的db目录
|
||||
// clean up the discard directory
|
||||
oleIndexDir := fmt.Sprintf("%s/%s", indexFileDir, "db")
|
||||
if err := os.RemoveAll(oleIndexDir); err != nil {
|
||||
return err
|
||||
}
|
||||
//将tmp目录改名为正式的文件名
|
||||
|
||||
// rename directory
|
||||
if err := os.Rename(tmpDir, oleIndexDir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -207,7 +212,7 @@ func Persist(mode string) error {
|
|||
func WriteIndexToFile(indexDir, endpoint string) error {
|
||||
metricIndexMap, exists := IndexDB.GetMetricIndexMap(endpoint)
|
||||
if !exists || metricIndexMap == nil {
|
||||
return fmt.Errorf("endpoint index not found")
|
||||
return fmt.Errorf("endpoint index doesn't found")
|
||||
}
|
||||
|
||||
metricIndexMap.Lock()
|
||||
|
@ -234,9 +239,13 @@ func ReadIndexFromFile(indexDir, endpoint string) (*MetricIndexMap, error) {
|
|||
}
|
||||
|
||||
func IndexList() []*model.Instance {
|
||||
var instances []*model.Instance
|
||||
activeIndexs, _ := report.GetAlive("index", "monapi")
|
||||
for _, instance := range activeIndexs {
|
||||
activeIndexes, err := report.GetAlive("index", "monapi")
|
||||
if err != nil {
|
||||
return []*model.Instance{}
|
||||
}
|
||||
|
||||
instances := make([]*model.Instance, len(activeIndexes))
|
||||
for _, instance := range activeIndexes {
|
||||
if instance.Identity != identity.Identity {
|
||||
instances = append(instances, instance)
|
||||
}
|
||||
|
@ -245,46 +254,50 @@ func IndexList() []*model.Instance {
|
|||
}
|
||||
|
||||
func getIndexFromRemote(instances []*model.Instance) error {
|
||||
filepath := fmt.Sprintf("db.tar.gz")
|
||||
var err error
|
||||
// Get the data
|
||||
perm := rand.Perm(len(instances))
|
||||
for i := range perm {
|
||||
url := fmt.Sprintf("http://%s:%s/api/index/idxfile", instances[perm[i]].Identity, instances[perm[i]].HTTPPort)
|
||||
resp, e := http.Get(url)
|
||||
if e != nil {
|
||||
err = fmt.Errorf("get index from:%s err:%v", url, e)
|
||||
logger.Warning(err)
|
||||
continue
|
||||
filepath := "db.tar.gz"
|
||||
request := func(idx int) error {
|
||||
url := fmt.Sprintf("http://%s:%s/api/index/idxfile", instances[idx].Identity, instances[idx].HTTPPort)
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
logger.Warningf("get index from:%s err:%v", url, err)
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Create the file
|
||||
out, e := os.Create(filepath)
|
||||
if e != nil {
|
||||
err = fmt.Errorf("create file:%s err:%v", filepath, e)
|
||||
logger.Warning(err)
|
||||
continue
|
||||
out, err := os.Create(filepath)
|
||||
if err != nil {
|
||||
logger.Warningf("create file:%s err:%v", filepath, err)
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
// Write the body to file
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
if err != nil {
|
||||
logger.Warning(err)
|
||||
continue
|
||||
logger.Warningf("io.Copy error:%+v", err)
|
||||
return err
|
||||
}
|
||||
break
|
||||
return nil
|
||||
}
|
||||
|
||||
perm := rand.Perm(len(instances))
|
||||
var err error
|
||||
// retry
|
||||
for i := range perm {
|
||||
err = request(perm[i])
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
compress.UnTarGz(filepath, ".")
|
||||
if err != nil {
|
||||
if err := compress.UnTarGz(filepath, "."); err != nil {
|
||||
return err
|
||||
}
|
||||
//清空db目录
|
||||
err = os.Remove(filepath)
|
||||
|
||||
return err
|
||||
return os.Remove(filepath)
|
||||
}
|
||||
|
|
|
@ -54,21 +54,21 @@ func (m *MetricIndex) Set(item dataobj.IndexModel, counter string, now int64) {
|
|||
|
||||
type MetricIndexMap struct {
|
||||
sync.RWMutex
|
||||
Reported bool //用途:判断endpoint是否已成功上报给monapi
|
||||
Reported bool // 用于判断 endpoint 是否已成功上报给 monapi
|
||||
Data map[string]*MetricIndex
|
||||
}
|
||||
|
||||
func (m *MetricIndexMap) Clean(now, timeDuration int64, endpoint string) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
for metric, metricIndex := range m.Data {
|
||||
//清理tagkv
|
||||
// 删除过期 tagkv
|
||||
if now-metricIndex.Ts > timeDuration {
|
||||
stats.Counter.Set("metric.clean", 1)
|
||||
delete(m.Data, metric)
|
||||
continue
|
||||
}
|
||||
|
||||
metricIndex.TagkvMap.Clean(now, timeDuration)
|
||||
metricIndex.CounterMap.Clean(now, timeDuration, endpoint, metric)
|
||||
}
|
||||
|
@ -77,6 +77,7 @@ func (m *MetricIndexMap) Clean(now, timeDuration int64, endpoint string) {
|
|||
func (m *MetricIndexMap) DelMetric(metric string) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
delete(m.Data, metric)
|
||||
}
|
||||
|
||||
|
@ -98,13 +99,15 @@ func (m *MetricIndexMap) GetMetricIndex(metric string) (*MetricIndex, bool) {
|
|||
func (m *MetricIndexMap) SetMetricIndex(metric string, metricIndex *MetricIndex) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.Data[metric] = metricIndex
|
||||
}
|
||||
|
||||
func (m *MetricIndexMap) GetMetrics() []string {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
var metrics []string
|
||||
|
||||
metrics := make([]string, len(m.Data))
|
||||
for k := range m.Data {
|
||||
metrics = append(metrics, k)
|
||||
}
|
||||
|
@ -114,11 +117,13 @@ func (m *MetricIndexMap) GetMetrics() []string {
|
|||
func (m *MetricIndexMap) SetReported() {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.Reported = true
|
||||
}
|
||||
|
||||
func (m *MetricIndexMap) IsReported() bool {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
return m.Reported
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
)
|
||||
|
||||
type TagPair struct {
|
||||
Key string `json:"tagk"` //json和变量不一致为了兼容前端
|
||||
Key string `json:"tagk"` // json 和变量不一致为了兼容前端
|
||||
Values []string `json:"tagv"`
|
||||
}
|
||||
|
||||
|
@ -29,14 +29,15 @@ func getMatchedTags(tagsMap map[string][]string, include, exclude []*TagPair) ma
|
|||
|
||||
if len(include) > 0 {
|
||||
for _, tagPair := range include {
|
||||
// include 中的 tagKey 在 tags 列表中不存在
|
||||
if _, exists := tagsMap[tagPair.Key]; !exists {
|
||||
// include中的tag key在tags列表中不存在
|
||||
return nil
|
||||
}
|
||||
|
||||
// tagKey 存在,初始化 map
|
||||
if _, found := inMap[tagPair.Key]; !found {
|
||||
inMap[tagPair.Key] = make(map[string]bool)
|
||||
}
|
||||
// 对存在的值进行标记
|
||||
for _, tagv := range tagPair.Values {
|
||||
inMap[tagPair.Key][tagv] = true
|
||||
}
|
||||
|
@ -54,10 +55,10 @@ func getMatchedTags(tagsMap map[string][]string, include, exclude []*TagPair) ma
|
|||
}
|
||||
}
|
||||
|
||||
fullmatch := make(map[string][]string)
|
||||
fullMatch := make(map[string][]string)
|
||||
for tagk, tagvs := range tagsMap {
|
||||
for _, tagv := range tagvs {
|
||||
// 排除必须排除的, exclude的优先级高于include
|
||||
// 排除必须排除的, exclude 的优先级高于 include
|
||||
if _, tagkExists := exMap[tagk]; tagkExists {
|
||||
if _, tagvExists := exMap[tagk][tagv]; tagvExists {
|
||||
continue
|
||||
|
@ -66,53 +67,56 @@ func getMatchedTags(tagsMap map[string][]string, include, exclude []*TagPair) ma
|
|||
// 包含必须包含的
|
||||
if _, tagkExists := inMap[tagk]; tagkExists {
|
||||
if _, tagvExists := inMap[tagk][tagv]; tagvExists {
|
||||
if _, found := fullmatch[tagk]; !found {
|
||||
fullmatch[tagk] = make([]string, 0)
|
||||
if _, found := fullMatch[tagk]; !found {
|
||||
fullMatch[tagk] = make([]string, 0)
|
||||
}
|
||||
fullmatch[tagk] = append(fullmatch[tagk], tagv)
|
||||
fullMatch[tagk] = append(fullMatch[tagk], tagv)
|
||||
}
|
||||
continue
|
||||
}
|
||||
// 除此之外全都包含
|
||||
if _, found := fullmatch[tagk]; !found {
|
||||
fullmatch[tagk] = make([]string, 0)
|
||||
if _, found := fullMatch[tagk]; !found {
|
||||
fullMatch[tagk] = make([]string, 0)
|
||||
}
|
||||
fullmatch[tagk] = append(fullmatch[tagk], tagv)
|
||||
fullMatch[tagk] = append(fullMatch[tagk], tagv)
|
||||
}
|
||||
}
|
||||
|
||||
return fullmatch
|
||||
return fullMatch
|
||||
}
|
||||
|
||||
// GetAllCounter returns all possible tags combination.
|
||||
// But not all of them will be in the CounterMaps.
|
||||
func GetAllCounter(tags []*TagPair) []string {
|
||||
if len(tags) == 0 {
|
||||
return []string{}
|
||||
}
|
||||
firstStruct := tags[0]
|
||||
firstList := make([]string, len(firstStruct.Values))
|
||||
|
||||
for i, v := range firstStruct.Values {
|
||||
firstList[i] = firstStruct.Key + "=" + v
|
||||
head := tags[0]
|
||||
firstList := make([]string, len(head.Values))
|
||||
|
||||
for i, v := range head.Values {
|
||||
firstList[i] = head.Key + "=" + v
|
||||
}
|
||||
|
||||
otherList := GetAllCounter(tags[1:])
|
||||
if len(otherList) == 0 {
|
||||
return firstList
|
||||
} else {
|
||||
retList := make([]string, len(otherList)*len(firstList))
|
||||
i := 0
|
||||
for _, firstV := range firstList {
|
||||
for _, otherV := range otherList {
|
||||
retList[i] = firstV + "," + otherV
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
return retList
|
||||
}
|
||||
|
||||
rest := make([]string, len(otherList)*len(firstList))
|
||||
i := 0
|
||||
for _, firstV := range firstList {
|
||||
for _, otherV := range otherList {
|
||||
rest[i] = firstV + "," + otherV
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
return rest
|
||||
}
|
||||
|
||||
//Check if can over limit
|
||||
// OverMaxLimit check whether it can over limit or not.
|
||||
func OverMaxLimit(tagMap map[string][]string, limit int) bool {
|
||||
multiRes := 1
|
||||
|
||||
|
|
|
@ -4,10 +4,10 @@ import (
|
|||
"sync"
|
||||
)
|
||||
|
||||
//TagKeys
|
||||
// TagKeys
|
||||
type TagkvIndex struct {
|
||||
sync.RWMutex
|
||||
Tagkv map[string]map[string]int64 `json:"tagkv"` //map[tagk]map[tagv]ts
|
||||
Tagkv map[string]map[string]int64 `json:"tagkv"` // map[tagk]map[tagv]ts
|
||||
}
|
||||
|
||||
func NewTagkvIndex() *TagkvIndex {
|
||||
|
@ -29,10 +29,10 @@ func (t *TagkvIndex) Set(tagk, tagv string, now int64) {
|
|||
func (t *TagkvIndex) GetTagkv() []*TagPair {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
tagkvs := []*TagPair{}
|
||||
|
||||
tagkvs := make([]*TagPair, len(t.Tagkv))
|
||||
for k, vm := range t.Tagkv {
|
||||
var vs []string
|
||||
vs := make([]string, len(vm))
|
||||
for v := range vm {
|
||||
vs = append(vs, v)
|
||||
}
|
||||
|
@ -49,14 +49,13 @@ func (t *TagkvIndex) GetTagkv() []*TagPair {
|
|||
func (t *TagkvIndex) GetTagkvMap() map[string][]string {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
tagkvs := make(map[string][]string)
|
||||
|
||||
tagkvs := make(map[string][]string)
|
||||
for k, vm := range t.Tagkv {
|
||||
var vs []string
|
||||
vs := make([]string, len(vm))
|
||||
for v := range vm {
|
||||
vs = append(vs, v)
|
||||
}
|
||||
|
||||
tagkvs[k] = vs
|
||||
}
|
||||
|
||||
|
|
|
@ -91,11 +91,10 @@ func GetTagPairs(c *gin.Context) {
|
|||
recv := EndpointMetricRecv{}
|
||||
errors.Dangerous(c.ShouldBindJSON(&recv))
|
||||
|
||||
resp := []*IndexTagkvResp{}
|
||||
|
||||
resp := make([]*IndexTagkvResp, 0)
|
||||
for _, metric := range recv.Metrics {
|
||||
tagkvFilter := make(map[string]map[string]struct{})
|
||||
tagkvs := []*cache.TagPair{}
|
||||
tagkvs := make([]*cache.TagPair, 0)
|
||||
|
||||
for _, endpoint := range recv.Endpoints {
|
||||
metricIndex, exists := cache.IndexDB.GetMetricIndex(endpoint, metric)
|
||||
|
@ -124,7 +123,7 @@ func GetTagPairs(c *gin.Context) {
|
|||
}
|
||||
|
||||
for tagk, tagvFilter := range tagkvFilter {
|
||||
tagvs := []string{}
|
||||
tagvs := make([]string, len(tagvFilter))
|
||||
for v := range tagvFilter {
|
||||
tagvs = append(tagvs, v)
|
||||
}
|
||||
|
@ -161,15 +160,13 @@ type GetIndexByFullTagsResp struct {
|
|||
|
||||
func GetIndexByFullTags(c *gin.Context) {
|
||||
stats.Counter.Set("counter.qp10s", 1)
|
||||
|
||||
recv := []GetIndexByFullTagsRecv{}
|
||||
recv := make([]GetIndexByFullTagsRecv, 0)
|
||||
errors.Dangerous(c.ShouldBindJSON(&recv))
|
||||
|
||||
tagFilter := make(map[string]struct{})
|
||||
tagsList := []string{}
|
||||
tagsList := make([]string, 0)
|
||||
|
||||
var resp []GetIndexByFullTagsResp
|
||||
|
||||
for _, r := range recv {
|
||||
metric := r.Metric
|
||||
tagkv := r.Tagkv
|
||||
|
@ -178,20 +175,19 @@ func GetIndexByFullTags(c *gin.Context) {
|
|||
|
||||
for _, endpoint := range r.Endpoints {
|
||||
if endpoint == "" {
|
||||
logger.Debugf("非法请求: endpoint字段缺失:%v", r)
|
||||
logger.Debugf("invalid request: lack of endpoint param:%v\n", r)
|
||||
stats.Counter.Set("query.counter.miss", 1)
|
||||
|
||||
continue
|
||||
}
|
||||
if metric == "" {
|
||||
logger.Debugf("非法请求: metric字段缺失:%v", r)
|
||||
logger.Debugf("invalid request: lack of metric param:%v\n", r)
|
||||
stats.Counter.Set("query.counter.miss", 1)
|
||||
continue
|
||||
}
|
||||
|
||||
metricIndex, exists := cache.IndexDB.GetMetricIndex(endpoint, metric)
|
||||
if !exists {
|
||||
logger.Debugf("not found index by endpoint:%s metric:%v", endpoint, metric)
|
||||
logger.Debugf("can't found index by endpoint:%s metric:%v\n", endpoint, metric)
|
||||
stats.Counter.Set("query.counter.miss", 1)
|
||||
continue
|
||||
}
|
||||
|
@ -207,10 +203,11 @@ func GetIndexByFullTags(c *gin.Context) {
|
|||
tags := cache.GetAllCounter(tagPairs)
|
||||
|
||||
for _, tag := range tags {
|
||||
//校验和tag有关的counter是否存在,如果一个指标,比如port.listen有name=uic,port=8056和name=hsp,port=8002。避免产生4个曲线
|
||||
// 校验和 tag 有关的 counter 是否存在
|
||||
// 如果一个指标,比如 port.listen 有 name=uic,port=8056 和 name=hsp,port=8002。避免产生 4 个曲线
|
||||
if _, exists := countersMap[tag]; !exists {
|
||||
stats.Counter.Set("query.counter.miss", 1)
|
||||
logger.Debugf("not found counters byendpoint:%s metric:%v tags:%v\n", endpoint, metric, tag)
|
||||
logger.Debugf("can't found counters by endpoint:%s metric:%v tags:%v\n", endpoint, metric, tag)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -251,7 +248,7 @@ type XcludeResp struct {
|
|||
func GetIndexByClude(c *gin.Context) {
|
||||
stats.Counter.Set("xclude.qp10s", 1)
|
||||
|
||||
recv := []CludeRecv{}
|
||||
recv := make([]CludeRecv, 0)
|
||||
errors.Dangerous(c.ShouldBindJSON(&recv))
|
||||
|
||||
var resp []XcludeResp
|
||||
|
@ -262,19 +259,18 @@ func GetIndexByClude(c *gin.Context) {
|
|||
excludeList := r.Exclude
|
||||
step := 0
|
||||
dsType := ""
|
||||
tagList := []string{}
|
||||
tagList := make([]string, 0)
|
||||
tagFilter := make(map[string]struct{})
|
||||
|
||||
for _, endpoint := range r.Endpoints {
|
||||
if endpoint == "" {
|
||||
logger.Debugf("非法请求: endpoint字段缺失:%v", r)
|
||||
logger.Debugf("invalid request: lack of endpoint param:%v\n", r)
|
||||
stats.Counter.Set("xclude.miss", 1)
|
||||
continue
|
||||
}
|
||||
if metric == "" {
|
||||
logger.Debugf("非法请求: metric字段缺失:%v", r)
|
||||
logger.Debugf("invalid request: lack of metric param:%v\n", r)
|
||||
stats.Counter.Set("xclude.miss", 1)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -288,7 +284,7 @@ func GetIndexByClude(c *gin.Context) {
|
|||
Step: step,
|
||||
DsType: dsType,
|
||||
})
|
||||
logger.Debugf("not found index by endpoint:%s metric:%v\n", endpoint, metric)
|
||||
logger.Debugf("can't found index by endpoint:%s metric:%v\n", endpoint, metric)
|
||||
stats.Counter.Set("xclude.miss", 1)
|
||||
|
||||
continue
|
||||
|
@ -299,7 +295,8 @@ func GetIndexByClude(c *gin.Context) {
|
|||
dsType = metricIndex.DsType
|
||||
}
|
||||
|
||||
//校验实际tag组合成的counter是否存在,如果一个指标,比如port.listen有name=uic,port=8056和name=hsp,port=8002。避免产生4个曲线
|
||||
// 校验和 tag 有关的 counter 是否存在
|
||||
// 如果一个指标,比如 port.listen 有 name=uic,port=8056 和 name=hsp,port=8002。避免产生 4 个曲线
|
||||
counterMap := metricIndex.CounterMap.GetCounters()
|
||||
|
||||
var err error
|
||||
|
@ -325,13 +322,15 @@ func GetIndexByClude(c *gin.Context) {
|
|||
}
|
||||
|
||||
for _, tag := range tags {
|
||||
if tag == "" { //过滤掉空字符串
|
||||
//过滤掉空字符串
|
||||
if tag == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
//校验实际tag组合成的counter是否存在,如果一个指标,比如port.listen有name=uic,port=8056和name=hsp,port=8002。避免产生4个曲线
|
||||
// 校验和 tag 有关的 counter 是否存在
|
||||
// 如果一个指标,比如 port.listen 有 name=uic,port=8056 和 name=hsp,port=8002。避免产生 4 个曲线
|
||||
if _, exists := counterMap[tag]; !exists {
|
||||
logger.Debugf("not found counters by endpoint:%s metric:%v tags:%v\n", endpoint, metric, tag)
|
||||
logger.Debugf("can't found counters by endpoint:%s metric:%v tags:%v\n", endpoint, metric, tag)
|
||||
stats.Counter.Set("xclude.miss", 1)
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -7,24 +7,21 @@ import (
|
|||
"github.com/didi/nightingale/src/modules/index/cache"
|
||||
"github.com/didi/nightingale/src/toolkits/stats"
|
||||
|
||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var nsemaPush *semaphore.Semaphore
|
||||
|
||||
func (this *Index) Ping(args string, reply *string) error {
|
||||
func (idx *Index) Ping(args string, reply *string) error {
|
||||
*reply = args
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Index) IncrPush(args []*dataobj.IndexModel, reply *dataobj.IndexResp) error {
|
||||
func (idx *Index) IncrPush(args []*dataobj.IndexModel, reply *dataobj.IndexResp) error {
|
||||
push(args, reply)
|
||||
stats.Counter.Set("index.incr.in", len(args))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Index) Push(args []*dataobj.IndexModel, reply *dataobj.IndexResp) error {
|
||||
func (idx *Index) Push(args []*dataobj.IndexModel, reply *dataobj.IndexResp) error {
|
||||
push(args, reply)
|
||||
stats.Counter.Set("index.all.in", len(args))
|
||||
|
||||
|
|
Loading…
Reference in New Issue