* support openID2.0

* generate UUID if it's not set

* add m3db support

* add test shell

* update transfer.yml

* remove klog

* use remote m3 repo

* remove some file
This commit is contained in:
yubo 2020-11-09 19:52:44 +08:00 committed by GitHub
parent 6d02d8876a
commit 2d1a2fd187
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1765 additions and 83 deletions

View File

@ -92,5 +92,3 @@ wechat:
corp_id: "xxxxxxxxxxxxx" corp_id: "xxxxxxxxxxxxx"
agent_id: 1000000 agent_id: 1000000
secret: "xxxxxxxxxxxxxxxxx" secret: "xxxxxxxxxxxxxxxxx"
captcha: false

View File

@ -1,5 +1,24 @@
backend: backend:
datasource: "tsdb" datasource: "tsdb"
m3db:
enabled: false
name: "m3db"
namespace: "test"
writeConsistencyLevel: "majority"
readConsistencyLevel: "unstrict_majority"
config:
service:
env: default_env
zone: embedded
service: m3db
etcdClusters:
- zone: embedded
endpoints:
- 127.0.0.1:2379
tls:
caCrtPath: /opt/run/etcd/certs/root/ca.pem
crtPath: /opt/run/etcd/certs/output/etcd-client.pem
keyPath: /opt/run/etcd/certs/output/etcd-client-key.pem
tsdb: tsdb:
enabled: true enabled: true
name: "tsdb" name: "tsdb"
@ -19,7 +38,6 @@ backend:
enabled: false enabled: false
brokersPeers: "192.168.1.1:9092,192.168.1.2:9092" brokersPeers: "192.168.1.1:9092,192.168.1.2:9092"
topic: "n9e" topic: "n9e"
logger: logger:
dir: logs/transfer dir: logs/transfer
level: INFO level: INFO

51
go.mod
View File

@ -4,8 +4,6 @@ go 1.12
require ( require (
github.com/Shopify/sarama v1.19.0 github.com/Shopify/sarama v1.19.0
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/caio/go-tdigest v3.1.0+incompatible
github.com/cespare/xxhash v1.1.0 github.com/cespare/xxhash v1.1.0
github.com/codegangsta/negroni v1.0.0 github.com/codegangsta/negroni v1.0.0
github.com/coreos/go-oidc v2.2.1+incompatible github.com/coreos/go-oidc v2.2.1+incompatible
@ -14,25 +12,22 @@ require (
github.com/garyburd/redigo v1.6.2 github.com/garyburd/redigo v1.6.2
github.com/gin-contrib/pprof v1.3.0 github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.6.3 github.com/gin-gonic/gin v1.6.3
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-sql-driver/mysql v1.5.0 github.com/go-sql-driver/mysql v1.5.0
github.com/google/go-cmp v0.5.1 // indirect github.com/google/uuid v1.1.2-0.20190416172445-c2e93f3ae59f
github.com/google/uuid v1.1.2 github.com/gorilla/mux v1.7.3
github.com/gorilla/mux v1.6.2 github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/golang-lru v0.5.1
github.com/hpcloud/tail v1.0.0 github.com/hpcloud/tail v1.0.0
github.com/influxdata/influxdb v1.8.0 github.com/influxdata/influxdb v1.8.0
github.com/m3db/m3 v0.15.17
github.com/mattn/go-isatty v0.0.12 github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-sqlite3 v1.14.0 // indirect github.com/mattn/go-sqlite3 v1.14.0 // indirect
github.com/mojocn/base64Captcha v1.3.1 github.com/mojocn/base64Captcha v1.3.1
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/open-falcon/rrdlite v0.0.0-20200214140804-bf5829f786ad github.com/open-falcon/rrdlite v0.0.0-20200214140804-bf5829f786ad
github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect
github.com/shirou/gopsutil v2.20.7+incompatible github.com/shirou/gopsutil v2.20.7+incompatible
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/viper v1.7.1 github.com/spf13/viper v1.7.1
github.com/streadway/amqp v1.0.0 github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.6.1 github.com/stretchr/testify v1.6.1
@ -41,15 +36,43 @@ require (
github.com/unrolled/render v1.0.3 github.com/unrolled/render v1.0.3
go.uber.org/automaxprocs v1.3.0 // indirect go.uber.org/automaxprocs v1.3.0 // indirect
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de // indirect golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de // indirect
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/ldap.v3 v3.1.0 gopkg.in/ldap.v3 v3.1.0
gopkg.in/square/go-jose.v2 v2.5.1 // indirect gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect gopkg.in/yaml.v2 v2.3.0
k8s.io/apimachinery v0.0.0-20190817020851-f2f3a405f61d
xorm.io/core v0.7.3 xorm.io/core v0.7.3
xorm.io/xorm v0.8.1 xorm.io/xorm v0.8.1
) )
replace github.com/satori/go.uuid => github.com/satori/go.uuid v1.2.0
// branch 0.9.3-pool-read-binary-3
replace github.com/apache/thrift => github.com/m3db/thrift v0.0.0-20190820191926-05b5a2227fe4
// NB(nate): upgrading to the latest msgpack is not backwards compatibile as msgpack will no longer attempt to automatically
// write an integer into the smallest number of bytes it will fit in. We rely on this behavior by having helper methods
// in at least two encoders (see below) take int64s and expect that msgpack will size them down accordingly. We'll have
// to make integer sizing explicit before attempting to upgrade.
//
// Encoders:
// src/metrics/encoding/msgpack/base_encoder.go
// src/dbnode/persist/fs/msgpack/encoder.go
replace gopkg.in/vmihailenco/msgpack.v2 => github.com/vmihailenco/msgpack v2.8.3+incompatible
replace github.com/stretchr/testify => github.com/stretchr/testify v1.1.4-0.20160305165446-6fe211e49392
replace github.com/prometheus/common => github.com/prometheus/common v0.9.1
// Fix legacy import path - https://github.com/uber-go/atomic/pull/60
replace github.com/uber-go/atomic => github.com/uber-go/atomic v1.4.0
// Pull in https://github.com/etcd-io/bbolt/pull/220, required for go 1.14 compatibility
//
// etcd 3.14.13 depends on v1.3.3, but everything before v1.3.5 has unsafe misuses, and fails hard on go 1.14
// TODO: remove after etcd pulls in the change to a new release on 3.4 branch
replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5
// https://github.com/ory/dockertest/issues/212
replace golang.org/x/sys => golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6

725
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,7 @@ type QueryData struct {
ConsolFunc string `json:"consolFunc"` ConsolFunc string `json:"consolFunc"`
Endpoints []string `json:"endpoints"` Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"` Nids []string `json:"nids"`
Counters []string `json:"counters"` Counters []string `json:"counters" description:"metric/tags"`
Step int `json:"step"` Step int `json:"step"`
DsType string `json:"dstype"` DsType string `json:"dstype"`
} }

View File

@ -506,3 +506,11 @@ func captchaGet(c *gin.Context) {
renderData(c, ret, err) renderData(c, ret, err)
} }
func authSettings(c *gin.Context) {
renderData(c, struct {
Sso bool `json:"sso"`
}{
Sso: config.Config.SSO.Enable,
}, nil)
}

View File

@ -1,7 +1,10 @@
package backend package backend
import ( import (
"log"
"github.com/didi/nightingale/src/modules/transfer/backend/influxdb" "github.com/didi/nightingale/src/modules/transfer/backend/influxdb"
"github.com/didi/nightingale/src/modules/transfer/backend/m3db"
"github.com/didi/nightingale/src/modules/transfer/backend/tsdb" "github.com/didi/nightingale/src/modules/transfer/backend/tsdb"
) )
@ -10,6 +13,7 @@ type BackendSection struct {
StraPath string `yaml:"straPath"` StraPath string `yaml:"straPath"`
Judge JudgeSection `yaml:"judge"` Judge JudgeSection `yaml:"judge"`
M3db m3db.M3dbSection `yaml:"m3db"`
Tsdb tsdb.TsdbSection `yaml:"tsdb"` Tsdb tsdb.TsdbSection `yaml:"tsdb"`
Influxdb influxdb.InfluxdbSection `yaml:"influxdb"` Influxdb influxdb.InfluxdbSection `yaml:"influxdb"`
OpenTsdb OpenTsdbSection `yaml:"opentsdb"` OpenTsdb OpenTsdbSection `yaml:"opentsdb"`
@ -23,6 +27,7 @@ var (
openTSDBPushEndpoint *OpenTsdbPushEndpoint openTSDBPushEndpoint *OpenTsdbPushEndpoint
influxdbDataSource *influxdb.InfluxdbDataSource influxdbDataSource *influxdb.InfluxdbDataSource
kafkaPushEndpoint *KafkaPushEndpoint kafkaPushEndpoint *KafkaPushEndpoint
m3dbDataSource *m3db.Client
) )
func Init(cfg BackendSection) { func Init(cfg BackendSection) {
@ -73,4 +78,13 @@ func Init(cfg BackendSection) {
// register // register
RegisterPushEndpoint(kafkaPushEndpoint.Section.Name, kafkaPushEndpoint) RegisterPushEndpoint(kafkaPushEndpoint.Section.Name, kafkaPushEndpoint)
} }
// init m3db
if cfg.M3db.Enabled {
var err error
m3dbDataSource, err = m3db.NewClient(cfg.M3db.Namespace, &cfg.M3db.Config)
if err != nil {
log.Fatalf("unable to new m3db client: %v", err)
}
RegisterDataSource(cfg.M3db.Name, m3dbDataSource)
}
} }

View File

@ -0,0 +1,108 @@
package m3db
import (
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/toolkits/str"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/x/ident"
"github.com/toolkits/pkg/logger"
)
func mvID(in *dataobj.MetricValue) ident.ID {
if in.Nid != "" {
in.Endpoint = dataobj.NidToEndpoint(in.Nid)
}
return ident.StringID(str.MD5(in.Endpoint, in.Metric, str.SortedTags(in.TagsMap)))
}
func mvTags(item *dataobj.MetricValue) ident.Tags {
tags := ident.NewTags()
for k, v := range item.TagsMap {
tags.Append(ident.Tag{
Name: ident.StringID(k),
Value: ident.StringID(v),
})
}
if item.Nid != "" {
tags.Append(ident.Tag{
Name: ident.StringID(NID_NAME),
Value: ident.StringID(item.Nid),
})
}
if item.Endpoint != "" {
tags.Append(ident.Tag{
Name: ident.StringID(ENDPOINT_NAME),
Value: ident.StringID(item.Endpoint),
})
}
tags.Append(ident.Tag{
Name: ident.StringID(METRIC_NAME),
Value: ident.StringID(item.Metric),
})
return tags
}
func tagsMr(tags *consolidators.CompleteTagsResult) *dataobj.MetricResp {
for _, tag := range tags.CompletedTags {
if name := string(tag.Name); name == METRIC_NAME {
metrics := make([]string, len(tag.Values))
for i, v := range tag.Values {
metrics[i] = string(v)
}
return &dataobj.MetricResp{Metrics: metrics}
}
}
return nil
}
func tagsIndexTagkvResp(tags *consolidators.CompleteTagsResult) *dataobj.IndexTagkvResp {
ret := &dataobj.IndexTagkvResp{}
for _, tag := range tags.CompletedTags {
name := string(tag.Name)
switch name {
case METRIC_NAME:
ret.Metric = string(tag.Values[0])
case ENDPOINT_NAME:
ret.Endpoints = make([]string, len(tag.Values))
for i, v := range tag.Values {
ret.Endpoints[i] = string(v)
}
default:
kv := &dataobj.TagPair{Key: string(tag.Name)}
kv.Values = make([]string, len(tag.Values))
for i, v := range tag.Values {
kv.Values[i] = string(v)
}
ret.Tagkv = append(ret.Tagkv, kv)
}
}
return ret
}
func xcludeResp(iter ident.TagIterator) (ret dataobj.XcludeResp) {
tags := map[string]string{}
for iter.Next() {
tag := iter.Current()
switch key := tag.Name.String(); key {
case METRIC_NAME:
ret.Metric = tag.Value.String()
case ENDPOINT_NAME, NID_NAME:
ret.Endpoint = tag.Value.String()
default:
tags[key] = tag.Value.String()
}
}
ret.Tags = append(ret.Tags, dataobj.SortedTags(tags))
if err := iter.Err(); err != nil {
logger.Errorf("FetchTaggedIDs iter:", err)
}
return ret
}

View File

@ -0,0 +1,445 @@
package m3db
import (
"fmt"
"log"
"sync"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
"github.com/toolkits/pkg/logger"
)
const (
NID_NAME = "__nid__"
ENDPOINT_NAME = "__endpoint__"
METRIC_NAME = "__name__"
SERIES_LIMIT = 1000
DOCS_LIMIT = 100
)
type M3dbSection struct {
Name string `yaml:"name"`
Enabled bool `yaml:"enabled"`
Namespace string `yaml:"namespace"`
Config client.Configuration `yaml:",inline"`
}
type Client struct {
sync.RWMutex
client client.Client
active client.Session
opts client.Options
namespace string
config *client.Configuration
namespaceID ident.ID
}
func NewClient(namespace string, cfg *client.Configuration) (*Client, error) {
client, err := cfg.NewClient(client.ConfigurationParameters{})
if err != nil {
return nil, fmt.Errorf("unable to get new M3DB client: %v", err)
}
ret := &Client{
namespace: namespace,
config: cfg,
client: client,
namespaceID: ident.StringID(namespace),
}
if _, err := ret.session(); err != nil {
return nil, fmt.Errorf("unable to get new M3DB session: %v", err)
}
return ret, nil
}
// Push2Queue: push Metrics with values into m3.dbnode
func (p *Client) Push2Queue(items []*dataobj.MetricValue) {
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return
}
errCnt := 0
for _, item := range items {
if err := session.WriteTagged(
p.namespaceID,
mvID(item),
ident.NewTagsIterator(mvTags(item)),
time.Unix(item.Timestamp, 0),
item.Value,
xtime.Second,
nil,
); err != nil {
logger.Errorf("unable to writeTagged: %s", err)
errCnt++
}
}
stats.Counter.Set("m3db.queue.err", errCnt)
}
// QueryData: || (|| endpoints...) (&& tags...)
func (p *Client) QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
logger.Debugf("query data, inputs: %+v", inputs)
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return nil
}
if len(inputs) == 0 {
return nil
}
query, opts := queryDataOptions(inputs)
ret, err := fetchTagged(session, p.namespaceID, query, opts)
if err != nil {
logger.Errorf("unable to query data: ", err)
return nil
}
return ret
}
// QueryDataForUi: && (metric) (|| endpoints...) (&& tags...)
// get kv
func (p *Client) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
logger.Debugf("query data for ui, input: %+v", input)
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return nil
}
query, opts := queryDataUIOptions(input)
ret, err := fetchTagged(session, p.namespaceID, query, opts)
if err != nil {
logger.Errorf("unable to query data for ui: %s", err)
return nil
}
// TODO: groupKey, aggrFunc, consolFunc, Comparisons
return ret
}
// QueryMetrics: || (&& (endpoint)) (counter)...
// return all the values that tag == __name__
func (p *Client) QueryMetrics(input dataobj.EndpointsRecv) *dataobj.MetricResp {
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return nil
}
query, opts := queryMetricsOptions(input)
tags, err := completeTags(session, p.namespaceID, query, opts)
if err != nil {
logger.Errorf("unable completeTags: ", err)
return nil
}
return tagsMr(tags)
}
// QueryTagPairs: && (|| endpoints...) (|| metrics...)
// return all the tags that matches
func (p *Client) QueryTagPairs(input dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp {
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return nil
}
query, opts := queryTagPairsOptions(input)
tags, err := completeTags(session, p.namespaceID, query, opts)
if err != nil {
logger.Errorf("unable completeTags: ", err)
return nil
}
return []dataobj.IndexTagkvResp{*tagsIndexTagkvResp(tags)}
}
// QueryIndexByClude: || (&& (|| endpoints...) (metric) (|| include...) (&& exclude..))
// return all the tags that matches
func (p *Client) QueryIndexByClude(inputs []dataobj.CludeRecv) (ret []dataobj.XcludeResp) {
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return nil
}
for _, input := range inputs {
ret = append(ret, p.queryIndexByClude(session, input)...)
}
return
}
func (p *Client) queryIndexByClude(session client.Session, input dataobj.CludeRecv) []dataobj.XcludeResp {
query, opts := queryIndexByCludeOptions(input)
iter, _, err := session.FetchTaggedIDs(p.namespaceID, query, opts)
if err != nil {
logger.Errorf("unable FetchTaggedIDs: ", err)
return nil
}
// group by endpoint-metric
respMap := make(map[string]dataobj.XcludeResp)
for iter.Next() {
_, _, tagIter := iter.Current()
resp := xcludeResp(tagIter)
key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric)
if v, ok := respMap[key]; ok {
if len(resp.Tags) > 0 {
v.Tags = append(v.Tags, resp.Tags[0])
}
} else {
respMap[key] = resp
}
}
if err := iter.Err(); err != nil {
logger.Errorf("FetchTaggedIDs iter:", err)
return nil
}
resp := make([]dataobj.XcludeResp, 0, len(respMap))
for _, v := range respMap {
resp = append(resp, v)
}
return resp
}
// QueryIndexByFullTags: && (|| endpoints...) (metric) (&& Tagkv...)
// return all the tags that matches
func (p *Client) QueryIndexByFullTags(inputs []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp {
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return nil
}
ret := make([]dataobj.IndexByFullTagsResp, len(inputs))
for i, input := range inputs {
ret[i] = p.queryIndexByFullTags(session, input)
}
return ret
}
func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.IndexByFullTagsRecv) (ret dataobj.IndexByFullTagsResp) {
log.Printf("entering queryIndexByFullTags")
ret = dataobj.IndexByFullTagsResp{
Metric: input.Metric,
Tags: []string{},
Step: 10,
DsType: "GAUGE",
}
query, opts := queryIndexByFullTagsOptions(input)
if query.Query.Equal(idx.NewAllQuery()) {
ret.Endpoints = input.Endpoints
log.Printf("all query")
return
}
iter, _, err := session.FetchTaggedIDs(p.namespaceID, query, opts)
if err != nil {
logger.Errorf("unable FetchTaggedIDs: ", err)
return
}
ret.Endpoints = input.Endpoints
for iter.Next() {
log.Printf("iter.next() ")
_, _, tagIter := iter.Current()
resp := xcludeResp(tagIter)
if len(resp.Tags) > 0 {
ret.Tags = append(ret.Tags, resp.Tags[0])
}
}
if err := iter.Err(); err != nil {
logger.Errorf("FetchTaggedIDs iter:", err)
}
return ret
}
// GetInstance: && (metric) (endpoint) (&& tags...)
// return: backend list which store the series
func (p *Client) GetInstance(metric, endpoint string, tags map[string]string) []string {
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return nil
}
adminSession, ok := session.(client.AdminSession)
if !ok {
logger.Errorf("unable to get an admin session")
return nil
}
tm, err := adminSession.TopologyMap()
if err != nil {
logger.Errorf("unable to get topologyMap with admin seesion")
return nil
}
hosts := []string{}
for _, host := range tm.Hosts() {
hosts = append(hosts, host.Address())
}
return hosts
}
func (s *Client) session() (client.Session, error) {
s.RLock()
session := s.active
s.RUnlock()
if session != nil {
return session, nil
}
s.Lock()
if s.active != nil {
session := s.active
s.Unlock()
return session, nil
}
session, err := s.client.DefaultSession()
if err != nil {
s.Unlock()
return nil, err
}
s.active = session
s.Unlock()
return session, nil
}
func (s *Client) Close() error {
var err error
s.Lock()
if s.active != nil {
err = s.active.Close()
}
s.Unlock()
return err
}
func fetchTagged(
session client.Session,
namespace ident.ID,
q index.Query,
opts index.QueryOptions,
) ([]*dataobj.TsdbQueryResponse, error) {
seriesIters, _, err := session.FetchTagged(namespace, q, opts)
if err != nil {
return nil, err
}
ret := []*dataobj.TsdbQueryResponse{}
for _, seriesIter := range seriesIters.Iters() {
v, err := seriesIterWalk(seriesIter)
if err != nil {
return nil, err
}
ret = append(ret, v)
}
return ret, nil
}
func completeTags(
session client.Session,
namespace ident.ID,
query index.Query,
opts index.AggregationOptions,
) (*consolidators.CompleteTagsResult, error) {
aggTagIter, metadata, err := session.Aggregate(namespace, query, opts)
if err != nil {
return nil, err
}
completedTags := make([]consolidators.CompletedTag, 0, aggTagIter.Remaining())
for aggTagIter.Next() {
name, values := aggTagIter.Current()
tagValues := make([][]byte, 0, values.Remaining())
for values.Next() {
tagValues = append(tagValues, values.Current().Bytes())
}
if err := values.Err(); err != nil {
return nil, err
}
completedTags = append(completedTags, consolidators.CompletedTag{
Name: name.Bytes(),
Values: tagValues,
})
}
if err := aggTagIter.Err(); err != nil {
return nil, err
}
blockMeta := block.NewResultMetadata()
blockMeta.Exhaustive = metadata.Exhaustive
return &consolidators.CompleteTagsResult{
CompleteNameOnly: opts.Type == index.AggregateTagNames,
CompletedTags: completedTags,
Metadata: blockMeta,
}, nil
}
func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryResponse, err error) {
values := []*dataobj.RRDData{}
for iter.Next() {
dp, _, _ := iter.Current()
logger.Printf("%s: %v", dp.Timestamp.String(), dp.Value)
values = append(values, dataobj.NewRRDData(dp.Timestamp.Unix(), dp.Value))
}
if err := iter.Err(); err != nil {
return nil, err
}
tagsIter := iter.Tags()
tags := map[string]string{}
for tagsIter.Next() {
tag := tagsIter.Current()
tags[tag.Name.String()] = tag.Value.String()
}
metric := tags[METRIC_NAME]
endpoint := tags[ENDPOINT_NAME]
counter, err := dataobj.GetCounter(metric, "", tags)
return &dataobj.TsdbQueryResponse{
Start: iter.Start().Unix(),
End: iter.End().Unix(),
Endpoint: endpoint,
Counter: counter,
Values: values,
}, nil
}

View File

@ -0,0 +1,285 @@
package m3db
import (
"strings"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
)
// QueryData
func queryDataOptions(inputs []dataobj.QueryData) (index.Query, index.QueryOptions) {
q := []idx.Query{}
for _, input := range inputs {
q1 := endpointsQuery(input.Nids, input.Endpoints)
q2 := counterQuery(input.Counters)
q = append(q, idx.NewConjunctionQuery(q1, q2))
}
return index.Query{idx.NewDisjunctionQuery(q...)},
index.QueryOptions{
StartInclusive: time.Unix(inputs[0].Start, 0),
EndExclusive: time.Unix(inputs[0].End, 0),
DocsLimit: DOCS_LIMIT,
SeriesLimit: SERIES_LIMIT,
}
}
// QueryDataForUI
// metric && (endpoints[0] || endporint[1] ...) && (tags[0] || tags[1] ...)
func queryDataUIOptions(input dataobj.QueryDataForUI) (index.Query, index.QueryOptions) {
q1 := idx.NewTermQuery([]byte(METRIC_NAME), []byte(input.Metric))
q2 := endpointsQuery(input.Nids, input.Endpoints)
q3 := metricTagsQuery(input.Tags)
return index.Query{idx.NewConjunctionQuery(q1, q2, q3)},
index.QueryOptions{
StartInclusive: time.Unix(input.Start, 0),
EndExclusive: time.Unix(input.End, 0),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
}
}
func metricsQuery(metrics []string) idx.Query {
q := []idx.Query{}
for _, v := range metrics {
q = append(q, idx.NewTermQuery([]byte(METRIC_NAME), []byte(v)))
}
return idx.NewDisjunctionQuery(q...)
}
func metricQuery(metric string) idx.Query {
return idx.NewTermQuery([]byte(METRIC_NAME), []byte(metric))
}
func endpointsQuery(nids, endpoints []string) idx.Query {
if len(nids) > 0 {
q := []idx.Query{}
for _, v := range nids {
q = append(q, idx.NewTermQuery([]byte(NID_NAME), []byte(v)))
}
return idx.NewDisjunctionQuery(q...)
}
if len(endpoints) > 0 {
q := []idx.Query{}
for _, v := range endpoints {
q = append(q, idx.NewTermQuery([]byte(ENDPOINT_NAME), []byte(v)))
}
return idx.NewDisjunctionQuery(q...)
}
return idx.NewAllQuery()
}
func counterQuery(counters []string) idx.Query {
q := []idx.Query{}
for _, v := range counters {
items := strings.SplitN(v, "/", 2)
if len(items) != 2 {
continue
}
tagMap := dataobj.DictedTagstring(items[1])
q2 := []idx.Query{}
q2 = append(q2, idx.NewTermQuery([]byte(METRIC_NAME), []byte(items[0])))
for k, v := range tagMap {
q2 = append(q2, idx.NewTermQuery([]byte(k), []byte(v)))
}
q = append(q, idx.NewConjunctionQuery(q2...))
}
if len(q) > 0 {
return idx.NewDisjunctionQuery(q...)
}
return idx.NewAllQuery()
}
// (tags[0] || tags[2] || ...)
func metricTagsQuery(tags []string) idx.Query {
if len(tags) == 0 {
return idx.NewAllQuery()
}
q := []idx.Query{}
for _, v := range tags {
q1 := []idx.Query{}
tagMap := dataobj.DictedTagstring(v)
for k, v := range tagMap {
q1 = append(q1, idx.NewTermQuery([]byte(k), []byte(v)))
}
q = append(q, idx.NewConjunctionQuery(q1...))
}
return idx.NewDisjunctionQuery(q...)
}
// QueryMetrics
// (endpoint[0] || endpoint[1] ... )
func queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.AggregationOptions) {
nameByte := []byte(METRIC_NAME)
return index.Query{idx.NewConjunctionQuery(
endpointsQuery(nil, input.Endpoints),
idx.NewFieldQuery(nameByte),
)},
index.AggregationOptions{
QueryOptions: index.QueryOptions{
StartInclusive: time.Time{},
EndExclusive: time.Now(),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
},
FieldFilter: [][]byte{nameByte},
Type: index.AggregateTagNamesAndValues,
}
}
// QueryTagPairs
// (endpoint[0] || endpoint[1]...) && (metrics[0] || metrics[1] ... )
func queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.AggregationOptions) {
q1 := endpointsQuery(nil, input.Endpoints)
q2 := metricsQuery(input.Metrics)
return index.Query{idx.NewConjunctionQuery(q1, q2)},
index.AggregationOptions{
QueryOptions: index.QueryOptions{
StartInclusive: time.Time{},
EndExclusive: time.Now(),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
},
FieldFilter: index.AggregateFieldFilter(nil),
Type: index.AggregateTagNamesAndValues,
}
}
// QueryIndexByClude: || (&& (|| endpoints...) (metric) (|| include...) (&& exclude..))
func queryIndexByCludeOptions(input dataobj.CludeRecv) (index.Query, index.QueryOptions) {
query := index.Query{}
q := []idx.Query{}
if len(input.Endpoints) > 0 {
q = append(q, endpointsQuery(nil, input.Endpoints))
}
if input.Metric != "" {
q = append(q, metricQuery(input.Metric))
}
if len(input.Include) > 0 {
q = append(q, includeTagsQuery(input.Include))
}
if len(input.Exclude) > 0 {
q = append(q, excludeTagsQuery(input.Exclude))
}
if len(q) == 0 {
query = index.Query{idx.NewAllQuery()}
} else {
query = index.Query{idx.NewDisjunctionQuery(q...)}
}
return query, index.QueryOptions{
StartInclusive: time.Time{},
EndExclusive: time.Now(),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
}
}
// QueryIndexByFullTags: && (|| endpoints) (metric) (&& tagkv)
func queryIndexByFullTagsOptions(input dataobj.IndexByFullTagsRecv) (index.Query, index.QueryOptions) {
query := index.Query{}
q := []idx.Query{}
if len(input.Endpoints) > 0 {
q = append(q, endpointsQuery(nil, input.Endpoints))
}
if input.Metric != "" {
q = append(q, metricQuery(input.Metric))
}
if len(input.Tagkv) > 0 {
q = append(q, includeTagsQuery2(input.Tagkv))
}
if len(q) == 0 {
query = index.Query{idx.NewAllQuery()}
} else {
query = index.Query{idx.NewConjunctionQuery(q...)}
}
return query, index.QueryOptions{
StartInclusive: time.Time{},
EndExclusive: time.Now(),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
}
}
// && ((|| values...))...
func includeTagsQuery(in []*dataobj.TagPair) idx.Query {
q := []idx.Query{}
for _, kvs := range in {
q1 := []idx.Query{}
for _, v := range kvs.Values {
q1 = append(q1, idx.NewTermQuery([]byte(kvs.Key), []byte(v)))
}
if len(q1) > 0 {
q = append(q, idx.NewDisjunctionQuery(q1...))
}
}
if len(q) == 0 {
return idx.NewAllQuery()
}
return idx.NewConjunctionQuery(q...)
}
func includeTagsQuery2(in []dataobj.TagPair) idx.Query {
q := []idx.Query{}
for _, kvs := range in {
q1 := []idx.Query{}
for _, v := range kvs.Values {
q1 = append(q1, idx.NewTermQuery([]byte(kvs.Key), []byte(v)))
}
if len(q1) > 0 {
q = append(q, idx.NewDisjunctionQuery(q1...))
}
}
if len(q) == 0 {
return idx.NewAllQuery()
}
return idx.NewConjunctionQuery(q...)
}
// && (&& !values...)
func excludeTagsQuery(in []*dataobj.TagPair) idx.Query {
q := []idx.Query{}
for _, kvs := range in {
q1 := []idx.Query{}
for _, v := range kvs.Values {
q1 = append(q1, idx.NewNegationQuery(idx.NewTermQuery([]byte(kvs.Key), []byte(v))))
}
if len(q1) > 0 {
q = append(q, idx.NewConjunctionQuery(q1...))
}
}
if len(q) == 0 {
return idx.NewAllQuery()
}
return idx.NewConjunctionQuery(q...)
}

View File

@ -0,0 +1,35 @@
#!/bin/bash
# type MetricValue struct {
# Nid string `json:"nid"`
# Metric string `json:"metric"`
# Endpoint string `json:"endpoint"`
# Timestamp int64 `json:"timestamp"`
# Step int64 `json:"step"`
# ValueUntyped interface{} `json:"value"`
# Value float64 `json:"-"`
# CounterType string `json:"counterType"`
# Tags string `json:"tags"`
# TagsMap map[string]string `json:"tagsMap"` //保留2种格式方便后端组件使用
# Extra string `json:"extra"`
# }
curl -X POST \
http://localhost:8008/api/transfer/push \
-d '[{
"metric": "test2",
"endpoint": "m3db-dev01-yubo.py",
"timestamp": '$(date "+%s")',
"step": 60,
"value": 1.111,
"counterType": "GAUGE",
"tags": "",
"tagsMap": {
"city":"bj",
"region":"c1",
"test": "end"
},
"extra": ""
}]'

View File

@ -0,0 +1,27 @@
#!/bin/bash
# type QueryData struct {
# Start int64 `json:"start"`
# End int64 `json:"end"`
# ConsolFunc string `json:"consolFunc"`
# Endpoints []string `json:"endpoints"`
# Nids []string `json:"nids"`
# Counters []string `json:"counters"`
# Step int `json:"step"`
# DsType string `json:"dstype"`
# }
curl -X POST \
http://localhost:8008/api/transfer/data \
-d '[{
"start": '$(date -d "1 hour ago" "+%s")',
"end": '$(date "+%s")',
"consolFunc": "",
"endpoints": ["m3db-dev01-yubo.py"],
"nids": ["1"],
"counters": [],
"step": 60,
"dstype": "GAUGE"
}]' | jq .

View File

@ -0,0 +1,38 @@
#!/bin/bash
# type QueryDataForUI struct {
# Start int64 `json:"start"`
# End int64 `json:"end"`
# Metric string `json:"metric"`
# Endpoints []string `json:"endpoints"`
# Nids []string `json:"nids"`
# Tags []string `json:"tags"`
# Step int `json:"step"`
# DsType string `json:"dstype"`
# GroupKey []string `json:"groupKey"` //聚合维度
# AggrFunc string `json:"aggrFunc"` //聚合计算
# ConsolFunc string `json:"consolFunc"`
# Comparisons []int64 `json:"comparisons"` //环比多少时间
# }
curl -X POST \
http://localhost:8008/api/transfer/data/ui \
-d '[{
"start": "1",
"end": '$(data "+%s")',
"metric": "test",
"endpoints": [],
"nids": [],
"tags": [],
"step": 60,
"dstype": "",
"groupKey": [],
"aggrFunc": "",
"consolFunc": "",
"Comparisons": []
}]'

View File

@ -0,0 +1,13 @@
#!/bin/bash
# type EndpointsRecv struct {
# Endpoints []string `json:"endpoints"`
# }
curl -X POST \
http://localhost:8008/api/index/metrics \
-d '{
"endpoints": []
}'

View File

@ -0,0 +1,15 @@
#!/bin/bash
# type EndpointMetricRecv struct {
# Endpoints []string `json:"endpoints"`
# Metrics []string `json:"metrics"`
# }
curl -X POST \
http://localhost:8008/api/index/tagkv \
-d '{
"endpoints": [],
"metrics": ["test"]
}' | jq .

View File

@ -0,0 +1,19 @@
#!/bin/bash
# type CludeRecv struct {
# Endpoints []string `json:"endpoints"`
# Metric string `json:"metric"`
# Include []*TagPair `json:"include"`
# Exclude []*TagPair `json:"exclude"`
# }
curl -X POST \
http://localhost:8008/api/index/counter/clude \
-d '[{
"endpoints": [],
"metric": "test",
"include": [],
"exclude": [{"tagk":"city", "tagv": ["bjo"]}]
}]' | jq .

View File

@ -0,0 +1,18 @@
#!/bin/bash
# type IndexByFullTagsRecv struct {
# Endpoints []string `json:"endpoints"`
# Metric string `json:"metric"`
# Tagkv []TagPair `json:"tagkv"`
# }
curl -X POST \
http://localhost:8008/api/index/counter/fullmatch \
-d '[{
"endpoints": ["m3db-dev01-yubo.py"],
"metric": "test2",
"tagkv": []
}]' | jq .

View File

@ -0,0 +1,5 @@
#!/bin/bash
curl -X POST \
http://localhost:8008/api/transfer/which-tsdb -d '{}'

View File

@ -16,6 +16,7 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/toolkits/pkg/file" "github.com/toolkits/pkg/file"
"gopkg.in/yaml.v2"
) )
type ConfYaml struct { type ConfYaml struct {
@ -171,5 +172,15 @@ func Parse(conf string) error {
Config.Report.HTTPPort = strconv.Itoa(address.GetHTTPPort("transfer")) Config.Report.HTTPPort = strconv.Itoa(address.GetHTTPPort("transfer"))
Config.Report.RPCPort = strconv.Itoa(address.GetRPCPort("transfer")) Config.Report.RPCPort = strconv.Itoa(address.GetRPCPort("transfer"))
if Config.Backend.M3db.Enabled {
// viper.Unmarshal not compatible with yaml.Unmarshal
var b *ConfYaml
err := yaml.Unmarshal([]byte(bs), &b)
if err != nil {
return err
}
Config.Backend.M3db = b.Backend.M3db
}
return identity.Parse() return identity.Parse()
} }

View File

@ -4,15 +4,14 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/src/modules/transfer/backend" "github.com/didi/nightingale/src/modules/transfer/backend"
"github.com/didi/nightingale/src/modules/transfer/cache" "github.com/didi/nightingale/src/modules/transfer/cache"
"github.com/didi/nightingale/src/modules/transfer/config"
"github.com/didi/nightingale/src/toolkits/http/render" "github.com/didi/nightingale/src/toolkits/http/render"
"github.com/didi/nightingale/src/toolkits/str" "github.com/didi/nightingale/src/toolkits/str"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/toolkits/pkg/errors" "github.com/toolkits/pkg/errors"
"github.com/toolkits/pkg/logger"
) )
func ping(c *gin.Context) { func ping(c *gin.Context) {
@ -51,7 +50,7 @@ func tsdbInstance(c *gin.Context) {
var input tsdbInstanceRecv var input tsdbInstanceRecv
errors.Dangerous(c.ShouldBindJSON(&input)) errors.Dangerous(c.ShouldBindJSON(&input))
dataSource, err := backend.GetDataSourceFor("tsdb") dataSource, err := backend.GetDataSourceFor(config.Config.Backend.DataSource)
if err != nil { if err != nil {
logger.Warningf("could not find datasource") logger.Warningf("could not find datasource")
render.Message(c, err) render.Message(c, err)