From 520dda70c032d61a6655907bdb16eb69bed8eaf6 Mon Sep 17 00:00:00 2001 From: xingren23 Date: Mon, 20 Jul 2020 09:57:22 +0800 Subject: [PATCH] =?UTF-8?q?refactor=20transfer=20datasources=20for=20ui/ju?= =?UTF-8?q?dge,=20implement=20tsdb(+index)=20an=E2=80=A6=20(#246)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor transfer datasources for ui/judge, implement tsdb(+index) and influxdb * fix error string; fix import identidy ; refactor pushendpoint init * fix influx queryData Co-authored-by: wangzhiguo04 --- etc/judge.yml | 1 + etc/transfer.yml | 17 +- src/dataobj/metric.go | 13 + src/dataobj/query_item.go | 53 ++ src/modules/judge/backend/query/index.go | 2 +- src/modules/judge/backend/query/init.go | 1 + src/modules/judge/config/config.go | 1 + src/modules/transfer/backend/datasource.go | 84 +++ .../transfer/backend/influxdb/influxdb.go | 187 ++++++ .../transfer/backend/influxdb/model.go | 169 ++++++ .../transfer/backend/influxdb/query.go | 426 ++++++++++++++ src/modules/transfer/backend/init.go | 194 ++----- src/modules/transfer/backend/judge.go | 191 +++++++ src/modules/transfer/backend/kafka.go | 83 ++- src/modules/transfer/backend/opentsdb.go | 136 +++++ src/modules/transfer/backend/sender.go | 535 ------------------ src/modules/transfer/backend/tsdb/index.go | 57 ++ .../transfer/backend/{ => tsdb}/query.go | 267 ++++----- .../transfer/backend/{ => tsdb}/ring.go | 2 +- src/modules/transfer/backend/tsdb/tsdb.go | 212 +++++++ src/modules/transfer/config/config.go | 66 ++- src/modules/transfer/cron/queue.go | 2 +- src/modules/transfer/cron/stra.go | 2 +- .../transfer/http/routes/health_router.go | 17 +- .../transfer/http/routes/push_router.go | 29 +- .../transfer/http/routes/query_router.go | 101 +++- src/modules/transfer/http/routes/routes.go | 8 + src/modules/transfer/rpc/push.go | 29 +- src/modules/transfer/rpc/query.go | 8 +- src/modules/transfer/transfer.go | 4 + 30 files changed, 1991 insertions(+), 906 deletions(-) create mode 100644 src/modules/transfer/backend/datasource.go create mode 100644 src/modules/transfer/backend/influxdb/influxdb.go create mode 100644 src/modules/transfer/backend/influxdb/model.go create mode 100644 src/modules/transfer/backend/influxdb/query.go create mode 100644 src/modules/transfer/backend/judge.go create mode 100644 src/modules/transfer/backend/opentsdb.go delete mode 100644 src/modules/transfer/backend/sender.go create mode 100644 src/modules/transfer/backend/tsdb/index.go rename src/modules/transfer/backend/{ => tsdb}/query.go (56%) rename src/modules/transfer/backend/{ => tsdb}/ring.go (98%) create mode 100644 src/modules/transfer/backend/tsdb/tsdb.go diff --git a/etc/judge.yml b/etc/judge.yml index b16a0ff9..d52ccffe 100644 --- a/etc/judge.yml +++ b/etc/judge.yml @@ -3,6 +3,7 @@ query: connTimeout: 1000 callTimeout: 2000 indexCallTimeout: 2000 + indexMod: "index" redis: addrs: diff --git a/etc/transfer.yml b/etc/transfer.yml index 9b4a9d73..0e7da315 100644 --- a/etc/transfer.yml +++ b/etc/transfer.yml @@ -1,10 +1,10 @@ backend: - maxConns: 20000 - # in ms - # connTimeout: 1000 - # callTimeout: 3000 - cluster: - tsdb01: 127.0.0.1:5821 + datasource: "tsdb" + tsdb: + enabled: true + name: "tsdb" + cluster: + tsdb01: 127.0.0.1:5821 influxdb: enabled: false username: "influx" @@ -21,6 +21,11 @@ backend: brokersPeers: "192.168.1.1:9092,192.168.1.2:9092" topic: "n9e" +identity: + specify: "" + shell: ifconfig `route|grep '^default'|awk '{print $NF}'`|grep inet|awk '{print $2}'|awk -F ':' '{print 
$NF}'|head -n 1
+
+
 logger:
   dir: logs/transfer
   level: WARNING
diff --git a/src/dataobj/metric.go b/src/dataobj/metric.go
index e7ddc434..4d9742ef 100644
--- a/src/dataobj/metric.go
+++ b/src/dataobj/metric.go
@@ -314,6 +314,19 @@ func PKWithCounter(endpoint, counter string) string {
 	return ret.String()
 }
 
+func GetCounter(metric, tag string, tagMap map[string]string) (counter string, err error) {
+	if tagMap == nil {
+		tagMap, err = SplitTagsString(tag)
+		if err != nil {
+			return
+		}
+	}
+
+	tagStr := SortedTags(tagMap)
+	counter = PKWithTags(metric, tagStr)
+	return
+}
+
 func PKWithTags(metric, tags string) string {
 	ret := bufferPool.Get().(*bytes.Buffer)
 	ret.Reset()
diff --git a/src/dataobj/query_item.go b/src/dataobj/query_item.go
index 442a4b10..3291a414 100644
--- a/src/dataobj/query_item.go
+++ b/src/dataobj/query_item.go
@@ -48,3 +48,56 @@ func (req *QueryData) Key() string {
 func (resp *TsdbQueryResponse) Key() string {
 	return resp.Endpoint + "/" + resp.Counter
 }
+
+type EndpointsRecv struct {
+	Endpoints []string `json:"endpoints"`
+}
+
+type MetricResp struct {
+	Metrics []string `json:"metrics"`
+}
+
+type EndpointMetricRecv struct {
+	Endpoints []string `json:"endpoints"`
+	Metrics   []string `json:"metrics"`
+}
+
+type IndexTagkvResp struct {
+	Endpoints []string   `json:"endpoints"`
+	Metric    string     `json:"metric"`
+	Tagkv     []*TagPair `json:"tagkv"`
+}
+
+type TagPair struct {
+	Key    string   `json:"tagk"` // json tag differs from the field name for frontend compatibility
+	Values []string `json:"tagv"`
+}
+
+type CludeRecv struct {
+	Endpoints []string   `json:"endpoints"`
+	Metric    string     `json:"metric"`
+	Include   []*TagPair `json:"include"`
+	Exclude   []*TagPair `json:"exclude"`
+}
+
+type XcludeResp struct {
+	Endpoints []string `json:"endpoints"`
+	Metric    string   `json:"metric"`
+	Tags      []string `json:"tags"`
+	Step      int      `json:"step"`
+	DsType    string   `json:"dstype"`
+}
+
+type IndexByFullTagsRecv struct {
+	Endpoints []string  `json:"endpoints"`
+	Metric    string    `json:"metric"`
+	Tagkv     []TagPair `json:"tagkv"`
+}
+
+type IndexByFullTagsResp struct {
+	Endpoints []string `json:"endpoints"`
+	Metric    string   `json:"metric"`
+	Tags      []string `json:"tags"`
+	Step      int      `json:"step"`
+	DsType    string   `json:"dstype"`
+}
diff --git a/src/modules/judge/backend/query/index.go b/src/modules/judge/backend/query/index.go
index 18b4a48d..0d6a50dd 100644
--- a/src/modules/judge/backend/query/index.go
+++ b/src/modules/judge/backend/query/index.go
@@ -40,7 +40,7 @@ func GetIndexLoop() {
 }
 
 func GetIndex() {
-	instances, err := report.GetAlive("index", "monapi")
+	instances, err := report.GetAlive(Config.IndexMod, "monapi")
 	if err != nil {
 		stats.Counter.Set("get.index.err", 1)
 		logger.Warningf("get index list err:%v", err)
diff --git a/src/modules/judge/backend/query/init.go b/src/modules/judge/backend/query/init.go
index 3f12361a..ab6620cc 100644
--- a/src/modules/judge/backend/query/init.go
+++ b/src/modules/judge/backend/query/init.go
@@ -19,6 +19,7 @@ type SeriesQuerySection struct {
 	MaxIdle          int    `json:"maxIdle"`          //
 	ConnTimeout      int    `json:"connTimeout"`      // connection timeout
 	CallTimeout      int    `json:"callTimeout"`      // request timeout
+	IndexMod         string `json:"indexMod"`
 	IndexPath        string `json:"indexPath"`
 	IndexCallTimeout int    `json:"indexCallTimeout"` // request timeout
 }
diff --git a/src/modules/judge/config/config.go b/src/modules/judge/config/config.go
index bda992c9..9d8c35d7 100644
--- a/src/modules/judge/config/config.go
+++ b/src/modules/judge/config/config.go
@@ -49,6 +49,7 @@ func Parse(conf string) error {
 		"connTimeout":      1000,
 		"callTimeout":      2000,
 		"indexCallTimeout": 2000,
+		"indexMod":         "index",
 		"indexPath":        "/api/index/counter/clude",
 	})
 
diff --git a/src/modules/transfer/backend/datasource.go b/src/modules/transfer/backend/datasource.go
new file mode 100644
index 00000000..f9d94f1c
--- /dev/null
+++ b/src/modules/transfer/backend/datasource.go
@@ -0,0 +1,84 @@
+package backend
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/didi/nightingale/src/dataobj"
+)
+
+// send
+const (
+	DefaultSendTaskSleepInterval = time.Millisecond * 50 // default sleep interval: 50ms
+	DefaultSendQueueMaxSize      = 102400                // 102.4k
+	MaxSendRetry                 = 10
+)
+
+var (
+	MinStep int // minimum report interval, in seconds
+)
+
+type DataSource interface {
+	PushEndpoint
+
+	// query data for judge
+	QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse
+	// query data for ui
+	QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse
+
+	// query metrics & tags
+	QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp
+	QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp
+	QueryIndexByClude(recv []dataobj.CludeRecv) []dataobj.XcludeResp
+	QueryIndexByFullTags(recv []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp
+
+	// tsdb instance
+	GetInstance(metric, endpoint string, tags map[string]string) []string
+}
+
+type PushEndpoint interface {
+	// push data
+	Push2Queue(items []*dataobj.MetricValue)
+}
+
+var registryDataSources map[string]DataSource
+var registryPushEndpoints map[string]PushEndpoint
+
+func init() {
+	registryDataSources = make(map[string]DataSource)
+	registryPushEndpoints = make(map[string]PushEndpoint)
+}
+
+// get backend datasource
+// (pluginId == "" for default datasource)
+func GetDataSourceFor(pluginId string) (DataSource, error) {
+	if pluginId == "" {
+		pluginId = defaultDataSource
+	}
+	if source, exists := registryDataSources[pluginId]; exists {
+		return source, nil
+	}
+	return nil, fmt.Errorf("could not find datasource for plugin: %s", pluginId)
+}
+
+// get all push endpoints
+func GetPushEndpoints() ([]PushEndpoint, error) {
+	if len(registryPushEndpoints) > 0 {
+		items := make([]PushEndpoint, 0, len(registryPushEndpoints))
+		for _, value := range registryPushEndpoints {
+			items = append(items, value)
+		}
+		return items, nil
+	}
+	return nil, fmt.Errorf("could not find any pushendpoint")
+}
+
+// a datasource is also a push endpoint, so register it in both registries
+func RegisterDataSource(pluginId string, datasource DataSource) {
+	registryDataSources[pluginId] = datasource
+	registryPushEndpoints[pluginId] = datasource
+}
+
+func RegisterPushEndpoint(pluginId string, push PushEndpoint) {
+	registryPushEndpoints[pluginId] = push
+}
diff --git a/src/modules/transfer/backend/influxdb/influxdb.go b/src/modules/transfer/backend/influxdb/influxdb.go
new file mode 100644
index 00000000..cf594cc3
--- /dev/null
+++ b/src/modules/transfer/backend/influxdb/influxdb.go
@@ -0,0 +1,187 @@
+package influxdb
+
+import (
+	"time"
+
+	"github.com/didi/nightingale/src/dataobj"
+	"github.com/didi/nightingale/src/toolkits/stats"
+	"github.com/influxdata/influxdb/client/v2"
+	"github.com/toolkits/pkg/concurrent/semaphore"
+	"github.com/toolkits/pkg/container/list"
+	"github.com/toolkits/pkg/logger"
+)
+
+type InfluxdbSection struct {
+	Enabled   bool   `yaml:"enabled"`
+	Name      string `yaml:"name"`
+	Batch     int    `yaml:"batch"`
+	MaxRetry  int    `yaml:"maxRetry"`
+	WorkerNum int    `yaml:"workerNum"`
+	Timeout   int    `yaml:"timeout"`
+	Address   string `yaml:"address"`
+	Database  string `yaml:"database"`
+	Username  string `yaml:"username"`
+	Password  string `yaml:"password"`
+	Precision string `yaml:"precision"`
+}
+
+type InfluxdbDataSource struct {
+	// config
+	Section               InfluxdbSection
+	SendQueueMaxSize      int
+	SendTaskSleepInterval time.Duration
+
+	// send buffer queue: node -> queue_of_data
+	InfluxdbQueue *list.SafeListLimited
+}
+
+func (influxdb *InfluxdbDataSource) Init() {
+
+	// init queue
+	if influxdb.Section.Enabled {
+		influxdb.InfluxdbQueue = list.NewSafeListLimited(influxdb.SendQueueMaxSize)
+	}
+
+	// init task
+	influxdbConcurrent := influxdb.Section.WorkerNum
+	if influxdbConcurrent < 1 {
+		influxdbConcurrent = 1
+	}
+	go influxdb.send2InfluxdbTask(influxdbConcurrent)
+}
+
+// push raw metric values into the influxdb buffer queue
+func (influxdb *InfluxdbDataSource) Push2Queue(items []*dataobj.MetricValue) {
+	errCnt := 0
+	for _, item := range items {
+		influxdbItem := influxdb.convert2InfluxdbItem(item)
+		isSuccess := influxdb.InfluxdbQueue.PushFront(influxdbItem)
+
+		if !isSuccess {
+			errCnt += 1
+		}
+	}
+	stats.Counter.Set("influxdb.queue.err", errCnt)
+}
+
+func (influxdb *InfluxdbDataSource) send2InfluxdbTask(concurrent int) {
+	batch := influxdb.Section.Batch // send at most batch items per request
+	retry := influxdb.Section.MaxRetry
+	addr := influxdb.Section.Address
+	sema := semaphore.NewSemaphore(concurrent)
+
+	c, err := NewInfluxdbClient(influxdb.Section)
+	if err != nil {
+		logger.Errorf("init influxdb client fail: %v", err)
+		return
+	}
+	// close the client only after it was created successfully
+	defer c.Client.Close()
+
+	for {
+		items := influxdb.InfluxdbQueue.PopBackBy(batch)
+		count := len(items)
+		if count == 0 {
+			time.Sleep(influxdb.SendTaskSleepInterval)
+			continue
+		}
+
+		influxdbItems := make([]*dataobj.InfluxdbItem, count)
+		for i := 0; i < count; i++ {
+			influxdbItems[i] = items[i].(*dataobj.InfluxdbItem)
+			stats.Counter.Set("points.out.influxdb", 1)
+			logger.Debug("send to influxdb: ", influxdbItems[i])
+		}
+
+		// synchronous call with bounded concurrency
+		sema.Acquire()
+		go func(addr string, influxdbItems []*dataobj.InfluxdbItem, count int) {
+			defer sema.Release()
+			var err error
+			sendOk := false
+
+			for i := 0; i < retry; i++ {
+				err = c.Send(influxdbItems)
+				if err == nil {
+					sendOk = true
+					break
+				}
+				logger.Warningf("send influxdb fail: %v", err)
+				time.Sleep(time.Millisecond * 10)
+			}
+
+			if !sendOk {
+				stats.Counter.Set("points.out.influxdb.err", count)
+				logger.Errorf("send %v to influxdb %s fail: %v", influxdbItems, addr, err)
+			} else {
+				logger.Debugf("send to influxdb %s ok", addr)
+			}
+		}(addr, influxdbItems, count)
+	}
+}
+
+func (influxdb *InfluxdbDataSource) convert2InfluxdbItem(d *dataobj.MetricValue) *dataobj.InfluxdbItem {
+	t := dataobj.InfluxdbItem{Tags: make(map[string]string), Fields: make(map[string]interface{})}
+
+	for k, v := range d.TagsMap {
+		t.Tags[k] = v
+	}
+	t.Tags["endpoint"] = d.Endpoint
+	t.Measurement = d.Metric
+	t.Fields["value"] = d.Value
+	t.Timestamp = d.Timestamp
+
+	return &t
+}
+
+type InfluxClient struct {
+	Client    client.Client
+	Database  string
+	Precision string
+}
+
+func NewInfluxdbClient(section InfluxdbSection) (*InfluxClient, error) {
+	c, err := client.NewHTTPClient(client.HTTPConfig{
+		Addr:     section.Address,
+		Username: section.Username,
+		Password: section.Password,
+		Timeout:  time.Millisecond * time.Duration(section.Timeout),
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &InfluxClient{
+		Client:    c,
+		Database:  section.Database,
+		Precision: section.Precision,
+	}, nil
+}
+
+func (c *InfluxClient) Send(items []*dataobj.InfluxdbItem) error {
+	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
+		Database:  c.Database,
+		Precision: c.Precision,
+	})
+	if err != nil {
+		logger.Error("create batch points error: ", err)
+		return err
+	}
+
+	for _, item := range items {
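+		// one point per metric value; item.Timestamp is in seconds,
+		// hence time.Unix(item.Timestamp, 0) below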
+		pt, err := client.NewPoint(item.Measurement, item.Tags, item.Fields, time.Unix(item.Timestamp, 0))
+		if err != nil {
+			logger.Error("create new points error: ", err)
+			continue
+		}
+		bp.AddPoint(pt)
+	}
+
+	return c.Client.Write(bp)
+}
+
+func (influxdb *InfluxdbDataSource) GetInstance(metric, endpoint string, tags map[string]string) []string {
+	// single influxdb instance, or influx-proxy
+	return []string{influxdb.Section.Address}
+}
diff --git a/src/modules/transfer/backend/influxdb/model.go b/src/modules/transfer/backend/influxdb/model.go
new file mode 100644
index 00000000..addeda23
--- /dev/null
+++ b/src/modules/transfer/backend/influxdb/model.go
@@ -0,0 +1,169 @@
+package influxdb
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/didi/nightingale/src/dataobj"
+	"github.com/toolkits/pkg/logger"
+)
+
+type ShowSeries struct {
+	Database  string
+	Metric    string
+	Endpoints []string
+	Include   []*dataobj.TagPair
+	Exclude   []*dataobj.TagPair
+	Start     int64
+	End       int64
+
+	RawQuery string
+}
+
+func (query *ShowSeries) renderShow() {
+	query.RawQuery = fmt.Sprintf("SHOW SERIES ON \"%s\" FROM \"%s\"", query.Database,
+		query.Metric)
+}
+
+func (query *ShowSeries) renderEndpoints() {
+	if len(query.Endpoints) > 0 {
+		// endpoints
+		endpointPart := "("
+		for _, endpoint := range query.Endpoints {
+			endpointPart += fmt.Sprintf(" \"endpoint\"='%s' OR", endpoint)
+		}
+		endpointPart = endpointPart[:len(endpointPart)-len("OR")]
+		endpointPart += ")"
+		query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, endpointPart)
+	}
+}
+
+func (query *ShowSeries) renderInclude() {
+	if len(query.Include) > 0 {
+		// include
+		includePart := "("
+		for _, include := range query.Include {
+			for _, value := range include.Values {
+				includePart += fmt.Sprintf(" \"%s\"='%s' OR", include.Key, value)
+			}
+		}
+		includePart = includePart[:len(includePart)-len("OR")]
+		includePart += ")"
+		if strings.Contains(query.RawQuery, "WHERE") {
+			query.RawQuery = fmt.Sprintf("%s AND %s", query.RawQuery, includePart)
+		} else {
+			query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, includePart)
+		}
+	}
+}
+
+func (query *ShowSeries) renderExclude() {
+	if len(query.Exclude) > 0 {
+		// exclude
+		excludePart := "("
+		for _, exclude := range query.Exclude {
+			for _, value := range exclude.Values {
+				excludePart += fmt.Sprintf(" \"%s\"='%s' OR", exclude.Key, value)
+			}
+		}
+		excludePart = excludePart[:len(excludePart)-len("OR")]
+		excludePart += ")"
+		if strings.Contains(query.RawQuery, "WHERE") {
+			query.RawQuery = fmt.Sprintf("%s AND %s", query.RawQuery, excludePart)
+		} else {
+			query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, excludePart)
+		}
+	}
+}
+
+func (query *ShowSeries) renderTimeRange() {
+	// time; influxql bare integer time literals are in nanoseconds
+	if strings.Contains(query.RawQuery, "WHERE") {
+		query.RawQuery = fmt.Sprintf("%s AND time >= %d AND time <= %d", query.RawQuery,
+			time.Duration(query.Start)*time.Second,
+			time.Duration(query.End)*time.Second)
+	} else {
+		query.RawQuery = fmt.Sprintf("%s WHERE time >= %d AND time <= %d", query.RawQuery,
+			time.Duration(query.Start)*time.Second,
+			time.Duration(query.End)*time.Second)
+	}
+}
+
+type QueryData struct {
+	Start     int64
+	End       int64
+	Metric    string
+	Endpoints []string
+	Tags      []string
+	Step      int
+	DsType    string
+	GroupKey  []string // aggregation dimensions
+	AggrFunc  string   // aggregation function
+
+	RawQuery string
+}
+
+func (query *QueryData) renderSelect() {
+	// select
+	if query.AggrFunc != "" && len(query.GroupKey) > 0 {
+		// TODO: the aggregation path is not rendered yet
+		query.RawQuery = ""
+	} else {
+		query.RawQuery = fmt.Sprintf("SELECT \"value\" FROM \"%s\"", query.Metric)
+	}
+}
+
+func (query *QueryData) renderEndpoints() {
+	// where endpoint
+	if len(query.Endpoints) > 0 {
+		endpointPart := "("
+		for _, endpoint := range query.Endpoints {
+			endpointPart += fmt.Sprintf(" \"endpoint\"='%s' OR", endpoint)
+		}
+		endpointPart = endpointPart[:len(endpointPart)-len("OR")]
+		endpointPart += ")"
+		query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, endpointPart)
+	}
+}
+
+func (query *QueryData) renderTags() {
+	// where tags
+	if len(query.Tags) > 0 {
+		s := strings.Join(query.Tags, ",")
+		tags, err := dataobj.SplitTagsString(s)
+		if err != nil {
+			logger.Warningf("split tags error, %+v", err)
+			return
+		}
+
+		tagPart := "("
+		for tagK, tagV := range tags {
+			tagPart += fmt.Sprintf(" \"%s\"='%s' AND", tagK, tagV)
+		}
+		tagPart = tagPart[:len(tagPart)-len("AND")]
+		tagPart += ")"
+
+		if strings.Contains(query.RawQuery, "WHERE") {
+			query.RawQuery = fmt.Sprintf("%s AND %s", query.RawQuery, tagPart)
+		} else {
+			query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, tagPart)
+		}
+	}
+}
+
+func (query *QueryData) renderTimeRange() {
+	// time; influxql bare integer time literals are in nanoseconds
+	if strings.Contains(query.RawQuery, "WHERE") {
+		query.RawQuery = fmt.Sprintf("%s AND time >= %d AND time <= %d", query.RawQuery,
+			time.Duration(query.Start)*time.Second,
+			time.Duration(query.End)*time.Second)
+	} else {
+		query.RawQuery = fmt.Sprintf("%s WHERE time >= %d AND time <= %d", query.RawQuery,
+			time.Duration(query.Start)*time.Second,
+			time.Duration(query.End)*time.Second)
+	}
+}
+
+func (query *QueryData) renderGroupBy() {
+	// group by
+	if len(query.GroupKey) > 0 {
+		groupByPart := strings.Join(query.GroupKey, ",")
+		query.RawQuery = fmt.Sprintf("%s GROUP BY %s", query.RawQuery, groupByPart)
+	}
+}
diff --git a/src/modules/transfer/backend/influxdb/query.go b/src/modules/transfer/backend/influxdb/query.go
new file mode 100644
index 00000000..74b06544
--- /dev/null
+++ b/src/modules/transfer/backend/influxdb/query.go
@@ -0,0 +1,426 @@
+package influxdb
+
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/influxdata/influxdb/models"
+
+	"github.com/didi/nightingale/src/dataobj"
+	"github.com/influxdata/influxdb/client/v2"
+	"github.com/toolkits/pkg/logger"
+)
+
+// select value from metric where ...
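+// each counter has the form "metric/tag1=v1,tag2=v2"; the tag part is
+// optional, e.g. "cpu.idle" or "disk.bytes.used/mount=/home"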
+func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
+	logger.Debugf("query data, inputs: %+v", inputs)
+
+	c, err := NewInfluxdbClient(influxdb.Section)
+	if err != nil {
+		logger.Errorf("init influxdb client fail: %v", err)
+		return nil
+	}
+	defer c.Client.Close()
+
+	queryResponse := make([]*dataobj.TsdbQueryResponse, 0)
+	for _, input := range inputs {
+		for _, counter := range input.Counters {
+			items := strings.Split(counter, "/")
+			metric := items[0]
+			var tags = make([]string, 0)
+			if len(items) > 1 && len(items[1]) > 0 {
+				tags = strings.Split(items[1], ",")
+			}
+			influxdbQuery := QueryData{
+				Start:     input.Start,
+				End:       input.End,
+				Metric:    metric,
+				Endpoints: input.Endpoints,
+				Tags:      tags,
+				Step:      input.Step,
+				DsType:    input.DsType,
+			}
+			influxdbQuery.renderSelect()
+			influxdbQuery.renderEndpoints()
+			influxdbQuery.renderTags()
+			influxdbQuery.renderTimeRange()
+			logger.Debugf("query influxql %s", influxdbQuery.RawQuery)
+
+			query := client.NewQuery(influxdbQuery.RawQuery, c.Database, c.Precision)
+			if response, err := c.Client.Query(query); err == nil && response.Error() == nil {
+				for _, result := range response.Results {
+					for _, series := range result.Series {
+
+						// FIXME: the influx client may return nil series.Tags
+						endpoint := series.Tags["endpoint"]
+						delete(series.Tags, "endpoint")
+						counter, err := dataobj.GetCounter(series.Name, "", series.Tags)
+						if err != nil {
+							logger.Warningf("get counter error: %+v", err)
+							continue
+						}
+						values := convertValues(series)
+
+						resp := &dataobj.TsdbQueryResponse{
+							Start:    influxdbQuery.Start,
+							End:      influxdbQuery.End,
+							Endpoint: endpoint,
+							Counter:  counter,
+							DsType:   influxdbQuery.DsType,
+							Step:     influxdbQuery.Step,
+							Values:   values,
+						}
+						queryResponse = append(queryResponse, resp)
+					}
+				}
+			}
+		}
+	}
+	return queryResponse
+}
+
+// TODO: support comparison queries
+// select value from metric where ...
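+// unlike QueryData above, a single request is rendered here, optionally
+// with GROUP BY (renderGroupBy) when the UI asks for aggregated series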
+func (influxdb *InfluxdbDataSource) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
+
+	logger.Debugf("query data for ui, input: %+v", input)
+
+	c, err := NewInfluxdbClient(influxdb.Section)
+	if err != nil {
+		logger.Errorf("init influxdb client fail: %v", err)
+		return nil
+	}
+	defer c.Client.Close()
+
+	influxdbQuery := QueryData{
+		Start:     input.Start,
+		End:       input.End,
+		Metric:    input.Metric,
+		Endpoints: input.Endpoints,
+		Tags:      input.Tags,
+		Step:      input.Step,
+		DsType:    input.DsType,
+		GroupKey:  input.GroupKey,
+		AggrFunc:  input.AggrFunc,
+	}
+	influxdbQuery.renderSelect()
+	influxdbQuery.renderEndpoints()
+	influxdbQuery.renderTags()
+	influxdbQuery.renderTimeRange()
+	influxdbQuery.renderGroupBy()
+	logger.Debugf("query influxql %s", influxdbQuery.RawQuery)
+
+	queryResponse := make([]*dataobj.TsdbQueryResponse, 0)
+	query := client.NewQuery(influxdbQuery.RawQuery, c.Database, c.Precision)
+	if response, err := c.Client.Query(query); err == nil && response.Error() == nil {
+
+		for _, result := range response.Results {
+			for _, series := range result.Series {
+
+				// FIXME: the influx client may return nil series.Tags
+				endpoint := series.Tags["endpoint"]
+				delete(series.Tags, "endpoint")
+				counter, err := dataobj.GetCounter(series.Name, "", series.Tags)
+				if err != nil {
+					logger.Warningf("get counter error: %+v", err)
+					continue
+				}
+				values := convertValues(series)
+
+				resp := &dataobj.TsdbQueryResponse{
+					Start:    influxdbQuery.Start,
+					End:      influxdbQuery.End,
+					Endpoint: endpoint,
+					Counter:  counter,
+					DsType:   influxdbQuery.DsType,
+					Step:     influxdbQuery.Step,
+					Values:   values,
+				}
+				queryResponse = append(queryResponse, resp)
+			}
+		}
+	}
+	return queryResponse
+}
+
+// show measurements on n9e
+func (influxdb *InfluxdbDataSource) QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp {
+	logger.Debugf("query metric, recv: %+v", recv)
+
+	c, err := NewInfluxdbClient(influxdb.Section)
+	if err != nil {
+		logger.Errorf("init influxdb client fail: %v", err)
+		return nil
+	}
+	defer c.Client.Close()
+
+	influxql := fmt.Sprintf("SHOW MEASUREMENTS ON \"%s\"", influxdb.Section.Database)
+	query := client.NewQuery(influxql, c.Database, c.Precision)
+	if response, err := c.Client.Query(query); err == nil && response.Error() == nil {
+		resp := &dataobj.MetricResp{
+			Metrics: make([]string, 0),
+		}
+		for _, result := range response.Results {
+			for _, series := range result.Series {
+				for _, valuePair := range series.Values {
+					metric := valuePair[0].(string)
+					resp.Metrics = append(resp.Metrics, metric)
+				}
+			}
+		}
+		return resp
+	}
+	return nil
+}
+
+// show tag keys / values from metric ...
+func (influxdb *InfluxdbDataSource) QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp {
+	logger.Debugf("query tag pairs, recv: %+v", recv)
+
+	c, err := NewInfluxdbClient(influxdb.Section)
+	if err != nil {
+		logger.Errorf("init influxdb client fail: %v", err)
+		return nil
+	}
+	defer c.Client.Close()
+
+	resp := make([]dataobj.IndexTagkvResp, 0)
+	for _, metric := range recv.Metrics {
+		tagkvResp := dataobj.IndexTagkvResp{
+			Endpoints: recv.Endpoints,
+			Metric:    metric,
+			Tagkv:     make([]*dataobj.TagPair, 0),
+		}
+		// show tag keys
+		keys := showTagKeys(c, metric, influxdb.Section.Database)
+		if len(keys) > 0 {
+			// show tag values
+			tagkvResp.Tagkv = showTagValues(c, keys, metric, influxdb.Section.Database)
+		}
+		resp = append(resp, tagkvResp)
+	}
+
+	return resp
+}
+
+// show tag keys on n9e from metric where ...
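+// e.g. SHOW TAG KEYS ON "n9e" FROM "cpu.idle"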
+// (exclude default endpoint tag) +func showTagKeys(c *InfluxClient, metric, database string) []string { + keys := make([]string, 0) + influxql := fmt.Sprintf("SHOW TAG KEYS ON \"%s\" FROM \"%s\"", database, metric) + query := client.NewQuery(influxql, c.Database, c.Precision) + if response, err := c.Client.Query(query); err == nil && response.Error() == nil { + for _, result := range response.Results { + for _, series := range result.Series { + for _, valuePair := range series.Values { + tagKey := valuePair[0].(string) + // 去掉默认tag endpoint + if tagKey != "endpoint" { + keys = append(keys, tagKey) + } + } + } + } + } + return keys +} + +// show tag values on n9e from metric where ... +func showTagValues(c *InfluxClient, keys []string, metric, database string) []*dataobj.TagPair { + tagkv := make([]*dataobj.TagPair, 0) + influxql := fmt.Sprintf("SHOW TAG VALUES ON \"%s\" FROM \"%s\" WITH KEY in (\"%s\")", + database, + metric, strings.Join(keys, "\",\"")) + query := client.NewQuery(influxql, c.Database, c.Precision) + if response, err := c.Client.Query(query); err == nil && response.Error() == nil { + tagPairs := make(map[string]*dataobj.TagPair) + for _, result := range response.Results { + for _, series := range result.Series { + for _, valuePair := range series.Values { + tagKey := valuePair[0].(string) + tagValue := valuePair[1].(string) + if pair, exist := tagPairs[tagKey]; exist { + pair.Values = append(pair.Values, tagValue) + } else { + pair := &dataobj.TagPair{ + Key: tagKey, + Values: []string{tagValue}, + } + tagPairs[pair.Key] = pair + tagkv = append(tagkv, pair) + } + } + } + } + } + return tagkv +} + +// show series from metric where ... +func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv) []dataobj.XcludeResp { + logger.Debugf("query IndexByClude , recv: %+v", recvs) + + c, err := NewInfluxdbClient(influxdb.Section) + defer c.Client.Close() + + if err != nil { + logger.Errorf("init influxdb client fail: %v", err) + return nil + } + resp := make([]dataobj.XcludeResp, 0) + for _, recv := range recvs { + xcludeResp := dataobj.XcludeResp{ + Endpoints: recv.Endpoints, + Metric: recv.Metric, + Tags: make([]string, 0), + Step: -1, // fixme + DsType: "GAUGE", + } + + if len(recv.Endpoints) == 0 { + resp = append(resp, xcludeResp) + continue + } + + showSeries := ShowSeries{ + Database: influxdb.Section.Database, + Metric: recv.Metric, + Endpoints: recv.Endpoints, + Start: time.Now().AddDate(0, 0, -30).Unix(), + End: time.Now().Unix(), + Include: recv.Include, + Exclude: recv.Exclude, + } + showSeries.renderShow() + showSeries.renderEndpoints() + showSeries.renderInclude() + showSeries.renderExclude() + + query := client.NewQuery(showSeries.RawQuery, c.Database, c.Precision) + if response, err := c.Client.Query(query); err == nil && response.Error() == nil { + for _, result := range response.Results { + for _, series := range result.Series { + for _, valuePair := range series.Values { + + // proc.port.listen,endpoint=localhost,port=22,service=sshd + tagKey := valuePair[0].(string) + + // process + items := strings.Split(tagKey, ",") + newItems := make([]string, 0) + for _, item := range items { + if item != recv.Metric && !strings.Contains(item, "endpoint") { + newItems = append(newItems, item) + } + } + + if len(newItems) > 0 { + if tags, err := dataobj.SplitTagsString(strings.Join(newItems, ",")); err == nil { + xcludeResp.Tags = append(xcludeResp.Tags, dataobj.SortedTags(tags)) + } + } + } + } + } + } + resp = append(resp, xcludeResp) + } + + return 
resp +} + +// show series from metric where ... +func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexByFullTagsRecv) []dataobj. + IndexByFullTagsResp { + logger.Debugf("query IndexByFullTags , recv: %+v", recvs) + + c, err := NewInfluxdbClient(influxdb.Section) + defer c.Client.Close() + + if err != nil { + logger.Errorf("init influxdb client fail: %v", err) + return nil + } + + resp := make([]dataobj.IndexByFullTagsResp, 0) + for _, recv := range recvs { + fullTagResp := dataobj.IndexByFullTagsResp{ + Endpoints: recv.Endpoints, + Metric: recv.Metric, + Tags: make([]string, 0), + Step: -1, // FIXME + DsType: "GAUGE", + } + + // 兼容夜莺逻辑,不选择endpoint则返回空 + if len(recv.Endpoints) == 0 { + resp = append(resp, fullTagResp) + continue + } + + // build influxql + influxdbShow := ShowSeries{ + Database: influxdb.Section.Database, + Metric: recv.Metric, + Endpoints: recv.Endpoints, + Start: time.Now().AddDate(0, 0, -30).Unix(), + End: time.Now().Unix(), + } + influxdbShow.renderShow() + influxdbShow.renderEndpoints() + influxdbShow.renderTimeRange() + + // do query + query := client.NewQuery(influxdbShow.RawQuery, c.Database, c.Precision) + if response, err := c.Client.Query(query); err == nil && response.Error() == nil { + for _, result := range response.Results { + for _, series := range result.Series { + for _, valuePair := range series.Values { + + // proc.port.listen,endpoint=localhost,port=22,service=sshd + tagKey := valuePair[0].(string) + + // process + items := strings.Split(tagKey, ",") + newItems := make([]string, 0) + for _, item := range items { + if item != recv.Metric && !strings.Contains(item, "endpoint") { + newItems = append(newItems, item) + } + } + + if len(newItems) > 0 { + if tags, err := dataobj.SplitTagsString(strings.Join(newItems, ",")); err == nil { + fullTagResp.Tags = append(fullTagResp.Tags, dataobj.SortedTags(tags)) + } + } + } + } + } + } + resp = append(resp, fullTagResp) + } + + return resp +} + +func convertValues(series models.Row) []*dataobj.RRDData { + + // convert values + values := make([]*dataobj.RRDData, 0, len(series.Values)) + for _, valuePair := range series.Values { + timestampNumber, _ := valuePair[0].(json.Number) + timestamp, _ := timestampNumber.Int64() + + valueNumber, _ := valuePair[1].(json.Number) + valueFloat, _ := valueNumber.Float64() + values = append(values, dataobj.NewRRDData(timestamp, valueFloat)) + } + return values +} diff --git a/src/modules/transfer/backend/init.go b/src/modules/transfer/backend/init.go index 9b4d8324..69f09d2b 100644 --- a/src/modules/transfer/backend/init.go +++ b/src/modules/transfer/backend/init.go @@ -1,166 +1,76 @@ package backend import ( - "github.com/toolkits/pkg/container/list" - "github.com/toolkits/pkg/container/set" - "github.com/toolkits/pkg/str" - - "github.com/didi/nightingale/src/modules/transfer/cache" - "github.com/didi/nightingale/src/toolkits/pools" - "github.com/didi/nightingale/src/toolkits/report" - "github.com/didi/nightingale/src/toolkits/stats" + "github.com/didi/nightingale/src/modules/transfer/backend/influxdb" + "github.com/didi/nightingale/src/modules/transfer/backend/tsdb" ) -type InfluxdbSection struct { - Enabled bool `yaml:"enabled"` - Batch int `yaml:"batch"` - MaxRetry int `yaml:"maxRetry"` - WorkerNum int `yaml:"workerNum"` - Timeout int `yaml:"timeout"` - Address string `yaml:"address"` - Database string `yaml:"database"` - Username string `yaml:"username"` - Password string `yaml:"password"` - Precision string `yaml:"precision"` -} - -type 
OpenTsdbSection struct { - Enabled bool `yaml:"enabled"` - Batch int `yaml:"batch"` - ConnTimeout int `yaml:"connTimeout"` - CallTimeout int `yaml:"callTimeout"` - WorkerNum int `yaml:"workerNum"` - MaxConns int `yaml:"maxConns"` - MaxIdle int `yaml:"maxIdle"` - MaxRetry int `yaml:"maxRetry"` - Address string `yaml:"address"` -} - -type KafkaSection struct { - Enabled bool `yaml:"enabled"` - Topic string `yaml:"topic"` - BrokersPeers string `yaml:"brokersPeers"` - SaslUser string `yaml:"saslUser"` - SaslPasswd string `yaml:"saslPasswd"` - Retry int `yaml:"retry"` - KeepAlive int64 `yaml:"keepAlive"` -} - type BackendSection struct { - Enabled bool `yaml:"enabled"` - Batch int `yaml:"batch"` - ConnTimeout int `yaml:"connTimeout"` - CallTimeout int `yaml:"callTimeout"` - WorkerNum int `yaml:"workerNum"` - MaxConns int `yaml:"maxConns"` - MaxIdle int `yaml:"maxIdle"` - IndexTimeout int `yaml:"indexTimeout"` - StraPath string `yaml:"straPath"` - HbsMod string `yaml:"hbsMod"` + DataSource string `yaml:"datasource"` + StraPath string `yaml:"straPath"` - Replicas int `yaml:"replicas"` - Cluster map[string]string `yaml:"cluster"` - ClusterList map[string]*ClusterNode `json:"clusterList"` - Influxdb InfluxdbSection `yaml:"influxdb"` - OpenTsdb OpenTsdbSection `yaml:"opentsdb"` - Kafka KafkaSection `yaml:"kafka"` -} - -const DefaultSendQueueMaxSize = 102400 //10.24w - -type ClusterNode struct { - Addrs []string `json:"addrs"` + Judge JudgeSection `yaml:"judge"` + Tsdb tsdb.TsdbSection `yaml:"tsdb"` + Influxdb influxdb.InfluxdbSection `yaml:"influxdb"` + OpenTsdb OpenTsdbSection `yaml:"opentsdb"` + Kafka KafkaSection `yaml:"kafka"` } var ( - Config BackendSection - // 服务节点的一致性哈希环 pk -> node - TsdbNodeRing *ConsistentHashRing - - // 发送缓存队列 node -> queue_of_data - TsdbQueues = make(map[string]*list.SafeListLimited) - JudgeQueues = cache.SafeJudgeQueue{} - InfluxdbQueue *list.SafeListLimited - OpenTsdbQueue *list.SafeListLimited - KafkaQueue = make(chan KafkaData, 10) - - // 连接池 node_address -> connection_pool - TsdbConnPools *pools.ConnPools - JudgeConnPools *pools.ConnPools - OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper - - connTimeout int32 - callTimeout int32 + defaultDataSource string + StraPath string + tsdbDataSource *tsdb.TsdbDataSource + openTSDBPushEndpoint *OpenTsdbPushEndpoint + influxdbDataSource *influxdb.InfluxdbDataSource + kafkaPushEndpoint *KafkaPushEndpoint ) func Init(cfg BackendSection) { - Config = cfg - // 初始化默认参数 - connTimeout = int32(Config.ConnTimeout) - callTimeout = int32(Config.CallTimeout) + defaultDataSource = cfg.DataSource + StraPath = cfg.StraPath - initHashRing() - initConnPools() - initSendQueues() + // init judge + InitJudge(cfg.Judge) - startSendTasks() -} - -func initHashRing() { - TsdbNodeRing = NewConsistentHashRing(int32(Config.Replicas), str.KeysOfMap(Config.Cluster)) -} - -func initConnPools() { - tsdbInstances := set.NewSafeSet() - for _, item := range Config.ClusterList { - for _, addr := range item.Addrs { - tsdbInstances.Add(addr) + // init tsdb + if cfg.Tsdb.Enabled { + tsdbDataSource = &tsdb.TsdbDataSource{ + Section: cfg.Tsdb, + SendQueueMaxSize: DefaultSendQueueMaxSize, + SendTaskSleepInterval: DefaultSendTaskSleepInterval, } + tsdbDataSource.Init() // register + RegisterDataSource(tsdbDataSource.Section.Name, tsdbDataSource) } - TsdbConnPools = pools.NewConnPools( - Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice(), - ) - JudgeConnPools = pools.NewConnPools( - Config.MaxConns, Config.MaxIdle, 
Config.ConnTimeout, Config.CallTimeout, GetJudges(), - ) - if Config.OpenTsdb.Enabled { - OpenTsdbConnPoolHelper = pools.NewOpenTsdbConnPoolHelper(Config.OpenTsdb.Address, Config.OpenTsdb.MaxConns, Config.OpenTsdb.MaxIdle, Config.OpenTsdb.ConnTimeout, Config.OpenTsdb.CallTimeout) - } -} - -func initSendQueues() { - for node, item := range Config.ClusterList { - for _, addr := range item.Addrs { - TsdbQueues[node+addr] = list.NewSafeListLimited(DefaultSendQueueMaxSize) + // init influxdb + if cfg.Influxdb.Enabled { + influxdbDataSource = &influxdb.InfluxdbDataSource{ + Section: cfg.Influxdb, + SendQueueMaxSize: DefaultSendQueueMaxSize, + SendTaskSleepInterval: DefaultSendTaskSleepInterval, } - } + influxdbDataSource.Init() + // register + RegisterDataSource(influxdbDataSource.Section.Name, influxdbDataSource) - JudgeQueues = cache.NewJudgeQueue() - judges := GetJudges() - for _, judge := range judges { - JudgeQueues.Set(judge, list.NewSafeListLimited(DefaultSendQueueMaxSize)) } - - if Config.Influxdb.Enabled { - InfluxdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize) + // init opentsdb + if cfg.OpenTsdb.Enabled { + openTSDBPushEndpoint = &OpenTsdbPushEndpoint{ + Section: cfg.OpenTsdb, + } + openTSDBPushEndpoint.Init() + // register + RegisterPushEndpoint(openTSDBPushEndpoint.Section.Name, openTSDBPushEndpoint) } - - if Config.OpenTsdb.Enabled { - OpenTsdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize) + // init kafka + if cfg.Kafka.Enabled { + kafkaPushEndpoint = &KafkaPushEndpoint{ + Section: cfg.Kafka, + } + kafkaPushEndpoint.Init() + // register + RegisterPushEndpoint(kafkaPushEndpoint.Section.Name, kafkaPushEndpoint) } } - -func GetJudges() []string { - var judgeInstances []string - instances, err := report.GetAlive("judge", Config.HbsMod) - if err != nil { - stats.Counter.Set("judge.get.err", 1) - return judgeInstances - } - for _, instance := range instances { - judgeInstance := instance.Identity + ":" + instance.RPCPort - judgeInstances = append(judgeInstances, judgeInstance) - } - return judgeInstances -} diff --git a/src/modules/transfer/backend/judge.go b/src/modules/transfer/backend/judge.go new file mode 100644 index 00000000..50b9d9df --- /dev/null +++ b/src/modules/transfer/backend/judge.go @@ -0,0 +1,191 @@ +package backend + +import ( + "time" + + "github.com/didi/nightingale/src/dataobj" + "github.com/didi/nightingale/src/model" + "github.com/didi/nightingale/src/modules/transfer/cache" + "github.com/didi/nightingale/src/toolkits/pools" + "github.com/didi/nightingale/src/toolkits/report" + "github.com/didi/nightingale/src/toolkits/stats" + "github.com/didi/nightingale/src/toolkits/str" + "github.com/toolkits/pkg/concurrent/semaphore" + "github.com/toolkits/pkg/container/list" + "github.com/toolkits/pkg/logger" +) + +type JudgeSection struct { + Batch int `yaml:"batch"` + ConnTimeout int `yaml:"connTimeout"` + CallTimeout int `yaml:"callTimeout"` + WorkerNum int `yaml:"workerNum"` + MaxConns int `yaml:"maxConns"` + MaxIdle int `yaml:"maxIdle"` + HbsMod string `yaml:"hbsMod"` +} + +var ( + // config + Judge JudgeSection + + // 连接池 node_address -> connection_pool + JudgeConnPools *pools.ConnPools + + // queue + JudgeQueues = cache.SafeJudgeQueue{} +) + +func InitJudge(section JudgeSection) { + Judge = section + + judges := GetJudges() + + // init connPool + JudgeConnPools = pools.NewConnPools(Judge.MaxConns, Judge.MaxIdle, Judge.ConnTimeout, Judge.CallTimeout, judges) + + // init queue + JudgeQueues = cache.NewJudgeQueue() + for _, judgeNode := range judges { 
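+		// one bounded queue (DefaultSendQueueMaxSize) per judge instance,
+		// so a slow judge only backs up its own queue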
+ JudgeQueues.Set(judgeNode, list.NewSafeListLimited(DefaultSendQueueMaxSize)) + } + + // start task + judgeConcurrent := Judge.WorkerNum + if judgeConcurrent < 1 { + judgeConcurrent = 1 + } + judgeQueue := JudgeQueues.GetAll() + for instance, queue := range judgeQueue { + go Send2JudgeTask(queue, instance, judgeConcurrent) + } + +} + +func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) { + batch := Judge.Batch + sema := semaphore.NewSemaphore(concurrent) + + for { + items := Q.PopBackBy(batch) + count := len(items) + if count == 0 { + time.Sleep(DefaultSendTaskSleepInterval) + continue + } + judgeItems := make([]*dataobj.JudgeItem, count) + stats.Counter.Set("points.out.judge", count) + for i := 0; i < count; i++ { + judgeItems[i] = items[i].(*dataobj.JudgeItem) + logger.Debug("send to judge: ", judgeItems[i]) + } + + sema.Acquire() + go func(addr string, judgeItems []*dataobj.JudgeItem, count int) { + defer sema.Release() + + resp := &dataobj.SimpleRpcResponse{} + var err error + sendOk := false + for i := 0; i < MaxSendRetry; i++ { + err = JudgeConnPools.Call(addr, "Judge.Send", judgeItems, resp) + if err == nil { + sendOk = true + break + } + logger.Warningf("send judge %s fail: %v", addr, err) + time.Sleep(time.Millisecond * 10) + } + + if !sendOk { + stats.Counter.Set("points.out.err", count) + for _, item := range judgeItems { + logger.Errorf("send %v to judge %s fail: %v", item, addr, err) + } + } + + }(addr, judgeItems, count) + } +} + +func Push2JudgeQueue(items []*dataobj.MetricValue) { + errCnt := 0 + for _, item := range items { + key := str.PK(item.Metric, item.Endpoint) + stras := cache.StraMap.GetByKey(key) + + for _, stra := range stras { + if !TagMatch(stra.Tags, item.TagsMap) { + continue + } + judgeItem := &dataobj.JudgeItem{ + Endpoint: item.Endpoint, + Metric: item.Metric, + Value: item.Value, + Timestamp: item.Timestamp, + DsType: item.CounterType, + Tags: item.Tags, + TagsMap: item.TagsMap, + Step: int(item.Step), + Sid: stra.Id, + Extra: item.Extra, + } + + q, exists := JudgeQueues.Get(stra.JudgeInstance) + if exists { + if !q.PushFront(judgeItem) { + errCnt += 1 + } + } + } + } + stats.Counter.Set("judge.queue.err", errCnt) +} + +func alignTs(ts int64, period int64) int64 { + return ts - ts%period +} + +func TagMatch(straTags []model.Tag, tag map[string]string) bool { + for _, stag := range straTags { + if _, exists := tag[stag.Tkey]; !exists { + return false + } + var match bool + if stag.Topt == "=" { //当前策略 tagkey 对应的 tagv + for _, v := range stag.Tval { + if tag[stag.Tkey] == v { + match = true + break + } + } + } else { + match = true + for _, v := range stag.Tval { + if tag[stag.Tkey] == v { + match = false + return match + } + } + } + + if !match { + return false + } + } + return true +} + +func GetJudges() []string { + var judgeInstances []string + instances, err := report.GetAlive("judge", Judge.HbsMod) + if err != nil { + stats.Counter.Set("judge.get.err", 1) + return judgeInstances + } + for _, instance := range instances { + judgeInstance := instance.Identity + ":" + instance.RPCPort + judgeInstances = append(judgeInstances, judgeInstance) + } + return judgeInstances +} diff --git a/src/modules/transfer/backend/kafka.go b/src/modules/transfer/backend/kafka.go index 0bcc03f0..0092d9f1 100644 --- a/src/modules/transfer/backend/kafka.go +++ b/src/modules/transfer/backend/kafka.go @@ -4,13 +4,81 @@ import ( "encoding/json" "errors" "fmt" - "github.com/Shopify/sarama" - "github.com/toolkits/pkg/logger" "os" "strings" "time" + + 
"github.com/Shopify/sarama" + "github.com/didi/nightingale/src/dataobj" + "github.com/didi/nightingale/src/toolkits/stats" + "github.com/toolkits/pkg/logger" ) +type KafkaSection struct { + Enabled bool `yaml:"enabled"` + Name string `yaml:"name"` + Topic string `yaml:"topic"` + BrokersPeers string `yaml:"brokersPeers"` + ConnTimeout int `yaml:"connTimeout"` + CallTimeout int `yaml:"callTimeout"` + MaxRetry int `yaml:"maxRetry"` + KeepAlive int64 `yaml:"keepAlive"` + SaslUser string `yaml:"saslUser"` + SaslPasswd string `yaml:"saslPasswd"` +} +type KafkaPushEndpoint struct { + // config + Section KafkaSection + + // 发送缓存队列 node -> queue_of_data + KafkaQueue chan KafkaData +} + +func (kafka *KafkaPushEndpoint) Init() { + + // init queue + kafka.KafkaQueue = make(chan KafkaData, 10) + + // start task + go kafka.send2KafkaTask() +} + +func (kafka *KafkaPushEndpoint) Push2Queue(items []*dataobj.MetricValue) { + for _, item := range items { + kafka.KafkaQueue <- kafka.convert2KafkaItem(item) + } +} + +func (kafka *KafkaPushEndpoint) convert2KafkaItem(d *dataobj.MetricValue) KafkaData { + m := make(KafkaData) + m["metric"] = d.Metric + m["value"] = d.Value + m["timestamp"] = d.Timestamp + m["value"] = d.Value + m["step"] = d.Step + m["endpoint"] = d.Endpoint + m["tags"] = d.Tags + return m +} + +func (kafka *KafkaPushEndpoint) send2KafkaTask() { + kf, err := NewKfClient(kafka.Section) + if err != nil { + logger.Errorf("init kafka client fail: %v", err) + return + } + defer kf.Close() + for { + kafkaItem := <-kafka.KafkaQueue + stats.Counter.Set("points.out.kafka", 1) + err = kf.Send(kafkaItem) + if err != nil { + stats.Counter.Set("points.out.kafka.err", 1) + logger.Errorf("send %v to kafka %s fail: %v", kafkaItem, kafka.Section.BrokersPeers, err) + } + } +} + type KafkaData map[string]interface{} type KfClient struct { producer sarama.AsyncProducer @@ -45,11 +113,11 @@ func NewKfClient(c KafkaSection) (kafkaSender *KfClient, err error) { cfg.Net.SASL.User = c.SaslUser cfg.Net.SASL.Password = c.SaslPasswd } - if c.Retry > 0 { - cfg.Producer.Retry.Max = c.Retry + if c.MaxRetry > 0 { + cfg.Producer.Retry.Max = c.MaxRetry } - cfg.Net.DialTimeout = time.Duration(connTimeout) * time.Millisecond + cfg.Net.DialTimeout = time.Duration(c.ConnTimeout) * time.Millisecond if c.KeepAlive > 0 { cfg.Net.KeepAlive = time.Duration(c.KeepAlive) * time.Millisecond @@ -58,10 +126,11 @@ func NewKfClient(c KafkaSection) (kafkaSender *KfClient, err error) { if err != nil { return } - kafkaSender = newSender(brokers, topic, cfg, producer) + kafkaSender = newSender(brokers, topic, cfg, producer, c.CallTimeout) return } -func newSender(brokers []string, topic string, cfg *sarama.Config, producer sarama.AsyncProducer) (kf *KfClient) { +func newSender(brokers []string, topic string, cfg *sarama.Config, producer sarama.AsyncProducer, + callTimeout int) (kf *KfClient) { kf = &KfClient{ producer: producer, Topic: topic, diff --git a/src/modules/transfer/backend/opentsdb.go b/src/modules/transfer/backend/opentsdb.go new file mode 100644 index 00000000..bfbb04e5 --- /dev/null +++ b/src/modules/transfer/backend/opentsdb.go @@ -0,0 +1,136 @@ +package backend + +import ( + "bytes" + "time" + + "github.com/didi/nightingale/src/dataobj" + "github.com/didi/nightingale/src/toolkits/pools" + "github.com/didi/nightingale/src/toolkits/stats" + "github.com/toolkits/pkg/concurrent/semaphore" + "github.com/toolkits/pkg/container/list" + "github.com/toolkits/pkg/logger" +) + +type OpenTsdbSection struct { + Enabled bool `yaml:"enabled"` + 
Name string `yaml:"name"` + Batch int `yaml:"batch"` + ConnTimeout int `yaml:"connTimeout"` + CallTimeout int `yaml:"callTimeout"` + WorkerNum int `yaml:"workerNum"` + MaxConns int `yaml:"maxConns"` + MaxIdle int `yaml:"maxIdle"` + MaxRetry int `yaml:"maxRetry"` + Address string `yaml:"address"` +} + +type OpenTsdbPushEndpoint struct { + // config + Section OpenTsdbSection + + OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper + + // 发送缓存队列 node -> queue_of_data + OpenTsdbQueue *list.SafeListLimited +} + +func (opentsdb *OpenTsdbPushEndpoint) Init() { + // init connPool + if opentsdb.Section.Enabled { + opentsdb.OpenTsdbConnPoolHelper = pools.NewOpenTsdbConnPoolHelper(opentsdb.Section.Address, + opentsdb.Section.MaxConns, opentsdb.Section.MaxIdle, opentsdb.Section.ConnTimeout, + opentsdb.Section.CallTimeout) + } + + // init queue + if opentsdb.Section.Enabled { + opentsdb.OpenTsdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize) + } + + // start task + openTsdbConcurrent := opentsdb.Section.WorkerNum + if openTsdbConcurrent < 1 { + openTsdbConcurrent = 1 + } + go opentsdb.send2OpenTsdbTask(openTsdbConcurrent) + +} + +// 将原始数据入到tsdb发送缓存队列 +func (opentsdb *OpenTsdbPushEndpoint) Push2Queue(items []*dataobj.MetricValue) { + errCnt := 0 + for _, item := range items { + tsdbItem := opentsdb.convert2OpenTsdbItem(item) + isSuccess := opentsdb.OpenTsdbQueue.PushFront(tsdbItem) + + if !isSuccess { + errCnt += 1 + } + } + stats.Counter.Set("opentsdb.queue.err", errCnt) +} + +func (opentsdb *OpenTsdbPushEndpoint) send2OpenTsdbTask(concurrent int) { + batch := opentsdb.Section.Batch // 一次发送,最多batch条数据 + retry := opentsdb.Section.MaxRetry + addr := opentsdb.Section.Address + sema := semaphore.NewSemaphore(concurrent) + + for { + items := opentsdb.OpenTsdbQueue.PopBackBy(batch) + count := len(items) + if count == 0 { + time.Sleep(DefaultSendTaskSleepInterval) + continue + } + var openTsdbBuffer bytes.Buffer + + for i := 0; i < count; i++ { + tsdbItem := items[i].(*dataobj.OpenTsdbItem) + openTsdbBuffer.WriteString(tsdbItem.OpenTsdbString()) + openTsdbBuffer.WriteString("\n") + stats.Counter.Set("points.out.opentsdb", 1) + logger.Debug("send to opentsdb: ", tsdbItem) + } + // 同步Call + 有限并发 进行发送 + sema.Acquire() + go func(addr string, openTsdbBuffer bytes.Buffer, count int) { + defer sema.Release() + + var err error + sendOk := false + for i := 0; i < retry; i++ { + err = opentsdb.OpenTsdbConnPoolHelper.Send(openTsdbBuffer.Bytes()) + if err == nil { + sendOk = true + break + } + logger.Warningf("send opentsdb %s fail: %v", addr, err) + time.Sleep(100 * time.Millisecond) + } + + if !sendOk { + stats.Counter.Set("points.out.opentsdb.err", count) + for _, item := range items { + logger.Errorf("send %v to opentsdb %s fail: %v", item, addr, err) + } + } else { + logger.Debugf("send to opentsdb %s ok", addr) + } + }(addr, openTsdbBuffer, count) + } +} + +func (opentsdb *OpenTsdbPushEndpoint) convert2OpenTsdbItem(d *dataobj.MetricValue) *dataobj.OpenTsdbItem { + t := dataobj.OpenTsdbItem{Tags: make(map[string]string)} + + for k, v := range d.TagsMap { + t.Tags[k] = v + } + t.Tags["endpoint"] = d.Endpoint + t.Metric = d.Metric + t.Timestamp = d.Timestamp + t.Value = d.Value + return &t +} diff --git a/src/modules/transfer/backend/sender.go b/src/modules/transfer/backend/sender.go deleted file mode 100644 index 60f4caf3..00000000 --- a/src/modules/transfer/backend/sender.go +++ /dev/null @@ -1,535 +0,0 @@ -package backend - -import ( - "bytes" - "time" - - "github.com/didi/nightingale/src/dataobj" - 
"github.com/didi/nightingale/src/model" - "github.com/didi/nightingale/src/modules/transfer/cache" - "github.com/didi/nightingale/src/toolkits/stats" - "github.com/didi/nightingale/src/toolkits/str" - - client "github.com/influxdata/influxdb/client/v2" - "github.com/toolkits/pkg/concurrent/semaphore" - "github.com/toolkits/pkg/container/list" - "github.com/toolkits/pkg/logger" -) - -// send -const ( - DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms - MaxSendRetry = 10 -) - -var ( - MinStep int //最小上报周期,单位sec -) - -func startSendTasks() { - - tsdbConcurrent := Config.WorkerNum - if tsdbConcurrent < 1 { - tsdbConcurrent = 1 - } - - judgeConcurrent := Config.WorkerNum - if judgeConcurrent < 1 { - judgeConcurrent = 1 - } - - influxdbConcurrent := Config.Influxdb.WorkerNum - if influxdbConcurrent < 1 { - influxdbConcurrent = 1 - } - - openTsdbConcurrent := Config.OpenTsdb.WorkerNum - if openTsdbConcurrent < 1 { - openTsdbConcurrent = 1 - } - - if Config.Enabled { - for node, item := range Config.ClusterList { - for _, addr := range item.Addrs { - queue := TsdbQueues[node+addr] - go Send2TsdbTask(queue, node, addr, tsdbConcurrent) - } - } - } - - if Config.Enabled { - judgeQueue := JudgeQueues.GetAll() - for instance, queue := range judgeQueue { - go Send2JudgeTask(queue, instance, judgeConcurrent) - } - } - - if Config.Influxdb.Enabled { - go send2InfluxdbTask(influxdbConcurrent) - - } - - if Config.OpenTsdb.Enabled { - go send2OpenTsdbTask(openTsdbConcurrent) - - } - - if Config.Kafka.Enabled { - go send2KafkaTask() - } -} - -func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) { - batch := Config.Batch // 一次发送,最多batch条数据 - Q = TsdbQueues[node+addr] - - sema := semaphore.NewSemaphore(concurrent) - - for { - items := Q.PopBackBy(batch) - count := len(items) - if count == 0 { - time.Sleep(DefaultSendTaskSleepInterval) - continue - } - - tsdbItems := make([]*dataobj.TsdbItem, count) - stats.Counter.Set("points.out.tsdb", count) - for i := 0; i < count; i++ { - tsdbItems[i] = items[i].(*dataobj.TsdbItem) - logger.Debug("send to tsdb->: ", tsdbItems[i]) - } - - //控制并发 - sema.Acquire() - go func(addr string, tsdbItems []*dataobj.TsdbItem, count int) { - defer sema.Release() - - resp := &dataobj.SimpleRpcResponse{} - var err error - sendOk := false - for i := 0; i < 3; i++ { //最多重试3次 - err = TsdbConnPools.Call(addr, "Tsdb.Send", tsdbItems, resp) - if err == nil { - sendOk = true - break - } - time.Sleep(time.Millisecond * 10) - } - - if !sendOk { - stats.Counter.Set("points.out.tsdb.err", count) - logger.Errorf("send %v to tsdb %s:%s fail: %v", tsdbItems, node, addr, err) - } else { - logger.Debugf("send to tsdb %s:%s ok", node, addr) - } - }(addr, tsdbItems, count) - } -} - -// Push2TsdbSendQueue pushes data to a TSDB instance which depends on the consistent ring. 
-func Push2TsdbSendQueue(items []*dataobj.MetricValue) { - errCnt := 0 - for _, item := range items { - tsdbItem := convert2TsdbItem(item) - stats.Counter.Set("tsdb.queue.push", 1) - - node, err := TsdbNodeRing.GetNode(item.PK()) - if err != nil { - logger.Warningf("get tsdb node error: %v", err) - continue - } - - cnode := Config.ClusterList[node] - for _, addr := range cnode.Addrs { - Q := TsdbQueues[node+addr] - // 队列已满 - if !Q.PushFront(tsdbItem) { - errCnt += 1 - } - } - } - - // statistics - if errCnt > 0 { - stats.Counter.Set("tsdb.queue.err", errCnt) - logger.Error("Push2TsdbSendQueue err num: ", errCnt) - } -} - -func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) { - batch := Config.Batch - sema := semaphore.NewSemaphore(concurrent) - - for { - items := Q.PopBackBy(batch) - count := len(items) - if count == 0 { - time.Sleep(DefaultSendTaskSleepInterval) - continue - } - judgeItems := make([]*dataobj.JudgeItem, count) - stats.Counter.Set("points.out.judge", count) - for i := 0; i < count; i++ { - judgeItems[i] = items[i].(*dataobj.JudgeItem) - logger.Debug("send to judge: ", judgeItems[i]) - } - - sema.Acquire() - go func(addr string, judgeItems []*dataobj.JudgeItem, count int) { - defer sema.Release() - - resp := &dataobj.SimpleRpcResponse{} - var err error - sendOk := false - for i := 0; i < MaxSendRetry; i++ { - err = JudgeConnPools.Call(addr, "Judge.Send", judgeItems, resp) - if err == nil { - sendOk = true - break - } - logger.Warningf("send judge %s fail: %v", addr, err) - time.Sleep(time.Millisecond * 10) - } - - if !sendOk { - stats.Counter.Set("points.out.judge.err", count) - for _, item := range judgeItems { - logger.Errorf("send %v to judge %s fail: %v", item, addr, err) - } - } - - }(addr, judgeItems, count) - } -} - -func Push2JudgeSendQueue(items []*dataobj.MetricValue) { - errCnt := 0 - for _, item := range items { - key := str.PK(item.Metric, item.Endpoint) - stras := cache.StraMap.GetByKey(key) - - for _, stra := range stras { - if !TagMatch(stra.Tags, item.TagsMap) { - continue - } - judgeItem := &dataobj.JudgeItem{ - Endpoint: item.Endpoint, - Metric: item.Metric, - Value: item.Value, - Timestamp: item.Timestamp, - DsType: item.CounterType, - Tags: item.Tags, - TagsMap: item.TagsMap, - Step: int(item.Step), - Sid: stra.Id, - Extra: item.Extra, - } - - q, exists := JudgeQueues.Get(stra.JudgeInstance) - if exists { - if !q.PushFront(judgeItem) { - errCnt += 1 - } - } - } - } - stats.Counter.Set("judge.queue.err", errCnt) -} - -// 打到 Tsdb 的数据,要根据 rrdtool 的特定 来限制 step、counterType、timestamp -func convert2TsdbItem(d *dataobj.MetricValue) *dataobj.TsdbItem { - item := &dataobj.TsdbItem{ - Endpoint: d.Endpoint, - Metric: d.Metric, - Value: d.Value, - Timestamp: d.Timestamp, - Tags: d.Tags, - TagsMap: d.TagsMap, - Step: int(d.Step), - Heartbeat: int(d.Step) * 2, - DsType: dataobj.GAUGE, - Min: "U", - Max: "U", - } - - return item -} - -func alignTs(ts int64, period int64) int64 { - return ts - ts%period -} - -func TagMatch(straTags []model.Tag, tag map[string]string) bool { - for _, stag := range straTags { - if _, exists := tag[stag.Tkey]; !exists { - return false - } - var match bool - if stag.Topt == "=" { //当前策略 tagkey 对应的 tagv - for _, v := range stag.Tval { - if tag[stag.Tkey] == v { - match = true - break - } - } - } else { - match = true - for _, v := range stag.Tval { - if tag[stag.Tkey] == v { - match = false - return match - } - } - } - - if !match { - return false - } - } - return true -} - -type InfluxClient struct { - Client 
client.Client - Database string - Precision string -} - -func NewInfluxdbClient() (*InfluxClient, error) { - c, err := client.NewHTTPClient(client.HTTPConfig{ - Addr: Config.Influxdb.Address, - Username: Config.Influxdb.Username, - Password: Config.Influxdb.Password, - Timeout: time.Millisecond * time.Duration(Config.Influxdb.Timeout), - }) - - if err != nil { - return nil, err - } - - return &InfluxClient{ - Client: c, - Database: Config.Influxdb.Database, - Precision: Config.Influxdb.Precision, - }, nil -} - -func (c *InfluxClient) Send(items []*dataobj.InfluxdbItem) error { - bp, err := client.NewBatchPoints(client.BatchPointsConfig{ - Database: c.Database, - Precision: c.Precision, - }) - if err != nil { - logger.Error("create batch points error: ", err) - return err - } - - for _, item := range items { - pt, err := client.NewPoint(item.Measurement, item.Tags, item.Fields, time.Unix(item.Timestamp, 0)) - if err != nil { - logger.Error("create new points error: ", err) - continue - } - bp.AddPoint(pt) - } - - return c.Client.Write(bp) -} - -// 将原始数据插入到influxdb缓存队列 -func Push2InfluxdbSendQueue(items []*dataobj.MetricValue) { - errCnt := 0 - for _, item := range items { - influxdbItem := convert2InfluxdbItem(item) - isSuccess := InfluxdbQueue.PushFront(influxdbItem) - - if !isSuccess { - errCnt += 1 - } - } - stats.Counter.Set("influxdb.queue.err", errCnt) -} - -func convert2InfluxdbItem(d *dataobj.MetricValue) *dataobj.InfluxdbItem { - t := dataobj.InfluxdbItem{Tags: make(map[string]string), Fields: make(map[string]interface{})} - - for k, v := range d.TagsMap { - t.Tags[k] = v - } - t.Tags["endpoint"] = d.Endpoint - t.Measurement = d.Metric - t.Fields["value"] = d.Value - t.Timestamp = d.Timestamp - - return &t -} - -func send2InfluxdbTask(concurrent int) { - batch := Config.Influxdb.Batch // 一次发送,最多batch条数据 - retry := Config.Influxdb.MaxRetry - addr := Config.Influxdb.Address - sema := semaphore.NewSemaphore(concurrent) - - var err error - c, err := NewInfluxdbClient() - defer c.Client.Close() - - if err != nil { - logger.Errorf("init influxdb client fail: %v", err) - return - } - - for { - items := InfluxdbQueue.PopBackBy(batch) - count := len(items) - if count == 0 { - time.Sleep(DefaultSendTaskSleepInterval) - continue - } - - influxdbItems := make([]*dataobj.InfluxdbItem, count) - for i := 0; i < count; i++ { - influxdbItems[i] = items[i].(*dataobj.InfluxdbItem) - stats.Counter.Set("points.out.influxdb", 1) - logger.Debug("send to influxdb: ", influxdbItems[i]) - } - - // 同步Call + 有限并发 进行发送 - sema.Acquire() - go func(addr string, influxdbItems []*dataobj.InfluxdbItem, count int) { - defer sema.Release() - sendOk := false - - for i := 0; i < retry; i++ { - err = c.Send(influxdbItems) - if err == nil { - sendOk = true - break - } - logger.Warningf("send influxdb fail: %v", err) - time.Sleep(time.Millisecond * 10) - } - - if !sendOk { - stats.Counter.Set("points.out.influxdb.err", count) - logger.Errorf("send %v to influxdb %s fail: %v", influxdbItems, addr, err) - } else { - logger.Debugf("send to influxdb %s ok", addr) - } - }(addr, influxdbItems, count) - } -} - -// 将原始数据入到tsdb发送缓存队列 -func Push2OpenTsdbSendQueue(items []*dataobj.MetricValue) { - errCnt := 0 - for _, item := range items { - tsdbItem := convert2OpenTsdbItem(item) - isSuccess := OpenTsdbQueue.PushFront(tsdbItem) - - if !isSuccess { - errCnt += 1 - } - } - stats.Counter.Set("opentsdb.queue.err", errCnt) -} - -func send2OpenTsdbTask(concurrent int) { - batch := Config.OpenTsdb.Batch // 一次发送,最多batch条数据 - retry := 
Config.OpenTsdb.MaxRetry - addr := Config.OpenTsdb.Address - sema := semaphore.NewSemaphore(concurrent) - - for { - items := OpenTsdbQueue.PopBackBy(batch) - count := len(items) - if count == 0 { - time.Sleep(DefaultSendTaskSleepInterval) - continue - } - var openTsdbBuffer bytes.Buffer - - for i := 0; i < count; i++ { - tsdbItem := items[i].(*dataobj.OpenTsdbItem) - openTsdbBuffer.WriteString(tsdbItem.OpenTsdbString()) - openTsdbBuffer.WriteString("\n") - stats.Counter.Set("points.out.opentsdb", 1) - logger.Debug("send to opentsdb: ", tsdbItem) - } - // 同步Call + 有限并发 进行发送 - sema.Acquire() - go func(addr string, openTsdbBuffer bytes.Buffer, count int) { - defer sema.Release() - - var err error - sendOk := false - for i := 0; i < retry; i++ { - err = OpenTsdbConnPoolHelper.Send(openTsdbBuffer.Bytes()) - if err == nil { - sendOk = true - break - } - logger.Warningf("send opentsdb %s fail: %v", addr, err) - time.Sleep(100 * time.Millisecond) - } - - if !sendOk { - stats.Counter.Set("points.out.opentsdb.err", count) - for _, item := range items { - logger.Errorf("send %v to opentsdb %s fail: %v", item, addr, err) - } - } else { - logger.Debugf("send to opentsdb %s ok", addr) - } - }(addr, openTsdbBuffer, count) - } -} - -func convert2OpenTsdbItem(d *dataobj.MetricValue) *dataobj.OpenTsdbItem { - t := dataobj.OpenTsdbItem{Tags: make(map[string]string)} - - for k, v := range d.TagsMap { - t.Tags[k] = v - } - t.Tags["endpoint"] = d.Endpoint - t.Metric = d.Metric - t.Timestamp = d.Timestamp - t.Value = d.Value - return &t -} - -func Push2KafkaSendQueue(items []*dataobj.MetricValue) { - for _, item := range items { - KafkaQueue <- convert2KafkaItem(item) - } -} -func convert2KafkaItem(d *dataobj.MetricValue) KafkaData { - m := make(KafkaData) - m["metric"] = d.Metric - m["value"] = d.Value - m["timestamp"] = d.Timestamp - m["value"] = d.Value - m["step"] = d.Step - m["endpoint"] = d.Endpoint - m["tags"] = d.Tags - return m -} - -func send2KafkaTask() { - kf, err := NewKfClient(Config.Kafka) - if err != nil { - logger.Errorf("init kafka client fail: %v", err) - return - } - defer kf.Close() - for { - kafkaItem := <-KafkaQueue - stats.Counter.Set("points.out.kafka", 1) - err = kf.Send(kafkaItem) - if err != nil { - stats.Counter.Set("points.out.kafka.err", 1) - logger.Errorf("send %v to kafka %s fail: %v", kafkaItem, Config.Kafka.BrokersPeers, err) - } - } -} diff --git a/src/modules/transfer/backend/tsdb/index.go b/src/modules/transfer/backend/tsdb/index.go new file mode 100644 index 00000000..7e3cc08e --- /dev/null +++ b/src/modules/transfer/backend/tsdb/index.go @@ -0,0 +1,57 @@ +package tsdb + +import ( + "fmt" + "sync" + "time" + + "github.com/didi/nightingale/src/toolkits/report" + "github.com/didi/nightingale/src/toolkits/stats" + + "github.com/toolkits/pkg/logger" +) + +var IndexList IndexAddrs + +type IndexAddrs struct { + sync.RWMutex + Data []string +} + +func (i *IndexAddrs) Set(addrs []string) { + i.Lock() + defer i.Unlock() + i.Data = addrs +} + +func (i *IndexAddrs) Get() []string { + i.RLock() + defer i.RUnlock() + return i.Data +} + +func GetIndexLoop() { + t1 := time.NewTicker(time.Duration(9) * time.Second) + GetIndex() + for { + <-t1.C + GetIndex() + } +} + +func GetIndex() { + instances, err := report.GetAlive("index", "monapi") + if err != nil { + stats.Counter.Set("get.index.err", 1) + logger.Warningf("get index list err:%v", err) + return + } + + activeIndexs := []string{} + for _, instance := range instances { + activeIndexs = append(activeIndexs, fmt.Sprintf("%s:%s", 
instance.Identity, instance.HTTPPort))
+	}
+
+	IndexList.Set(activeIndexs)
+	return
+}
diff --git a/src/modules/transfer/backend/query.go b/src/modules/transfer/backend/tsdb/query.go
similarity index 56%
rename from src/modules/transfer/backend/query.go
rename to src/modules/transfer/backend/tsdb/query.go
index 3d91e634..c2b11fd1 100644
--- a/src/modules/transfer/backend/query.go
+++ b/src/modules/transfer/backend/tsdb/query.go
@@ -1,26 +1,26 @@
-package backend
+package tsdb

 import (
-	"encoding/json"
 	"errors"
 	"fmt"
 	"math/rand"
-	"strings"
 	"time"

+	"github.com/toolkits/pkg/net/httplib"
+
 	"github.com/didi/nightingale/src/dataobj"
 	"github.com/didi/nightingale/src/modules/transfer/calc"
-	"github.com/didi/nightingale/src/toolkits/address"
 	"github.com/didi/nightingale/src/toolkits/pools"
 	"github.com/didi/nightingale/src/toolkits/stats"

 	"github.com/toolkits/pkg/logger"
-	"github.com/toolkits/pkg/net/httplib"
 	"github.com/toolkits/pkg/pool"
 )

-func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
+func (tsdb *TsdbDataSource) QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
+	logger.Debugf("query data, inputs: %+v", inputs)
+
 	workerNum := 100
 	worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
 	dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
@@ -38,7 +36,8 @@ func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
 	for _, endpoint := range input.Endpoints {
 		for _, counter := range input.Counters {
 			worker <- struct{}{}
-			go fetchDataSync(input.Start, input.End, input.ConsolFunc, endpoint, counter, input.Step, worker, dataChan)
+			go tsdb.fetchDataSync(input.Start, input.End, input.ConsolFunc, endpoint, counter, input.Step, worker,
+				dataChan)
 		}
 	}
 }
@@ -55,7 +54,10 @@ func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
 	return resp
 }

-func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
+func (tsdb *TsdbDataSource) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
+
+	logger.Debugf("query data for ui, input: %+v", input)
+
 	workerNum := 100
 	worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
 	dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
@@ -71,22 +73,24 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
 	for _, endpoint := range input.Endpoints {
 		if len(input.Tags) == 0 {
-			counter, err := GetCounter(input.Metric, "", nil)
+			counter, err := dataobj.GetCounter(input.Metric, "", nil)
 			if err != nil {
 				logger.Warningf("get counter error: %+v", err)
 				continue
 			}
 			worker <- struct{}{}
-			go fetchDataSync(input.Start, input.End, input.ConsolFunc, endpoint, counter, input.Step, worker, dataChan)
+			go tsdb.fetchDataSync(input.Start, input.End, input.ConsolFunc, endpoint, counter, input.Step, worker,
+				dataChan)
 		} else {
 			for _, tag := range input.Tags {
-				counter, err := GetCounter(input.Metric, tag, nil)
+				counter, err := dataobj.GetCounter(input.Metric, tag, nil)
 				if err != nil {
 					logger.Warningf("get counter error: %+v", err)
 					continue
 				}
 				worker <- struct{}{}
-				go fetchDataSync(input.Start, input.End, input.ConsolFunc, endpoint, counter, input.Step, worker, dataChan)
+				go tsdb.fetchDataSync(input.Start, input.End, input.ConsolFunc, endpoint, counter, input.Step, worker,
+					dataChan)
 			}
 		}
 	}
@@ -154,30 +158,16 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
 	return resp
 }

-func GetCounter(metric, tag string, tagMap map[string]string) (counter string, err error) {
-	if tagMap == nil {
- tagMap, err = dataobj.SplitTagsString(tag) - if err != nil { - logger.Warningf("split tag string error: %+v", err) - return - } - } - - tagStr := dataobj.SortedTags(tagMap) - counter = dataobj.PKWithTags(metric, tagStr) - return -} - -func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step int, worker chan struct{}, dataChan chan *dataobj.TsdbQueryResponse) { +func (tsdb *TsdbDataSource) fetchDataSync(start, end int64, consolFun, endpoint, counter string, step int, worker chan struct{}, dataChan chan *dataobj.TsdbQueryResponse) { defer func() { <-worker }() stats.Counter.Set("query.tsdb", 1) - data, err := fetchData(start, end, consolFun, endpoint, counter, step) + data, err := tsdb.fetchData(start, end, consolFun, endpoint, counter, step) if err != nil { logger.Warningf("fetch tsdb data error: %+v", err) - stats.Counter.Set("query.data.err", 1) + stats.Counter.Set("query.tsdb.err", 1) data.Endpoint = endpoint data.Counter = counter data.Step = step @@ -185,11 +175,11 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i dataChan <- data } -func fetchData(start, end int64, consolFun, endpoint, counter string, step int) (*dataobj.TsdbQueryResponse, error) { +func (tsdb *TsdbDataSource) fetchData(start, end int64, consolFun, endpoint, counter string, step int) (*dataobj.TsdbQueryResponse, error) { var resp *dataobj.TsdbQueryResponse - qparm := GenQParam(start, end, consolFun, endpoint, counter, step) - resp, err := QueryOne(qparm) + qparm := genQParam(start, end, consolFun, endpoint, counter, step) + resp, err := tsdb.QueryOne(qparm) if err != nil { return resp, err } @@ -200,12 +190,7 @@ func fetchData(start, end int64, consolFun, endpoint, counter string, step int) return resp, nil } -func getCounterStep(endpoint, counter string) (step int, err error) { - //从内存中获取 - return -} - -func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam { +func genQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam { return dataobj.TsdbQueryParam{ Start: start, End: end, @@ -216,12 +201,12 @@ func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) } } -func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error) { +func (tsdb *TsdbDataSource) QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error) { start, end := para.Start, para.End resp = &dataobj.TsdbQueryResponse{} pk := dataobj.PKWithCounter(para.Endpoint, para.Counter) - ps, err := SelectPoolByPK(pk) + ps, err := tsdb.SelectPoolByPK(pk) if err != nil { return resp, err } @@ -259,7 +244,7 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err }() select { - case <-time.After(time.Duration(callTimeout) * time.Millisecond): + case <-time.After(time.Duration(tsdb.Section.CallTimeout) * time.Millisecond): onePool.ForceClose(conn) logger.Errorf("%s, call timeout. 
proc: %s", addr, onePool.Proc()) break @@ -297,20 +282,20 @@ type Pool struct { Addr string } -func SelectPoolByPK(pk string) ([]Pool, error) { - node, err := TsdbNodeRing.GetNode(pk) +func (tsdb *TsdbDataSource) SelectPoolByPK(pk string) ([]Pool, error) { + node, err := tsdb.TsdbNodeRing.GetNode(pk) if err != nil { return []Pool{}, err } - nodeAddrs, found := Config.ClusterList[node] + nodeAddrs, found := tsdb.Section.ClusterList[node] if !found { return []Pool{}, errors.New("node not found") } var ps []Pool for _, addr := range nodeAddrs.Addrs { - onePool, found := TsdbConnPools.Get(addr) + onePool, found := tsdb.TsdbConnPools.Get(addr) if !found { logger.Errorf("addr %s not found", addr) continue @@ -325,97 +310,113 @@ func SelectPoolByPK(pk string) ([]Pool, error) { return ps, nil } -func getTags(counter string) (tags string) { - idx := strings.IndexAny(counter, "/") - if idx == -1 { - return "" - } - return counter[idx+1:] +type IndexMetricsResp struct { + Data *dataobj.MetricResp `json:"dat"` + Err string `json:"err"` } -type Tagkv struct { - TagK string `json:"tagk"` - TagV []string `json:"tagv"` -} - -type SeriesReq struct { - Endpoints []string `json:"endpoints"` - Metric string `json:"metric"` - Tagkv []*Tagkv `json:"tagkv"` -} - -type SeriesResp struct { - Dat []Series `json:"dat"` - Err string `json:"err"` -} - -type Series struct { - Endpoints []string `json:"endpoints"` - Metric string `json:"metric"` - Tags []string `json:"tags"` - Step int `json:"step"` - DsType string `json:"dstype"` -} - -func GetSeries(start, end int64, req []SeriesReq) ([]dataobj.QueryData, error) { - var res SeriesResp - var queryDatas []dataobj.QueryData - - if len(req) < 1 { - return queryDatas, fmt.Errorf("req length < 1") - } - - addrs := address.GetHTTPAddresses("index") - - if len(addrs) < 1 { - return queryDatas, fmt.Errorf("index addr is nil") - } - - i := rand.Intn(len(addrs)) - addr := fmt.Sprintf("http://%s/api/index/counter/fullmatch", addrs[i]) - - resp, code, err := httplib.PostJSON(addr, time.Duration(Config.IndexTimeout)*time.Millisecond, req, nil) +func (tsdb *TsdbDataSource) QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp { + var result IndexMetricsResp + err := PostIndex("/api/index/metrics", int64(tsdb.Section.CallTimeout), recv, &result) if err != nil { - return queryDatas, err + logger.Errorf("post index failed, %+v", err) + return nil } - if code != 200 { - return nil, fmt.Errorf("index response status code != 200") + if result.Err != "" { + logger.Errorf("index xclude failed, %+v", result.Err) + return nil } - if err = json.Unmarshal(resp, &res); err != nil { - logger.Error(string(resp)) - return queryDatas, err - } - - for _, item := range res.Dat { - counters := make([]string, 0) - if len(item.Tags) == 0 { - counters = append(counters, item.Metric) - } else { - for _, tag := range item.Tags { - tagMap, err := dataobj.SplitTagsString(tag) - if err != nil { - logger.Warning(err, tag) - continue - } - tagStr := dataobj.SortedTags(tagMap) - counter := dataobj.PKWithTags(item.Metric, tagStr) - counters = append(counters, counter) - } - } - - queryData := dataobj.QueryData{ - Start: start, - End: end, - Endpoints: item.Endpoints, - Counters: counters, - ConsolFunc: "AVERAGE", - DsType: item.DsType, - Step: item.Step, - } - queryDatas = append(queryDatas, queryData) - } - - return queryDatas, err + return result.Data +} + +type IndexTagPairsResp struct { + Data []dataobj.IndexTagkvResp `json:"dat"` + Err string `json:"err"` +} + +func (tsdb *TsdbDataSource) 
+type IndexTagPairsResp struct {
+	Data []dataobj.IndexTagkvResp `json:"dat"`
+	Err  string                   `json:"err"`
+}
+
+func (tsdb *TsdbDataSource) QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp {
+	var result IndexTagPairsResp
+	err := PostIndex("/api/index/tagkv", int64(tsdb.Section.CallTimeout), recv, &result)
+	if err != nil {
+		logger.Errorf("post index failed, %+v", err)
+		return nil
+	}
+
+	if result.Err != "" || len(result.Data) == 0 {
+		logger.Errorf("index tagkv failed, %+v", result.Err)
+		return nil
+	}
+
+	return result.Data
+}
+
+type IndexCludeResp struct {
+	Data []dataobj.XcludeResp `json:"dat"`
+	Err  string               `json:"err"`
+}
+
+func (tsdb *TsdbDataSource) QueryIndexByClude(recv []dataobj.CludeRecv) []dataobj.XcludeResp {
+	var result IndexCludeResp
+	err := PostIndex("/api/index/counter/clude", int64(tsdb.Section.CallTimeout), recv, &result)
+	if err != nil {
+		logger.Errorf("post index failed, %+v", err)
+		return nil
+	}
+
+	if result.Err != "" || len(result.Data) == 0 {
+		logger.Errorf("index xclude failed, %+v", result.Err)
+		return nil
+	}
+
+	return result.Data
+}
+
+type IndexByFullTagsResp struct {
+	Data []dataobj.IndexByFullTagsResp `json:"dat"`
+	Err  string                        `json:"err"`
+}
+
+func (tsdb *TsdbDataSource) QueryIndexByFullTags(recv []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp {
+	var result IndexByFullTagsResp
+	err := PostIndex("/api/index/counter/fullmatch", int64(tsdb.Section.CallTimeout),
+		recv, &result)
+	if err != nil {
+		logger.Errorf("post index failed, %+v", err)
+		return nil
+	}
+
+	if result.Err != "" || len(result.Data) == 0 {
+		logger.Errorf("index fullTags failed, %+v", result.Err)
+		return nil
+	}
+
+	return result.Data
+}
+
+func PostIndex(url string, calltimeout int64, recv interface{}, resp interface{}) error {
+	addrs := IndexList.Get()
+	if len(addrs) == 0 {
+		logger.Errorf("empty index addr")
+		return errors.New("empty index addr")
+	}
+
+	perm := rand.Perm(len(addrs))
+	var err error
+	for i := range perm {
+		addr := fmt.Sprintf("http://%s%s", addrs[perm[i]], url)
+		err = httplib.Post(addr).JSONBodyQuiet(recv).SetTimeout(
+			time.Duration(calltimeout) * time.Millisecond).ToJSON(&resp)
+		if err == nil {
+			break
+		}
+		logger.Warningf("index %s failed, error:%v, req:%+v", addr, err, recv)
+	}
+
+	if err != nil {
+		logger.Errorf("index %s failed, error:%v, req:%+v", url, err, recv)
+		return err
+	}
+	return nil
+}
diff --git a/src/modules/transfer/backend/ring.go b/src/modules/transfer/backend/tsdb/ring.go
similarity index 98%
rename from src/modules/transfer/backend/ring.go
rename to src/modules/transfer/backend/tsdb/ring.go
index cd53ec81..8d8806f1 100644
--- a/src/modules/transfer/backend/ring.go
+++ b/src/modules/transfer/backend/tsdb/ring.go
@@ -1,4 +1,4 @@
-package backend
+package tsdb

 import (
 	"sync"
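PostIndex spreads load and failures across the live index instances by walking a random permutation of their addresses and stopping at the first success. A minimal standalone sketch of that failover pattern, with tryOne standing in for the actual httplib call:

	package main

	import (
		"errors"
		"fmt"
		"math/rand"
	)

	// tryOne is a stand-in for the real HTTP call made via httplib.
	func tryOne(addr string) error {
		if addr != "10.0.0.2:8008" {
			return errors.New("connection refused")
		}
		return nil
	}

	func postAny(addrs []string) error {
		if len(addrs) == 0 {
			return errors.New("empty index addr")
		}
		var err error
		// walk the instances in random order so load and failures spread evenly
		for _, i := range rand.Perm(len(addrs)) {
			if err = tryOne(addrs[i]); err == nil {
				return nil
			}
		}
		return err // every instance failed; surface the last error
	}

	func main() {
		fmt.Println(postAny([]string{"10.0.0.1:8008", "10.0.0.2:8008"}))
	}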
diff --git a/src/modules/transfer/backend/tsdb/tsdb.go b/src/modules/transfer/backend/tsdb/tsdb.go
new file mode 100644
index 00000000..4c47c094
--- /dev/null
+++ b/src/modules/transfer/backend/tsdb/tsdb.go
@@ -0,0 +1,212 @@
+package tsdb
+
+import (
+	"strings"
+	"time"
+
+	"github.com/toolkits/pkg/errors"
+
+	"github.com/didi/nightingale/src/dataobj"
+	"github.com/didi/nightingale/src/toolkits/pools"
+	"github.com/didi/nightingale/src/toolkits/stats"
+	"github.com/toolkits/pkg/concurrent/semaphore"
+	"github.com/toolkits/pkg/container/list"
+	"github.com/toolkits/pkg/container/set"
+	"github.com/toolkits/pkg/logger"
+	"github.com/toolkits/pkg/str"
+)
+
+type TsdbSection struct {
+	Enabled      bool   `yaml:"enabled"`
+	Name         string `yaml:"name"`
+	Batch        int    `yaml:"batch"`
+	ConnTimeout  int    `yaml:"connTimeout"`
+	CallTimeout  int    `yaml:"callTimeout"`
+	WorkerNum    int    `yaml:"workerNum"`
+	MaxConns     int    `yaml:"maxConns"`
+	MaxIdle      int    `yaml:"maxIdle"`
+	IndexTimeout int    `yaml:"indexTimeout"`
+
+	Replicas    int                     `yaml:"replicas"`
+	Cluster     map[string]string       `yaml:"cluster"`
+	ClusterList map[string]*ClusterNode `json:"clusterList"`
+}
+
+type ClusterNode struct {
+	Addrs []string `json:"addrs"`
+}
+
+type TsdbDataSource struct {
+	// config
+	Section               TsdbSection
+	SendQueueMaxSize      int
+	SendTaskSleepInterval time.Duration
+
+	// consistent hash ring of service nodes: pk -> node
+	TsdbNodeRing *ConsistentHashRing
+
+	// send buffer queues: node -> queue_of_data
+	TsdbQueues map[string]*list.SafeListLimited
+
+	// connection pools: node_address -> connection_pool
+	TsdbConnPools *pools.ConnPools
+}
+
+func (tsdb *TsdbDataSource) Init() {
+
+	// init hash ring
+	tsdb.TsdbNodeRing = NewConsistentHashRing(int32(tsdb.Section.Replicas),
+		str.KeysOfMap(tsdb.Section.Cluster))
+
+	// init connPool
+	tsdbInstances := set.NewSafeSet()
+	for _, item := range tsdb.Section.ClusterList {
+		for _, addr := range item.Addrs {
+			tsdbInstances.Add(addr)
+		}
+	}
+	tsdb.TsdbConnPools = pools.NewConnPools(
+		tsdb.Section.MaxConns, tsdb.Section.MaxIdle, tsdb.Section.ConnTimeout, tsdb.Section.CallTimeout,
+		tsdbInstances.ToSlice(),
+	)
+
+	// init queues
+	tsdb.TsdbQueues = make(map[string]*list.SafeListLimited)
+	for node, item := range tsdb.Section.ClusterList {
+		for _, addr := range item.Addrs {
+			tsdb.TsdbQueues[node+addr] = list.NewSafeListLimited(tsdb.SendQueueMaxSize)
+		}
+	}
+
+	// start send tasks
+	tsdbConcurrent := tsdb.Section.WorkerNum
+	if tsdbConcurrent < 1 {
+		tsdbConcurrent = 1
+	}
+	for node, item := range tsdb.Section.ClusterList {
+		for _, addr := range item.Addrs {
+			queue := tsdb.TsdbQueues[node+addr]
+			go tsdb.Send2TsdbTask(queue, node, addr, tsdbConcurrent)
+		}
+	}
+
+	go GetIndexLoop()
+}
+
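Init wires three pieces together: a consistent hash ring that assigns each series PK to a node, one bounded queue per node+address, and one sender goroutine per address. The project's ring implementation lives in ring.go (renamed above, body not shown in these hunks); the following is only a rough sketch of how replica-based consistent hashing maps a PK to a node, with crc32 assumed as the hash:

	package main

	import (
		"fmt"
		"hash/crc32"
		"sort"
	)

	type ring struct {
		keys  []uint32          // sorted virtual-node hashes
		nodes map[uint32]string // virtual-node hash -> real node
	}

	func newRing(replicas int, nodes []string) *ring {
		r := &ring{nodes: make(map[uint32]string)}
		for _, n := range nodes {
			// each real node gets `replicas` virtual nodes for smoother balance
			for i := 0; i < replicas; i++ {
				h := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s#%d", n, i)))
				r.keys = append(r.keys, h)
				r.nodes[h] = n
			}
		}
		sort.Slice(r.keys, func(i, j int) bool { return r.keys[i] < r.keys[j] })
		return r
	}

	func (r *ring) getNode(pk string) string {
		h := crc32.ChecksumIEEE([]byte(pk))
		// first virtual node clockwise from the key's hash
		i := sort.Search(len(r.keys), func(i int) bool { return r.keys[i] >= h })
		if i == len(r.keys) {
			i = 0
		}
		return r.nodes[r.keys[i]]
	}

	func main() {
		r := newRing(500, []string{"tsdb01", "tsdb02"})
		fmt.Println(r.getNode("host01/cpu.idle"))
	}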
+// Push2Queue pushes data to the send queues of the TSDB node chosen by the consistent hash ring.
+func (tsdb *TsdbDataSource) Push2Queue(items []*dataobj.MetricValue) {
+	errCnt := 0
+	for _, item := range items {
+		tsdbItem := convert2TsdbItem(item)
+		stats.Counter.Set("tsdb.queue.push", 1)
+
+		node, err := tsdb.TsdbNodeRing.GetNode(item.PK())
+		if err != nil {
+			logger.Warningf("get tsdb node error: %v", err)
+			continue
+		}
+
+		cnode := tsdb.Section.ClusterList[node]
+		for _, addr := range cnode.Addrs {
+			Q := tsdb.TsdbQueues[node+addr]
+			// the queue is full
+			if !Q.PushFront(tsdbItem) {
+				errCnt += 1
+			}
+		}
+	}
+
+	// statistics
+	if errCnt > 0 {
+		stats.Counter.Set("tsdb.queue.err", errCnt)
+		logger.Error("Push2Queue err num: ", errCnt)
+	}
+}
+
+func (tsdb *TsdbDataSource) Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) {
+	batch := tsdb.Section.Batch // send at most batch items per call
+	sema := semaphore.NewSemaphore(concurrent)
+
+	for {
+		items := Q.PopBackBy(batch)
+		count := len(items)
+		if count == 0 {
+			time.Sleep(tsdb.SendTaskSleepInterval)
+			continue
+		}
+
+		tsdbItems := make([]*dataobj.TsdbItem, count)
+		stats.Counter.Set("points.out.tsdb", count)
+		for i := 0; i < count; i++ {
+			tsdbItems[i] = items[i].(*dataobj.TsdbItem)
+			logger.Debug("send to tsdb->: ", tsdbItems[i])
+		}
+
+		// bounded concurrency
+		sema.Acquire()
+		go func(addr string, tsdbItems []*dataobj.TsdbItem, count int) {
+			defer sema.Release()
+
+			resp := &dataobj.SimpleRpcResponse{}
+			var err error
+			sendOk := false
+			for i := 0; i < 3; i++ { // retry at most 3 times
+				err = tsdb.TsdbConnPools.Call(addr, "Tsdb.Send", tsdbItems, resp)
+				if err == nil {
+					sendOk = true
+					break
+				}
+				time.Sleep(time.Millisecond * 10)
+			}
+
+			if !sendOk {
+				stats.Counter.Set("points.out.tsdb.err", count)
+				logger.Errorf("send %v to tsdb %s:%s fail: %v", tsdbItems, node, addr, err)
+			} else {
+				logger.Debugf("send to tsdb %s:%s ok", node, addr)
+			}
+		}(addr, tsdbItems, count)
+	}
+}
+
+func (tsdb *TsdbDataSource) GetInstance(metric, endpoint string, tags map[string]string) []string {
+	counter, err := dataobj.GetCounter(metric, "", tags)
+	errors.Dangerous(err)
+
+	pk := dataobj.PKWithCounter(endpoint, counter)
+	pools, err := tsdb.SelectPoolByPK(pk)
+	errors.Dangerous(err)
+
+	addrs := make([]string, len(pools))
+	for i, pool := range pools {
+		addrs[i] = pool.Addr
+	}
+	return addrs
+}
+
+// Data pushed to Tsdb must constrain step, counterType and timestamp according to rrdtool's characteristics.
+func convert2TsdbItem(d *dataobj.MetricValue) *dataobj.TsdbItem {
+	item := &dataobj.TsdbItem{
+		Endpoint:  d.Endpoint,
+		Metric:    d.Metric,
+		Value:     d.Value,
+		Timestamp: d.Timestamp,
+		Tags:      d.Tags,
+		TagsMap:   d.TagsMap,
+		Step:      int(d.Step),
+		Heartbeat: int(d.Step) * 2,
+		DsType:    dataobj.GAUGE,
+		Min:       "U",
+		Max:       "U",
+	}
+
+	return item
+}
+
+func getTags(counter string) (tags string) {
+	idx := strings.IndexAny(counter, "/")
+	if idx == -1 {
+		return ""
+	}
+	return counter[idx+1:]
+}
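Send2TsdbTask is the batch/retry/bounded-concurrency loop that every sender in this refactor follows. Stripped of the RPC and connection-pool details it reduces to roughly the sketch below; send stands in for the pooled Tsdb.Send call, and the sizes are arbitrary:

	package main

	import (
		"fmt"
		"time"
	)

	func send(batch []int) error { fmt.Println("sent", batch); return nil }

	func sendLoop(queue chan int, batchSize, concurrent int) {
		sema := make(chan struct{}, concurrent) // bounded concurrency
		for {
			batch := make([]int, 0, batchSize)
		fill:
			for len(batch) < batchSize {
				select {
				case v := <-queue:
					batch = append(batch, v)
				default:
					break fill
				}
			}
			if len(batch) == 0 {
				time.Sleep(50 * time.Millisecond) // queue empty: back off, as the real task does
				continue
			}
			sema <- struct{}{}
			go func(batch []int) {
				defer func() { <-sema }()
				for i := 0; i < 3; i++ { // retry up to 3 times
					if send(batch) == nil {
						return
					}
					time.Sleep(10 * time.Millisecond)
				}
			}(batch)
		}
	}

	func main() {
		q := make(chan int, 1000)
		for i := 0; i < 10; i++ {
			q <- i
		}
		go sendLoop(q, 4, 2)
		time.Sleep(time.Second)
	}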
diff --git a/src/modules/transfer/config/config.go b/src/modules/transfer/config/config.go
index 8d8fbd6d..2fbf3f4f 100644
--- a/src/modules/transfer/config/config.go
+++ b/src/modules/transfer/config/config.go
@@ -3,9 +3,12 @@ package config
 import (
 	"bytes"
 	"fmt"
+	"github.com/didi/nightingale/src/toolkits/identity"
+	"github.com/didi/nightingale/src/toolkits/report"
 	"strings"

 	"github.com/didi/nightingale/src/modules/transfer/backend"
+	"github.com/didi/nightingale/src/modules/transfer/backend/tsdb"
 	"github.com/didi/nightingale/src/toolkits/logger"

 	"github.com/spf13/viper"
@@ -13,12 +16,14 @@ import (
 )

 type ConfYaml struct {
-	Debug   bool                   `yaml:"debug"`
-	MinStep int                    `yaml:"minStep"`
-	Logger  logger.LoggerSection   `yaml:"logger"`
-	Backend backend.BackendSection `yaml:"backend"`
-	HTTP    HTTPSection            `yaml:"http"`
-	RPC     RPCSection             `yaml:"rpc"`
+	Debug    bool                      `yaml:"debug"`
+	MinStep  int                       `yaml:"minStep"`
+	Logger   logger.LoggerSection      `yaml:"logger"`
+	Backend  backend.BackendSection    `yaml:"backend"`
+	HTTP     HTTPSection               `yaml:"http"`
+	RPC      RPCSection                `yaml:"rpc"`
+	Identity identity.IdentitySection  `yaml:"identity"`
+	Report   report.ReportSection      `yaml:"report"`
 }

 type IndexSection struct {
@@ -45,13 +50,13 @@ var (
 	Config *ConfYaml
 )

-func NewClusterNode(addrs []string) *backend.ClusterNode {
-	return &backend.ClusterNode{Addrs: addrs}
+func NewClusterNode(addrs []string) *tsdb.ClusterNode {
+	return &tsdb.ClusterNode{Addrs: addrs}
 }

 // map["node"]="host1,host2" --> map["node"]=["host1", "host2"]
-func formatClusterItems(cluster map[string]string) map[string]*backend.ClusterNode {
-	ret := make(map[string]*backend.ClusterNode)
+func formatClusterItems(cluster map[string]string) map[string]*tsdb.ClusterNode {
+	ret := make(map[string]*tsdb.ClusterNode)
 	for node, clusterStr := range cluster {
 		items := strings.Split(clusterStr, ",")
 		nitems := make([]string, 0)
@@ -80,21 +85,36 @@ func Parse(conf string) error {
 	viper.SetDefault("minStep", 1)

 	viper.SetDefault("backend", map[string]interface{}{
+		"datasource": "tsdb",
+		"straPath":   "/api/portal/stras/effective?all=1",
+	})
+
+	viper.SetDefault("backend.judge", map[string]interface{}{
+		"batch":       200,  // max items per batch send
+		"workerNum":   32,
+		"maxConns":    2000, // concurrency for queries and pushes
+		"maxIdle":     32,   // max idle connections in the pool
+		"connTimeout": 1000, // connect timeout in ms
+		"callTimeout": 3000, // call timeout in ms
+		"hbsMod":      "monapi",
+	})
+
+	viper.SetDefault("backend.tsdb", map[string]interface{}{
 		"enabled":      true,
+		"name":         "tsdb",
 		"batch":        200, //每次拉取文件的个数
-		"replicas":     500, //一致性hash虚拟节点
 		"workerNum":    32,
 		"maxConns":     2000, //查询和推送数据的并发个数
 		"maxIdle":      32,   //建立的连接池的最大空闲数
 		"connTimeout":  1000, //链接超时时间,单位毫秒
 		"callTimeout":  3000, //访问超时时间,单位毫秒
 		"indexTimeout": 3000, //访问index超时时间,单位毫秒
-		"straPath":     "/api/portal/stras/effective?all=1",
-		"hbsMod":       "monapi",
+		"replicas":     500, // virtual nodes of the consistent hash ring
 	})

 	viper.SetDefault("backend.influxdb", map[string]interface{}{
 		"enabled":   false,
+		"name":      "influxdb",
 		"batch":     200, //每次拉取文件的个数
 		"maxRetry":  3,   //重试次数
 		"workerNum": 32,
@@ -104,6 +124,7 @@ func Parse(conf string) error {

 	viper.SetDefault("backend.opentsdb", map[string]interface{}{
 		"enabled":     false,
+		"name":        "opentsdb",
 		"batch":       200, //每次拉取文件的个数
 		"maxRetry":    3,   //重试次数
 		"workerNum":   32,
@@ -113,12 +134,29 @@ func Parse(conf string) error {
 		"callTimeout": 3000, //访问超时时间,单位毫秒
 	})

+	viper.SetDefault("backend.kafka", map[string]interface{}{
+		"enabled":     false,
+		"name":        "kafka",
+		"maxRetry":    3,    // retry times
+		"connTimeout": 1000, // connect timeout in ms
+		"callTimeout": 3000, // call timeout in ms
+	})
+
+	viper.SetDefault("report", map[string]interface{}{
+		"mod":      "transfer",
+		"enabled":  true,
+		"interval": 4000,
+		"timeout":  3000,
+		"api":      "api/hbs/heartbeat",
+		"remark":   "",
+	})
+
 	err = viper.Unmarshal(&Config)
 	if err != nil {
 		return fmt.Errorf("cannot read yml[%s]: %v", conf, err)
 	}

-	Config.Backend.ClusterList = formatClusterItems(Config.Backend.Cluster)
+	Config.Backend.Tsdb.ClusterList = formatClusterItems(Config.Backend.Tsdb.Cluster)

 	return err
 }
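The SetDefault calls above only fill in keys that etc/transfer.yml leaves unset; values present in the file always win. A minimal sketch of that precedence, reusing keys from above with the YAML inlined for the example:

	package main

	import (
		"bytes"
		"fmt"

		"github.com/spf13/viper"
	)

	func main() {
		viper.SetConfigType("yaml")
		viper.SetDefault("backend.tsdb.batch", 200)    // default, used only if the file omits it
		viper.SetDefault("backend.tsdb.replicas", 500) // default, used only if the file omits it

		yml := []byte("backend:\n  tsdb:\n    batch: 100\n")
		if err := viper.ReadConfig(bytes.NewBuffer(yml)); err != nil {
			panic(err)
		}

		fmt.Println(viper.GetInt("backend.tsdb.batch"))    // 100: the file overrides the default
		fmt.Println(viper.GetInt("backend.tsdb.replicas")) // 500: the default kicks in
	}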
diff --git a/src/modules/transfer/cron/queue.go b/src/modules/transfer/cron/queue.go
index 10d8bc84..487dde6d 100644
--- a/src/modules/transfer/cron/queue.go
+++ b/src/modules/transfer/cron/queue.go
@@ -26,7 +26,7 @@ func updateJudgeQueue() {
 		if !backend.JudgeQueues.Exists(instance) {
 			q := list.NewSafeListLimited(backend.DefaultSendQueueMaxSize)
 			backend.JudgeQueues.Set(instance, q)
-			go backend.Send2JudgeTask(q, instance, backend.Config.WorkerNum)
+			go backend.Send2JudgeTask(q, instance, backend.Judge.WorkerNum)
 		} else {
 			backend.JudgeQueues.UpdateTS(instance)
 		}
diff --git a/src/modules/transfer/cron/stra.go b/src/modules/transfer/cron/stra.go
index 3281346a..0cdb0128 100644
--- a/src/modules/transfer/cron/stra.go
+++ b/src/modules/transfer/cron/stra.go
@@ -41,7 +41,7 @@ func getStrategy() {
 	perm := rand.Perm(len(addrs))
 	var err error
 	for i := range perm {
-		url := fmt.Sprintf("http://%s%s", addrs[perm[i]], backend.Config.StraPath)
+		url := fmt.Sprintf("http://%s%s", addrs[perm[i]], backend.StraPath)

 		err = httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond).ToJSON(&stras)
 		if err != nil {
diff --git a/src/modules/transfer/http/routes/health_router.go b/src/modules/transfer/http/routes/health_router.go
index 419162e7..68d79a77 100644
--- a/src/modules/transfer/http/routes/health_router.go
+++ b/src/modules/transfer/http/routes/health_router.go
@@ -4,7 +4,8 @@ import (
 	"fmt"
 	"os"

-	"github.com/didi/nightingale/src/dataobj"
+	"github.com/toolkits/pkg/logger"
+
 	"github.com/didi/nightingale/src/modules/transfer/backend"
 	"github.com/didi/nightingale/src/modules/transfer/cache"
 	"github.com/didi/nightingale/src/toolkits/http/render"
@@ -50,16 +51,14 @@ func tsdbInstance(c *gin.Context) {
 	var input tsdbInstanceRecv
 	errors.Dangerous(c.ShouldBindJSON(&input))

-	counter, err := backend.GetCounter(input.Metric, "", input.TagMap)
-	errors.Dangerous(err)
-
-	pk := dataobj.PKWithCounter(input.Endpoint, counter)
-	pools, err := backend.SelectPoolByPK(pk)
-	addrs := make([]string, len(pools))
-	for i, pool := range pools {
-		addrs[i] = pool.Addr
+	dataSource, err := backend.GetDataSourceFor("tsdb")
+	if err != nil {
+		logger.Warningf("could not find datasource: %v", err)
+		render.Message(c, err)
+		return
 	}

+	addrs := dataSource.GetInstance(input.Metric, input.Endpoint, input.TagMap)
 	render.Data(c, addrs, nil)
 }
diff --git a/src/modules/transfer/http/routes/push_router.go b/src/modules/transfer/http/routes/push_router.go
index 8cb45d90..d23ada67 100644
--- a/src/modules/transfer/http/routes/push_router.go
+++ b/src/modules/transfer/http/routes/push_router.go
@@ -40,24 +40,19 @@ func PushData(c *gin.Context) {
 		metricValues = append(metricValues, v)
 	}

-	if backend.Config.Enabled {
-		backend.Push2TsdbSendQueue(metricValues)
-	}
+	// send to judge
+	backend.Push2JudgeQueue(metricValues)

-	if backend.Config.Enabled {
-		backend.Push2JudgeSendQueue(metricValues)
-	}
-
-	if backend.Config.Influxdb.Enabled {
-		backend.Push2InfluxdbSendQueue(metricValues)
-	}
-
-	if backend.Config.OpenTsdb.Enabled {
-		backend.Push2OpenTsdbSendQueue(metricValues)
-	}
-
-	if backend.Config.Kafka.Enabled {
-		backend.Push2KafkaSendQueue(metricValues)
+	// send to push endpoints
+	pushEndpoints, err := backend.GetPushEndpoints()
+	if err != nil {
+		logger.Errorf("could not find push endpoints: %v", err)
+		render.Data(c, "error", err)
+		return
+	}
+
+	for _, pushendpoint := range pushEndpoints {
+		pushendpoint.Push2Queue(metricValues)
 	}

 	if msg != "" {
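The push path no longer tests each backend's Enabled flag inline: backend.GetPushEndpoints returns whatever endpoints are enabled, and every one of them implements the same Push2Queue method. The actual interface lives in backend/datasource.go (added by this patch but not shown in these hunks); the following is only a rough sketch of its shape:

	package main

	import "fmt"

	// MetricValue stands in for dataobj.MetricValue.
	type MetricValue struct {
		Endpoint, Metric string
		Value            float64
	}

	// PushEndpoint sketches the contract the refactor introduces:
	// every storage backend simply accepts a batch of points.
	type PushEndpoint interface {
		Push2Queue(items []*MetricValue)
	}

	type tsdbEndpoint struct{}

	func (tsdbEndpoint) Push2Queue(items []*MetricValue) { fmt.Println("tsdb got", len(items)) }

	type kafkaEndpoint struct{}

	func (kafkaEndpoint) Push2Queue(items []*MetricValue) { fmt.Println("kafka got", len(items)) }

	func main() {
		// GetPushEndpoints would assemble this slice from the enabled config sections
		enabled := []PushEndpoint{tsdbEndpoint{}, kafkaEndpoint{}}
		items := []*MetricValue{{Endpoint: "host01", Metric: "cpu.idle", Value: 99}}
		for _, ep := range enabled {
			ep.Push2Queue(items) // fan out to every enabled backend
		}
	}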
diff --git a/src/modules/transfer/http/routes/query_router.go b/src/modules/transfer/http/routes/query_router.go
index 2be8f9fe..8c64a4b6 100644
--- a/src/modules/transfer/http/routes/query_router.go
+++ b/src/modules/transfer/http/routes/query_router.go
@@ -12,34 +12,22 @@ import (
 )

 type QueryDataReq struct {
-	Start  int64               `json:"start"`
-	End    int64               `json:"end"`
-	Series []backend.SeriesReq `json:"series"`
-}
-
-func QueryDataForJudge(c *gin.Context) {
-	var inputs []dataobj.QueryData
-
-	errors.Dangerous(c.ShouldBindJSON(&inputs))
-	resp := backend.FetchData(inputs)
-	render.Data(c, resp, nil)
+	QueryData []dataobj.QueryData `json:"queryData"`
 }

 func QueryData(c *gin.Context) {
 	stats.Counter.Set("data.api.qp10s", 1)
-	var input QueryDataReq
-
-	errors.Dangerous(c.ShouldBindJSON(&input))
-
-	queryData, err := backend.GetSeries(input.Start, input.End, input.Series)
+	dataSource, err := backend.GetDataSourceFor("")
 	if err != nil {
-		logger.Error(err, input)
-		render.Message(c, "query err")
+		logger.Warningf("could not find datasource: %v", err)
+		render.Message(c, err)
 		return
 	}

-	resp := backend.FetchData(queryData)
+	var queryDataReq QueryDataReq
+	errors.Dangerous(c.ShouldBindJSON(&queryDataReq))
+	resp := dataSource.QueryData(queryDataReq.QueryData)
 	render.Data(c, resp, nil)
 }

@@ -51,7 +39,13 @@ func QueryDataForUI(c *gin.Context) {
 	start := input.Start
 	end := input.End

-	resp := backend.FetchDataForUI(input)
+	dataSource, err := backend.GetDataSourceFor("")
+	if err != nil {
+		logger.Warningf("could not find datasource: %v", err)
+		render.Message(c, err)
+		return
+	}
+	resp := dataSource.QueryDataForUI(input)
 	for _, d := range resp {
 		data := &dataobj.QueryDataForUIResp{
 			Start: d.Start,
@@ -70,7 +64,7 @@ func QueryDataForUI(c *gin.Context) {
 		comparison := input.Comparisons[i]
 		input.Start = start - comparison
 		input.End = end - comparison
-		res := backend.FetchDataForUI(input)
+		res := dataSource.QueryDataForUI(input)
 		for _, d := range res {
 			for j := range d.Values {
 				d.Values[j].Timestamp += comparison
@@ -93,3 +87,68 @@ func QueryDataForUI(c *gin.Context) {

 	render.Data(c, respData, nil)
 }
+
+func GetMetrics(c *gin.Context) {
+	stats.Counter.Set("metric.qp10s", 1)
+	recv := dataobj.EndpointsRecv{}
+	errors.Dangerous(c.ShouldBindJSON(&recv))
+
+	dataSource, err := backend.GetDataSourceFor("")
+	if err != nil {
+		logger.Warningf("could not find datasource: %v", err)
+		render.Message(c, err)
+		return
+	}
+
+	resp := dataSource.QueryMetrics(recv)
+
+	render.Data(c, resp, nil)
+}
+
+func GetTagPairs(c *gin.Context) {
+	stats.Counter.Set("tag.qp10s", 1)
+	recv := dataobj.EndpointMetricRecv{}
+	errors.Dangerous(c.ShouldBindJSON(&recv))
+
+	dataSource, err := backend.GetDataSourceFor("")
+	if err != nil {
+		logger.Warningf("could not find datasource: %v", err)
+		render.Message(c, err)
+		return
+	}
+
+	resp := dataSource.QueryTagPairs(recv)
+	render.Data(c, resp, nil)
+}
+
+func GetIndexByClude(c *gin.Context) {
+	stats.Counter.Set("xclude.qp10s", 1)
+	recvs := make([]dataobj.CludeRecv, 0)
+	errors.Dangerous(c.ShouldBindJSON(&recvs))
+
+	dataSource, err := backend.GetDataSourceFor("")
+	if err != nil {
+		logger.Warningf("could not find datasource: %v", err)
+		render.Message(c, err)
+		return
+	}
+
+	resp := dataSource.QueryIndexByClude(recvs)
+	render.Data(c, resp, nil)
+}
+
+func GetIndexByFullTags(c *gin.Context) {
+	stats.Counter.Set("counter.qp10s", 1)
+	recvs := make([]dataobj.IndexByFullTagsRecv, 0)
+	errors.Dangerous(c.ShouldBindJSON(&recvs))
+
+	dataSource, err := backend.GetDataSourceFor("")
+	if err != nil {
+		logger.Warningf("could not find datasource: %v", err)
+		render.Message(c, err)
+		return
+	}
+
+	resp := dataSource.QueryIndexByFullTags(recvs)
+	render.Data(c, resp, nil)
+}
diff --git a/src/modules/transfer/http/routes/routes.go b/src/modules/transfer/http/routes/routes.go
index cec93b7c..1abfadf5 100644
--- a/src/modules/transfer/http/routes/routes.go
+++ b/src/modules/transfer/http/routes/routes.go
@@ -22,6 +22,14 @@ func Config(r *gin.Engine) {
 		sys.POST("/data/ui", QueryDataForUI)
 	}

+	index := r.Group("/api/index")
+	{
+		index.POST("/metrics", GetMetrics)
+		index.POST("/tagkv", GetTagPairs)
+		index.POST("/counter/clude", GetIndexByClude)
+		index.POST("/counter/fullmatch", GetIndexByFullTags)
+	}
+
 	v2 := r.Group("/api/transfer/v2")
 	{
 		v2.POST("/data", QueryData)
diff --git a/src/modules/transfer/rpc/push.go b/src/modules/transfer/rpc/push.go
index 4b80c1ad..9db134d8 100644
--- a/src/modules/transfer/rpc/push.go
+++ b/src/modules/transfer/rpc/push.go
@@ -36,25 +36,20 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
 		items = append(items, v)
 	}

-	if backend.Config.Enabled {
-		backend.Push2TsdbSendQueue(items)
+	// send to judge
+	backend.Push2JudgeQueue(items)
+
+	// send to push endpoints
+	pushEndpoints, err := backend.GetPushEndpoints()
+	if err != nil {
+		logger.Errorf("could not find push endpoints: %v", err)
+		return err
+	}
+
+	for _, pushendpoint := range pushEndpoints {
+		pushendpoint.Push2Queue(items)
 	}

-	if backend.Config.Enabled {
-		backend.Push2JudgeSendQueue(items)
-	}
-
-	if backend.Config.Influxdb.Enabled {
-		backend.Push2InfluxdbSendQueue(items)
-	}
-
-	if backend.Config.OpenTsdb.Enabled {
-		backend.Push2OpenTsdbSendQueue(items)
-	}
-
-	if backend.Config.Kafka.Enabled {
-		backend.Push2KafkaSendQueue(items)
-	}
 	if reply.Invalid == 0 {
 		reply.Msg = "ok"
 	}
diff --git a/src/modules/transfer/rpc/query.go b/src/modules/transfer/rpc/query.go
index df79e87b..5232698e 100644
--- a/src/modules/transfer/rpc/query.go
+++ b/src/modules/transfer/rpc/query.go
@@ -3,9 +3,15 @@ package rpc
 import (
 	"github.com/didi/nightingale/src/dataobj"
 	"github.com/didi/nightingale/src/modules/transfer/backend"
+	"github.com/toolkits/pkg/logger"
 )

 func (t *Transfer) Query(args []dataobj.QueryData, reply *dataobj.QueryDataResp) error {
-	reply.Data = backend.FetchData(args)
+	dataSource, err := backend.GetDataSourceFor("")
+	if err != nil {
+		logger.Warningf("could not find datasource: %v", err)
+		return err
+	}
+	reply.Data = dataSource.QueryData(args)
 	return nil
 }
diff --git a/src/modules/transfer/transfer.go b/src/modules/transfer/transfer.go
index 6b9f46de..29089b68 100644
--- a/src/modules/transfer/transfer.go
+++ b/src/modules/transfer/transfer.go
@@ -13,7 +13,9 @@ import (
 	"github.com/didi/nightingale/src/modules/transfer/http/routes"
 	"github.com/didi/nightingale/src/modules/transfer/rpc"
 	"github.com/didi/nightingale/src/toolkits/http"
+	"github.com/didi/nightingale/src/toolkits/identity"
 	tlogger "github.com/didi/nightingale/src/toolkits/logger"
+	"github.com/didi/nightingale/src/toolkits/report"
 	"github.com/didi/nightingale/src/toolkits/stats"

 	"github.com/gin-gonic/gin"
@@ -61,9 +63,11 @@ func main() {
 	tlogger.Init(cfg.Logger)
 	go stats.Init("n9e.transfer")

+	identity.Init(cfg.Identity)
 	backend.Init(cfg.Backend)
 	cron.Init()

+	go report.Init(cfg.Report, "monapi")
 	go rpc.Start()

 	r := gin.New()
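With these routes and the RPC wiring in place, a caller hits the new v2 endpoint with the wrapped queryData array. A client sketch follows; the transfer address/port and the exact field list of dataobj.QueryData are assumptions, not taken from this patch:

	package main

	import (
		"bytes"
		"encoding/json"
		"fmt"
		"net/http"
	)

	// queryData mirrors the fields of dataobj.QueryData assumed here.
	type queryData struct {
		Start      int64    `json:"start"`
		End        int64    `json:"end"`
		ConsolFunc string   `json:"consolFunc"`
		Endpoints  []string `json:"endpoints"`
		Counters   []string `json:"counters"`
		Step       int      `json:"step"`
		DsType     string   `json:"dstype"`
	}

	func main() {
		req := map[string][]queryData{
			"queryData": {{
				Start:      1595200000,
				End:        1595203600,
				ConsolFunc: "AVERAGE",
				Endpoints:  []string{"host01"},
				Counters:   []string{"cpu.idle"},
				Step:       10,
				DsType:     "GAUGE",
			}},
		}
		body, _ := json.Marshal(req)
		// placeholder transfer address and port
		resp, err := http.Post("http://127.0.0.1:7900/api/transfer/v2/data", "application/json", bytes.NewReader(body))
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		var out json.RawMessage
		if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
			panic(err)
		}
		fmt.Println(string(out))
	}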