Merge branch 'master' of https://github.com/didi/nightingale
This commit is contained in:
commit
227652ec8f
|
@ -55,3 +55,4 @@ _test
|
|||
/src/modules/tsdb/tsdb
|
||||
|
||||
tmp/
|
||||
main
|
||||
|
|
|
@ -85,6 +85,7 @@ type CludeRecv struct {
|
|||
|
||||
type XcludeResp struct {
|
||||
Endpoint string `json:"endpoint"`
|
||||
Nid string `json:"nid"`
|
||||
Metric string `json:"metric"`
|
||||
Tags []string `json:"tags"`
|
||||
Step int `json:"step"`
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
enabled: true
|
||||
name: "m3db"
|
||||
namespace: "test"
|
||||
seriesLimit: 0
|
||||
docsLimit: 0
|
||||
daysLimit: 7 # max query time
|
||||
# https://m3db.github.io/m3/m3db/architecture/consistencylevels/
|
||||
writeConsistencyLevel: "majority" # one|majority|all
|
||||
readConsistencyLevel: "unstrict_majority" # one|unstrict_majority|majority|all
|
||||
config:
|
||||
service:
|
||||
# KV environment, zone, and service from which to write/read KV data (placement
|
||||
# and configuration). Leave these as the default values unless you know what
|
||||
# you're doing.
|
||||
env: default_env
|
||||
zone: embedded
|
||||
service: m3db
|
||||
etcdClusters:
|
||||
- zone: embedded
|
||||
endpoints:
|
||||
- 10.178.24.87:2379
|
||||
tls:
|
||||
caCrtPath: /opt/run/etcd/certs/root/ca.pem
|
||||
crtPath: /opt/run/etcd/certs/output/etcd-client.pem
|
||||
keyPath: /opt/run/etcd/certs/output/etcd-client-key.pem
|
|
@ -0,0 +1,114 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/src/common/dataobj"
|
||||
"github.com/didi/nightingale/src/modules/transfer/backend/m3db"
|
||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
func getConf() m3db.M3dbSection {
|
||||
c := m3db.M3dbSection{}
|
||||
b, err := ioutil.ReadFile("benchmark.yml")
|
||||
if err != nil {
|
||||
log.Fatalf("readfile benchmark.yml err %s", err)
|
||||
}
|
||||
err = yaml.Unmarshal(b, &c)
|
||||
if err != nil {
|
||||
log.Fatalf("Unmarshal: %v", err)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func main() {
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
|
||||
cfg := getConf()
|
||||
cli, err := m3db.NewClient(cfg)
|
||||
if err != nil {
|
||||
log.Fatalf("newclient err %s", err)
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
if len(os.Args) != 2 {
|
||||
fmt.Printf("usage: %s <QPS>\n", os.Args[0])
|
||||
os.Exit(1)
|
||||
}
|
||||
qps, err := strconv.Atoi(os.Args[1])
|
||||
if err != nil || qps <= 0 {
|
||||
fmt.Printf("usage: %s <QPS>\n", os.Args[0])
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
t0 := time.Duration(float64(100000*time.Second) / float64(qps))
|
||||
|
||||
log.Printf("qps %d (10w per %.3fs)", qps, float64(t0)/float64(time.Second))
|
||||
|
||||
sema := semaphore.NewSemaphore(100)
|
||||
t1 := time.NewTicker(t0)
|
||||
log.Println("begin...")
|
||||
|
||||
for {
|
||||
<-t1.C
|
||||
for i := 0; i < 255; i++ {
|
||||
for j := 0; j < 4; j++ {
|
||||
endpoint := "192.168." + strconv.Itoa(i) + "." + strconv.Itoa(j)
|
||||
for k := 0; k < 1; k++ { //1000*100=10W
|
||||
metric := "metric." + strconv.Itoa(k)
|
||||
sema.Acquire()
|
||||
go func(endpoint, metric string) {
|
||||
defer sema.Release()
|
||||
items := getTransferItems(endpoint, metric)
|
||||
//log.Println(items[0])
|
||||
start := time.Now().UnixNano()
|
||||
err := cli.Push(items)
|
||||
if err != nil {
|
||||
fmt.Println("err:", err)
|
||||
} else {
|
||||
//fmt.Println("resp:", resp)
|
||||
}
|
||||
log.Println((time.Now().UnixNano() - start) / 1000000)
|
||||
}(endpoint, metric)
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Println("push..")
|
||||
}
|
||||
}
|
||||
|
||||
func getTransferItems(endpoint, metric string) []*dataobj.MetricValue {
|
||||
ret := []*dataobj.MetricValue{}
|
||||
now := time.Now().Unix()
|
||||
ts := now - now%10 // 对齐时间戳
|
||||
r1 := rand.Intn(100)
|
||||
|
||||
l := rand.Intn(6) * 100
|
||||
if l == 0 {
|
||||
l = 100
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
ret = append(ret, &dataobj.MetricValue{
|
||||
Endpoint: endpoint,
|
||||
Metric: metric,
|
||||
TagsMap: map[string]string{
|
||||
"errno": fmt.Sprintf("tsdb.%d", i),
|
||||
},
|
||||
Value: float64(r1),
|
||||
Timestamp: ts,
|
||||
CounterType: "GAUGE",
|
||||
Step: 10,
|
||||
})
|
||||
}
|
||||
return ret
|
||||
}
|
|
@ -96,8 +96,10 @@ func xcludeResp(iter ident.TagIterator) *dataobj.XcludeResp {
|
|||
switch key := tag.Name.String(); key {
|
||||
case METRIC_NAME:
|
||||
ret.Metric = tag.Value.String()
|
||||
case ENDPOINT_NAME, NID_NAME:
|
||||
case ENDPOINT_NAME:
|
||||
ret.Endpoint = tag.Value.String()
|
||||
case NID_NAME:
|
||||
ret.Nid = tag.Value.String()
|
||||
default:
|
||||
tags[key] = tag.Value.String()
|
||||
}
|
||||
|
@ -113,13 +115,18 @@ func xcludeResp(iter ident.TagIterator) *dataobj.XcludeResp {
|
|||
}
|
||||
|
||||
func aggregateResp(data []*dataobj.TsdbQueryResponse, opts dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
|
||||
if len(data) < 2 || opts.AggrFunc == "" {
|
||||
if len(data) < 2 {
|
||||
return data
|
||||
}
|
||||
|
||||
// resample the data
|
||||
for _, v := range data {
|
||||
v.Values = resample(v.Values, opts.Start, opts.End, int64(opts.Step), opts.AggrFunc)
|
||||
v.Values = resample(v.Values, opts.Start, opts.End, int64(opts.Step), opts.ConsolFunc)
|
||||
}
|
||||
|
||||
// aggregateResp
|
||||
if opts.AggrFunc == "" {
|
||||
return data
|
||||
}
|
||||
|
||||
// 没有聚合 tag, 或者曲线没有其他 tags, 直接所有曲线进行计算
|
||||
|
@ -180,7 +187,7 @@ func aggregateResp(data []*dataobj.TsdbQueryResponse, opts dataobj.QueryDataForU
|
|||
return aggrDatas
|
||||
}
|
||||
|
||||
func resample(data []*dataobj.RRDData, start, end, step int64, aggrFunc string) []*dataobj.RRDData {
|
||||
func resample(data []*dataobj.RRDData, start, end, step int64, consolFunc string) []*dataobj.RRDData {
|
||||
l := int((end - start) / step)
|
||||
if l <= 0 {
|
||||
return []*dataobj.RRDData{}
|
||||
|
@ -190,7 +197,7 @@ func resample(data []*dataobj.RRDData, start, end, step int64, aggrFunc string)
|
|||
|
||||
ts := start
|
||||
if t := data[0].Timestamp; t > start {
|
||||
ts = t - t%step
|
||||
ts = t
|
||||
}
|
||||
|
||||
j := 0
|
||||
|
@ -218,7 +225,7 @@ func resample(data []*dataobj.RRDData, start, end, step int64, aggrFunc string)
|
|||
}
|
||||
ret = append(ret, &dataobj.RRDData{
|
||||
Timestamp: ts,
|
||||
Value: aggrData(aggrFunc, get()),
|
||||
Value: aggrData(consolFunc, get()),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -232,11 +239,11 @@ func aggrData(fn string, data []dataobj.JsonFloat) dataobj.JsonFloat {
|
|||
switch fn {
|
||||
case "sum":
|
||||
return sum(data)
|
||||
case "avg":
|
||||
case "avg", "AVERAGE":
|
||||
return avg(data)
|
||||
case "max":
|
||||
case "max", "MAX":
|
||||
return max(data)
|
||||
case "min":
|
||||
case "min", "MIN":
|
||||
return min(data)
|
||||
// case "last":
|
||||
default:
|
||||
|
|
|
@ -2,7 +2,6 @@ package m3db
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -37,6 +36,7 @@ type M3dbSection struct {
|
|||
DocsLimit int `yaml:"docsLimit"`
|
||||
MinStep int `yaml:"minStep"`
|
||||
Config client.Configuration `yaml:",inline"`
|
||||
timeLimit int64 `yaml:"-"`
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
|
@ -66,6 +66,8 @@ func NewClient(cfg M3dbSection) (*Client, error) {
|
|||
cfg.MinStep = 1
|
||||
}
|
||||
|
||||
cfg.timeLimit = int64(86400 * cfg.DaysLimit)
|
||||
|
||||
ret := &Client{
|
||||
namespace: cfg.Namespace,
|
||||
config: &cfg,
|
||||
|
@ -226,13 +228,14 @@ func (p *Client) queryIndexByClude(session client.Session, input dataobj.CludeRe
|
|||
_, _, tagIter := iter.Current()
|
||||
|
||||
resp := xcludeResp(tagIter)
|
||||
if len(resp.Tags) > 0 && len(resp.Tags[0]) > 0 {
|
||||
key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric)
|
||||
if v, ok := respMap[key]; ok {
|
||||
key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric)
|
||||
|
||||
if v, ok := respMap[key]; ok {
|
||||
if len(resp.Tags) > 0 && len(resp.Tags[0]) > 0 {
|
||||
v.Tags = append(v.Tags, resp.Tags[0])
|
||||
} else {
|
||||
respMap[key] = resp
|
||||
}
|
||||
} else {
|
||||
respMap[key] = resp
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,7 +270,6 @@ func (p *Client) QueryIndexByFullTags(inputs []dataobj.IndexByFullTagsRecv) []da
|
|||
}
|
||||
|
||||
func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.IndexByFullTagsRecv) (ret dataobj.IndexByFullTagsResp) {
|
||||
log.Printf("entering queryIndexByFullTags")
|
||||
|
||||
ret = dataobj.IndexByFullTagsResp{
|
||||
Metric: input.Metric,
|
||||
|
@ -278,7 +280,6 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde
|
|||
query, opts := p.config.queryIndexByFullTagsOptions(input)
|
||||
if query.Query.Equal(idx.NewAllQuery()) {
|
||||
ret.Endpoints = input.Endpoints
|
||||
log.Printf("all query")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -435,7 +436,6 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons
|
|||
values := []*dataobj.RRDData{}
|
||||
for iter.Next() {
|
||||
dp, _, _ := iter.Current()
|
||||
logger.Printf("%s: %v", dp.Timestamp.String(), dp.Value)
|
||||
values = append(values, dataobj.NewRRDData(dp.Timestamp.Unix(), dp.Value))
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
|
@ -479,30 +479,29 @@ func (cfg M3dbSection) validateQueryDataForUI(in *dataobj.QueryDataForUI) (err e
|
|||
return fmt.Errorf("%s is invalid aggrfunc", in.AggrFunc)
|
||||
}
|
||||
|
||||
if err := cfg.validateTime(in.Start, in.End, &in.Step); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg M3dbSection) validateTime(start, end int64, step *int) error {
|
||||
if end <= start {
|
||||
return fmt.Errorf("query time range is invalid end %d <= start %d", end, start)
|
||||
if in.End <= in.Start {
|
||||
return fmt.Errorf("query time range is invalid end %d <= start %d", in.End, in.Start)
|
||||
}
|
||||
|
||||
if cfg.DaysLimit > 0 {
|
||||
if days := int((end - start) / 86400); days > cfg.DaysLimit {
|
||||
return fmt.Errorf("query time reange in invalid, daysLimit(%d/%d)", days, cfg.DaysLimit)
|
||||
if t := in.End - cfg.timeLimit; in.Start < t {
|
||||
// return fmt.Errorf("query time reange in invalid, daysLimit(%d/%d)", days, cfg.DaysLimit)
|
||||
in.Start = t
|
||||
}
|
||||
}
|
||||
|
||||
if *step == 0 {
|
||||
*step = int((end - start) / MAX_PONINTS)
|
||||
if in.Step > 0 {
|
||||
if n := (in.End - in.Start) / int64(in.Step); n > MAX_PONINTS {
|
||||
in.Step = 0
|
||||
}
|
||||
}
|
||||
|
||||
if *step > cfg.MinStep {
|
||||
*step = cfg.MinStep
|
||||
if in.Step <= 0 {
|
||||
in.Step = int((in.End - in.Start) / MAX_PONINTS)
|
||||
}
|
||||
|
||||
if in.Step < cfg.MinStep {
|
||||
in.Step = cfg.MinStep
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ func (cfg M3dbSection) queryDataOptions(inputs []dataobj.QueryData) (index.Query
|
|||
return index.Query{idx.NewDisjunctionQuery(q...)},
|
||||
index.QueryOptions{
|
||||
StartInclusive: time.Unix(inputs[0].Start, 0),
|
||||
EndExclusive: time.Unix(inputs[0].End, 0),
|
||||
EndExclusive: time.Unix(inputs[0].End+1, 0),
|
||||
SeriesLimit: cfg.SeriesLimit,
|
||||
DocsLimit: cfg.DocsLimit,
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ func (cfg M3dbSection) queryDataUIOptions(input dataobj.QueryDataForUI) (index.Q
|
|||
return index.Query{idx.NewConjunctionQuery(q1, q2, q3)},
|
||||
index.QueryOptions{
|
||||
StartInclusive: time.Unix(input.Start, 0),
|
||||
EndExclusive: time.Unix(input.End, 0),
|
||||
EndExclusive: time.Unix(input.End+1, 0),
|
||||
SeriesLimit: cfg.SeriesLimit,
|
||||
DocsLimit: cfg.DocsLimit,
|
||||
}
|
||||
|
@ -62,6 +62,9 @@ func endpointsQuery(nids, endpoints []string) idx.Query {
|
|||
for _, v := range nids {
|
||||
q = append(q, idx.NewTermQuery([]byte(NID_NAME), []byte(v)))
|
||||
}
|
||||
if len(q) == 1 {
|
||||
return q[0]
|
||||
}
|
||||
return idx.NewDisjunctionQuery(q...)
|
||||
}
|
||||
|
||||
|
@ -70,6 +73,9 @@ func endpointsQuery(nids, endpoints []string) idx.Query {
|
|||
for _, v := range endpoints {
|
||||
q = append(q, idx.NewTermQuery([]byte(ENDPOINT_NAME), []byte(v)))
|
||||
}
|
||||
if len(q) == 1 {
|
||||
return q[0]
|
||||
}
|
||||
return idx.NewDisjunctionQuery(q...)
|
||||
}
|
||||
|
||||
|
@ -82,14 +88,19 @@ func counterQuery(counters []string) idx.Query {
|
|||
for _, v := range counters {
|
||||
items := strings.SplitN(v, "/", 2)
|
||||
|
||||
if len(items) != 2 {
|
||||
var metric, tag string
|
||||
if len(items) == 2 {
|
||||
metric, tag = items[0], items[1]
|
||||
} else if len(items) == 1 && len(items[0]) > 0 {
|
||||
metric = items[0]
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
|
||||
tagMap := dataobj.DictedTagstring(items[1])
|
||||
tagMap := dataobj.DictedTagstring(tag)
|
||||
|
||||
q2 := []idx.Query{}
|
||||
q2 = append(q2, idx.NewTermQuery([]byte(METRIC_NAME), []byte(items[0])))
|
||||
q2 = append(q2, idx.NewTermQuery([]byte(METRIC_NAME), []byte(metric)))
|
||||
|
||||
for k, v := range tagMap {
|
||||
q2 = append(q2, idx.NewTermQuery([]byte(k), []byte(v)))
|
||||
|
@ -183,6 +194,8 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index.
|
|||
|
||||
if len(q) == 0 {
|
||||
query = index.Query{idx.NewAllQuery()}
|
||||
} else if len(q) == 1 {
|
||||
query = index.Query{q[0]}
|
||||
} else {
|
||||
query = index.Query{idx.NewConjunctionQuery(q...)}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue