diff --git a/.gitignore b/.gitignore index 35411be2..36942256 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,4 @@ _test /src/modules/tsdb/tsdb tmp/ +main diff --git a/src/common/dataobj/query_item.go b/src/common/dataobj/query_item.go index 5230c335..415ae92f 100644 --- a/src/common/dataobj/query_item.go +++ b/src/common/dataobj/query_item.go @@ -85,6 +85,7 @@ type CludeRecv struct { type XcludeResp struct { Endpoint string `json:"endpoint"` + Nid string `json:"nid"` Metric string `json:"metric"` Tags []string `json:"tags"` Step int `json:"step"` diff --git a/src/modules/transfer/backend/m3db/benchmark/benchmark.yml b/src/modules/transfer/backend/m3db/benchmark/benchmark.yml new file mode 100644 index 00000000..ac4d3546 --- /dev/null +++ b/src/modules/transfer/backend/m3db/benchmark/benchmark.yml @@ -0,0 +1,25 @@ +enabled: true +name: "m3db" +namespace: "test" +seriesLimit: 0 +docsLimit: 0 +daysLimit: 7 # max query time +# https://m3db.github.io/m3/m3db/architecture/consistencylevels/ +writeConsistencyLevel: "majority" # one|majority|all +readConsistencyLevel: "unstrict_majority" # one|unstrict_majority|majority|all +config: + service: + # KV environment, zone, and service from which to write/read KV data (placement + # and configuration). Leave these as the default values unless you know what + # you're doing. + env: default_env + zone: embedded + service: m3db + etcdClusters: + - zone: embedded + endpoints: + - 10.178.24.87:2379 + tls: + caCrtPath: /opt/run/etcd/certs/root/ca.pem + crtPath: /opt/run/etcd/certs/output/etcd-client.pem + keyPath: /opt/run/etcd/certs/output/etcd-client-key.pem diff --git a/src/modules/transfer/backend/m3db/benchmark/main.go b/src/modules/transfer/backend/m3db/benchmark/main.go new file mode 100644 index 00000000..7d27e5ae --- /dev/null +++ b/src/modules/transfer/backend/m3db/benchmark/main.go @@ -0,0 +1,114 @@ +package main + +import ( + "fmt" + "io/ioutil" + "log" + "math/rand" + "os" + "runtime" + "strconv" + "time" + + "github.com/didi/nightingale/src/common/dataobj" + "github.com/didi/nightingale/src/modules/transfer/backend/m3db" + "github.com/toolkits/pkg/concurrent/semaphore" + "gopkg.in/yaml.v2" +) + +func getConf() m3db.M3dbSection { + c := m3db.M3dbSection{} + b, err := ioutil.ReadFile("benchmark.yml") + if err != nil { + log.Fatalf("readfile benchmark.yml err %s", err) + } + err = yaml.Unmarshal(b, &c) + if err != nil { + log.Fatalf("Unmarshal: %v", err) + } + + return c +} + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + + cfg := getConf() + cli, err := m3db.NewClient(cfg) + if err != nil { + log.Fatalf("newclient err %s", err) + } + defer cli.Close() + + if len(os.Args) != 2 { + fmt.Printf("usage: %s \n", os.Args[0]) + os.Exit(1) + } + qps, err := strconv.Atoi(os.Args[1]) + if err != nil || qps <= 0 { + fmt.Printf("usage: %s \n", os.Args[0]) + os.Exit(1) + } + + t0 := time.Duration(float64(100000*time.Second) / float64(qps)) + + log.Printf("qps %d (10w per %.3fs)", qps, float64(t0)/float64(time.Second)) + + sema := semaphore.NewSemaphore(100) + t1 := time.NewTicker(t0) + log.Println("begin...") + + for { + <-t1.C + for i := 0; i < 255; i++ { + for j := 0; j < 4; j++ { + endpoint := "192.168." + strconv.Itoa(i) + "." + strconv.Itoa(j) + for k := 0; k < 1; k++ { //1000*100=10W + metric := "metric." + strconv.Itoa(k) + sema.Acquire() + go func(endpoint, metric string) { + defer sema.Release() + items := getTransferItems(endpoint, metric) + //log.Println(items[0]) + start := time.Now().UnixNano() + err := cli.Push(items) + if err != nil { + fmt.Println("err:", err) + } else { + //fmt.Println("resp:", resp) + } + log.Println((time.Now().UnixNano() - start) / 1000000) + }(endpoint, metric) + } + } + } + log.Println("push..") + } +} + +func getTransferItems(endpoint, metric string) []*dataobj.MetricValue { + ret := []*dataobj.MetricValue{} + now := time.Now().Unix() + ts := now - now%10 // 对齐时间戳 + r1 := rand.Intn(100) + + l := rand.Intn(6) * 100 + if l == 0 { + l = 100 + } + + for i := 0; i < 100; i++ { + ret = append(ret, &dataobj.MetricValue{ + Endpoint: endpoint, + Metric: metric, + TagsMap: map[string]string{ + "errno": fmt.Sprintf("tsdb.%d", i), + }, + Value: float64(r1), + Timestamp: ts, + CounterType: "GAUGE", + Step: 10, + }) + } + return ret +} diff --git a/src/modules/transfer/backend/m3db/convert.go b/src/modules/transfer/backend/m3db/convert.go index a33c6f8d..6c17ff88 100644 --- a/src/modules/transfer/backend/m3db/convert.go +++ b/src/modules/transfer/backend/m3db/convert.go @@ -96,8 +96,10 @@ func xcludeResp(iter ident.TagIterator) *dataobj.XcludeResp { switch key := tag.Name.String(); key { case METRIC_NAME: ret.Metric = tag.Value.String() - case ENDPOINT_NAME, NID_NAME: + case ENDPOINT_NAME: ret.Endpoint = tag.Value.String() + case NID_NAME: + ret.Nid = tag.Value.String() default: tags[key] = tag.Value.String() } diff --git a/src/modules/transfer/backend/m3db/m3db.go b/src/modules/transfer/backend/m3db/m3db.go index 6c8c680e..b7f22f23 100644 --- a/src/modules/transfer/backend/m3db/m3db.go +++ b/src/modules/transfer/backend/m3db/m3db.go @@ -2,7 +2,6 @@ package m3db import ( "fmt" - "log" "sync" "time" @@ -226,13 +225,14 @@ func (p *Client) queryIndexByClude(session client.Session, input dataobj.CludeRe _, _, tagIter := iter.Current() resp := xcludeResp(tagIter) - if len(resp.Tags) > 0 && len(resp.Tags[0]) > 0 { - key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric) - if v, ok := respMap[key]; ok { + key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric) + + if v, ok := respMap[key]; ok { + if len(resp.Tags) > 0 && len(resp.Tags[0]) > 0 { v.Tags = append(v.Tags, resp.Tags[0]) - } else { - respMap[key] = resp } + } else { + respMap[key] = resp } } @@ -267,7 +267,6 @@ func (p *Client) QueryIndexByFullTags(inputs []dataobj.IndexByFullTagsRecv) []da } func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.IndexByFullTagsRecv) (ret dataobj.IndexByFullTagsResp) { - log.Printf("entering queryIndexByFullTags") ret = dataobj.IndexByFullTagsResp{ Metric: input.Metric, @@ -278,7 +277,6 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde query, opts := p.config.queryIndexByFullTagsOptions(input) if query.Query.Equal(idx.NewAllQuery()) { ret.Endpoints = input.Endpoints - log.Printf("all query") return } @@ -435,7 +433,6 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons values := []*dataobj.RRDData{} for iter.Next() { dp, _, _ := iter.Current() - logger.Printf("%s: %v", dp.Timestamp.String(), dp.Value) values = append(values, dataobj.NewRRDData(dp.Timestamp.Unix(), dp.Value)) } if err := iter.Err(); err != nil { diff --git a/src/modules/transfer/backend/m3db/query.go b/src/modules/transfer/backend/m3db/query.go index eb20d210..db2464d7 100644 --- a/src/modules/transfer/backend/m3db/query.go +++ b/src/modules/transfer/backend/m3db/query.go @@ -62,6 +62,9 @@ func endpointsQuery(nids, endpoints []string) idx.Query { for _, v := range nids { q = append(q, idx.NewTermQuery([]byte(NID_NAME), []byte(v))) } + if len(q) == 1 { + return q[0] + } return idx.NewDisjunctionQuery(q...) } @@ -70,6 +73,9 @@ func endpointsQuery(nids, endpoints []string) idx.Query { for _, v := range endpoints { q = append(q, idx.NewTermQuery([]byte(ENDPOINT_NAME), []byte(v))) } + if len(q) == 1 { + return q[0] + } return idx.NewDisjunctionQuery(q...) } @@ -82,14 +88,19 @@ func counterQuery(counters []string) idx.Query { for _, v := range counters { items := strings.SplitN(v, "/", 2) - if len(items) != 2 { + var metric, tag string + if len(items) == 2 { + metric, tag = items[0], items[1] + } else if len(items) == 1 && len(items[0]) > 0 { + metric = items[0] + } else { continue } - tagMap := dataobj.DictedTagstring(items[1]) + tagMap := dataobj.DictedTagstring(tag) q2 := []idx.Query{} - q2 = append(q2, idx.NewTermQuery([]byte(METRIC_NAME), []byte(items[0]))) + q2 = append(q2, idx.NewTermQuery([]byte(METRIC_NAME), []byte(metric))) for k, v := range tagMap { q2 = append(q2, idx.NewTermQuery([]byte(k), []byte(v))) @@ -183,6 +194,8 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index. if len(q) == 0 { query = index.Query{idx.NewAllQuery()} + } else if len(q) == 1 { + query = index.Query{q[0]} } else { query = index.Query{idx.NewConjunctionQuery(q...)} }