* bugfix: transfer ignores the counter when the tag is empty

* add m3db benchmark
yubo 2020-11-17 22:07:50 +08:00 committed by GitHub
parent bddd93cd80
commit d78301567b
7 changed files with 166 additions and 13 deletions

.gitignore (vendored)
View File

@@ -55,3 +55,4 @@ _test
/src/modules/tsdb/tsdb
tmp/
main

View File

@@ -85,6 +85,7 @@ type CludeRecv struct
type XcludeResp struct {
Endpoint string `json:"endpoint"`
Nid string `json:"nid"`
Metric string `json:"metric"`
Tags []string `json:"tags"`
Step int `json:"step"`
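The new Nid field gives the xclude response a dedicated slot for node-id-indexed series. A hypothetical response value after this change (all values invented for illustration):

resp := dataobj.XcludeResp{
    Nid:    "12", // filled from the nid tag instead of Endpoint, see the switch further below
    Metric: "cpu.idle",
    Tags:   []string{"core=0"},
    Step:   10,
}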

View File

@@ -0,0 +1,25 @@
enabled: true
name: "m3db"
namespace: "test"
seriesLimit: 0
docsLimit: 0
daysLimit: 7 # max query time
# https://m3db.github.io/m3/m3db/architecture/consistencylevels/
writeConsistencyLevel: "majority" # one|majority|all
readConsistencyLevel: "unstrict_majority" # one|unstrict_majority|majority|all
config:
service:
# KV environment, zone, and service from which to write/read KV data (placement
# and configuration). Leave these as the default values unless you know what
# you're doing.
env: default_env
zone: embedded
service: m3db
etcdClusters:
- zone: embedded
endpoints:
- 10.178.24.87:2379
tls:
caCrtPath: /opt/run/etcd/certs/root/ca.pem
crtPath: /opt/run/etcd/certs/output/etcd-client.pem
keyPath: /opt/run/etcd/certs/output/etcd-client-key.pem
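For readers who want to sanity-check that this file parses, a standalone sketch is shown below. The struct layout is inferred purely from the YAML keys above; the real m3db.M3dbSection that the benchmark's getConf loads may name or nest its fields differently.

package main

import (
    "fmt"
    "io/ioutil"
    "log"

    "gopkg.in/yaml.v2"
)

// m3dbConf is a hypothetical layout guessed from benchmark.yml only.
type m3dbConf struct {
    Enabled               bool   `yaml:"enabled"`
    Name                  string `yaml:"name"`
    Namespace             string `yaml:"namespace"`
    SeriesLimit           int    `yaml:"seriesLimit"`
    DocsLimit             int    `yaml:"docsLimit"`
    DaysLimit             int    `yaml:"daysLimit"`             // max query window, in days
    WriteConsistencyLevel string `yaml:"writeConsistencyLevel"` // one|majority|all
    ReadConsistencyLevel  string `yaml:"readConsistencyLevel"`  // one|unstrict_majority|majority|all
    Config                struct {
        Service struct {
            Env          string `yaml:"env"`
            Zone         string `yaml:"zone"`
            Service      string `yaml:"service"`
            EtcdClusters []struct {
                Zone      string   `yaml:"zone"`
                Endpoints []string `yaml:"endpoints"`
                TLS       struct {
                    CaCrtPath string `yaml:"caCrtPath"`
                    CrtPath   string `yaml:"crtPath"`
                    KeyPath   string `yaml:"keyPath"`
                } `yaml:"tls"`
            } `yaml:"etcdClusters"`
        } `yaml:"service"`
    } `yaml:"config"`
}

func main() {
    b, err := ioutil.ReadFile("benchmark.yml")
    if err != nil {
        log.Fatalf("read benchmark.yml: %v", err)
    }
    var c m3dbConf
    if err := yaml.Unmarshal(b, &c); err != nil {
        log.Fatalf("unmarshal: %v", err)
    }
    fmt.Printf("%+v\n", c)
}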

View File

@@ -0,0 +1,114 @@
package main
import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"runtime"
"strconv"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/modules/transfer/backend/m3db"
"github.com/toolkits/pkg/concurrent/semaphore"
"gopkg.in/yaml.v2"
)
func getConf() m3db.M3dbSection {
c := m3db.M3dbSection{}
b, err := ioutil.ReadFile("benchmark.yml")
if err != nil {
log.Fatalf("readfile benchmark.yml err %s", err)
}
err = yaml.Unmarshal(b, &c)
if err != nil {
log.Fatalf("Unmarshal: %v", err)
}
return c
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
cfg := getConf()
cli, err := m3db.NewClient(cfg)
if err != nil {
log.Fatalf("newclient err %s", err)
}
defer cli.Close()
if len(os.Args) != 2 {
fmt.Printf("usage: %s <QPS>\n", os.Args[0])
os.Exit(1)
}
qps, err := strconv.Atoi(os.Args[1])
if err != nil || qps <= 0 {
fmt.Printf("usage: %s <QPS>\n", os.Args[0])
os.Exit(1)
}
t0 := time.Duration(float64(100000*time.Second) / float64(qps))
log.Printf("qps %d (10w per %.3fs)", qps, float64(t0)/float64(time.Second))
sema := semaphore.NewSemaphore(100)
t1 := time.NewTicker(t0)
log.Println("begin...")
for {
<-t1.C
for i := 0; i < 255; i++ {
for j := 0; j < 4; j++ {
endpoint := "192.168." + strconv.Itoa(i) + "." + strconv.Itoa(j)
for k := 0; k < 1; k++ { // ~1000 pushes (255*4*1) x 100 points ≈ 100k (10w) per tick
metric := "metric." + strconv.Itoa(k)
sema.Acquire()
go func(endpoint, metric string) {
defer sema.Release()
items := getTransferItems(endpoint, metric)
//log.Println(items[0])
start := time.Now().UnixNano()
err := cli.Push(items)
if err != nil {
fmt.Println("err:", err)
} else {
//fmt.Println("resp:", resp)
}
log.Println((time.Now().UnixNano() - start) / 1000000)
}(endpoint, metric)
}
}
}
log.Println("push..")
}
}
func getTransferItems(endpoint, metric string) []*dataobj.MetricValue {
ret := []*dataobj.MetricValue{}
now := time.Now().Unix()
ts := now - now%10 // align the timestamp to the 10s step
r1 := rand.Intn(100)
l := rand.Intn(6) * 100
if l == 0 {
l = 100
}
for i := 0; i < 100; i++ {
ret = append(ret, &dataobj.MetricValue{
Endpoint: endpoint,
Metric: metric,
TagsMap: map[string]string{
"errno": fmt.Sprintf("tsdb.%d", i),
},
Value: float64(r1),
Timestamp: ts,
CounterType: "GAUGE",
Step: 10,
})
}
return ret
}
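Rough load math for this benchmark: every tick spawns 255 x 4 x 1 = 1,020 push goroutines and each push carries 100 points, so one tick writes roughly 102k points; with the ticker interval set to 100000/QPS seconds, the sustained rate lands at about the requested QPS (a couple of percent above it). The program expects benchmark.yml in its working directory and takes the target rate as its only argument (usage: <binary> <QPS>).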

View File

@@ -96,8 +96,10 @@ func xcludeResp(iter ident.TagIterator) *dataobj.XcludeResp {
switch key := tag.Name.String(); key {
case METRIC_NAME:
ret.Metric = tag.Value.String()
case ENDPOINT_NAME, NID_NAME:
case ENDPOINT_NAME:
ret.Endpoint = tag.Value.String()
case NID_NAME:
ret.Nid = tag.Value.String()
default:
tags[key] = tag.Value.String()
}
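Before this hunk, ENDPOINT_NAME and NID_NAME shared a single case, so the nid of a node-id-indexed series was written into Endpoint; now such series populate the new Nid field and Endpoint is reserved for endpoint-indexed ones.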

View File

@@ -2,7 +2,6 @@ package m3db
import (
"fmt"
"log"
"sync"
"time"
@@ -226,13 +225,14 @@ func (p *Client) queryIndexByClude(session client.Session, input dataobj.CludeRe
_, _, tagIter := iter.Current()
resp := xcludeResp(tagIter)
if len(resp.Tags) > 0 && len(resp.Tags[0]) > 0 {
key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric)
if v, ok := respMap[key]; ok {
key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric)
if v, ok := respMap[key]; ok {
if len(resp.Tags) > 0 && len(resp.Tags[0]) > 0 {
v.Tags = append(v.Tags, resp.Tags[0])
} else {
respMap[key] = resp
}
} else {
respMap[key] = resp
}
}
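This hunk is the fix named in the commit title: previously a series whose tag list was empty was skipped before its endpoint-metric key was ever recorded, so tag-less counters vanished from the xclude response; now the key always ends up in respMap, and a tag value is appended only when one is present.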
@@ -267,7 +267,6 @@ func (p *Client) QueryIndexByFullTags(inputs []dataobj.IndexByFullTagsRecv) []da
}
func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.IndexByFullTagsRecv) (ret dataobj.IndexByFullTagsResp) {
log.Printf("entering queryIndexByFullTags")
ret = dataobj.IndexByFullTagsResp{
Metric: input.Metric,
@@ -278,7 +277,6 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde
query, opts := p.config.queryIndexByFullTagsOptions(input)
if query.Query.Equal(idx.NewAllQuery()) {
ret.Endpoints = input.Endpoints
log.Printf("all query")
return
}
@@ -435,7 +433,6 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons
values := []*dataobj.RRDData{}
for iter.Next() {
dp, _, _ := iter.Current()
logger.Printf("%s: %v", dp.Timestamp.String(), dp.Value)
values = append(values, dataobj.NewRRDData(dp.Timestamp.Unix(), dp.Value))
}
if err := iter.Err(); err != nil {

View File

@@ -62,6 +62,9 @@ func endpointsQuery(nids, endpoints []string) idx.Query {
for _, v := range nids {
q = append(q, idx.NewTermQuery([]byte(NID_NAME), []byte(v)))
}
if len(q) == 1 {
return q[0]
}
return idx.NewDisjunctionQuery(q...)
}
@@ -70,6 +73,9 @@ func endpointsQuery(nids, endpoints []string) idx.Query {
for _, v := range endpoints {
q = append(q, idx.NewTermQuery([]byte(ENDPOINT_NAME), []byte(v)))
}
if len(q) == 1 {
return q[0]
}
return idx.NewDisjunctionQuery(q...)
}
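With these two additions, a query for a single nid or a single endpoint returns its term query directly instead of wrapping it in a one-element disjunction.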
@@ -82,14 +88,19 @@ func counterQuery(counters []string) idx.Query {
for _, v := range counters {
items := strings.SplitN(v, "/", 2)
if len(items) != 2 {
var metric, tag string
if len(items) == 2 {
metric, tag = items[0], items[1]
} else if len(items) == 1 && len(items[0]) > 0 {
metric = items[0]
} else {
continue
}
tagMap := dataobj.DictedTagstring(items[1])
tagMap := dataobj.DictedTagstring(tag)
q2 := []idx.Query{}
q2 = append(q2, idx.NewTermQuery([]byte(METRIC_NAME), []byte(items[0])))
q2 = append(q2, idx.NewTermQuery([]byte(METRIC_NAME), []byte(metric)))
for k, v := range tagMap {
q2 = append(q2, idx.NewTermQuery([]byte(k), []byte(v)))
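To make the new counter rule concrete, here is a small standalone sketch (not repository code; the k=v,k=v tag format is only an assumption about what dataobj.DictedTagstring expects). A counter of the form metric/k1=v1,k2=v2 still yields the metric plus its tags, a bare metric with an empty tag part is now kept instead of skipped, and only an empty counter is still ignored.

package main

import (
    "fmt"
    "strings"
)

// parseCounter mirrors the rule introduced above: a counter without a "/tags"
// part is accepted with an empty tag map instead of being dropped.
func parseCounter(counter string) (metric string, tags map[string]string, ok bool) {
    items := strings.SplitN(counter, "/", 2)
    tags = map[string]string{}
    switch {
    case len(items) == 2:
        metric = items[0]
        for _, kv := range strings.Split(items[1], ",") {
            if p := strings.SplitN(kv, "=", 2); len(p) == 2 {
                tags[p[0]] = p[1]
            }
        }
    case len(items) == 1 && len(items[0]) > 0:
        metric = items[0]
    default:
        return "", nil, false // an empty counter is still ignored
    }
    return metric, tags, true
}

func main() {
    for _, c := range []string{"cpu.idle/host=a,core=0", "cpu.idle", ""} {
        m, t, ok := parseCounter(c)
        fmt.Printf("%q -> metric=%q tags=%v ok=%v\n", c, m, t, ok)
    }
}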
@@ -183,6 +194,8 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index.
if len(q) == 0 {
query = index.Query{idx.NewAllQuery()}
} else if len(q) == 1 {
query = index.Query{q[0]}
} else {
query = index.Query{idx.NewConjunctionQuery(q...)}
}
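The same single-element shortcut is applied when the final index query is assembled: one remaining filter (for example the lone metric term produced by an empty-tag counter) is used as-is, zero filters still fall back to the all-query, and two or more are combined into a conjunction.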