Transfer backend support opentsdb (#149)

* support send to opentsdb

* fix issue
This commit is contained in:
zengwh 2020-05-16 14:32:32 +08:00 committed by GitHub
parent c610d645bb
commit 7fa09f19de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 286 additions and 3 deletions

View File

@ -12,6 +12,10 @@ backend:
database: "n9e"
address: "http://127.0.0.1:8086"
opentsdb:
enabled: false
address: "127.0.0.1:4242"
logger:
dir: logs/transfer
level: WARNING

35
src/dataobj/opentsdb.go Normal file
View File

@ -0,0 +1,35 @@
package dataobj
import (
"fmt"
"strings"
)
type OpenTsdbItem struct {
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
Value float64 `json:"value"`
Timestamp int64 `json:"timestamp"`
}
func (t *OpenTsdbItem) String() string {
return fmt.Sprintf(
"<Metric:%s, Tags:%v, Value:%v, TS:%d>",
t.Metric,
t.Tags,
t.Value,
t.Timestamp,
)
}
func (t *OpenTsdbItem) OpenTsdbString() (s string) {
s = fmt.Sprintf("put %s %d %.3f ", t.Metric, t.Timestamp, t.Value)
for k, v := range t.Tags {
key := strings.ToLower(strings.Replace(k, " ", "_", -1))
value := strings.Replace(v, " ", "_", -1)
s += key + "=" + value + " "
}
return s
}

View File

@ -24,6 +24,18 @@ type InfluxdbSection struct {
Precision string `yaml:"precision"`
}
type OpenTsdbSection struct {
Enabled bool `yaml:"enabled"`
Batch int `yaml:"batch"`
ConnTimeout int `yaml:"connTimeout"`
CallTimeout int `yaml:"callTimeout"`
WorkerNum int `yaml:"workerNum"`
MaxConns int `yaml:"maxConns"`
MaxIdle int `yaml:"maxIdle"`
MaxRetry int `yaml:"maxRetry"`
Address string `yaml:"address"`
}
type BackendSection struct {
Enabled bool `yaml:"enabled"`
Batch int `yaml:"batch"`
@ -40,6 +52,7 @@ type BackendSection struct {
Cluster map[string]string `yaml:"cluster"`
ClusterList map[string]*ClusterNode `json:"clusterList"`
Influxdb InfluxdbSection `yaml:"influxdb"`
OpenTsdb OpenTsdbSection `yaml:"opentsdb"`
}
const DefaultSendQueueMaxSize = 102400 //10.24w
@ -57,10 +70,12 @@ var (
TsdbQueues = make(map[string]*list.SafeListLimited)
JudgeQueues = cache.SafeJudgeQueue{}
InfluxdbQueue *list.SafeListLimited
OpenTsdbQueue *list.SafeListLimited
// 连接池 node_address -> connection_pool
TsdbConnPools *pools.ConnPools
JudgeConnPools *pools.ConnPools
TsdbConnPools *pools.ConnPools
JudgeConnPools *pools.ConnPools
OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper
connTimeout int32
callTimeout int32
@ -97,6 +112,9 @@ func initConnPools() {
JudgeConnPools = pools.NewConnPools(
Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, GetJudges(),
)
if Config.OpenTsdb.Enabled {
OpenTsdbConnPoolHelper = pools.NewOpenTsdbConnPoolHelper(Config.OpenTsdb.Address, Config.OpenTsdb.MaxConns, Config.OpenTsdb.MaxIdle, Config.OpenTsdb.ConnTimeout, Config.OpenTsdb.CallTimeout)
}
}
func initSendQueues() {
@ -115,6 +133,10 @@ func initSendQueues() {
if Config.Influxdb.Enabled {
InfluxdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize)
}
if Config.OpenTsdb.Enabled {
OpenTsdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize)
}
}
func GetJudges() []string {

View File

@ -1,6 +1,7 @@
package backend
import (
"bytes"
"time"
"github.com/didi/nightingale/src/dataobj"
@ -42,6 +43,11 @@ func startSendTasks() {
influxdbConcurrent = 1
}
openTsdbConcurrent := Config.OpenTsdb.WorkerNum
if openTsdbConcurrent < 1 {
openTsdbConcurrent = 1
}
if Config.Enabled {
for node, item := range Config.ClusterList {
for _, addr := range item.Addrs {
@ -62,6 +68,12 @@ func startSendTasks() {
go send2InfluxdbTask(influxdbConcurrent)
}
if Config.OpenTsdb.Enabled {
go send2OpenTsdbTask(openTsdbConcurrent)
}
}
func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) {
@ -407,3 +419,81 @@ func send2InfluxdbTask(concurrent int) {
}(addr, influxdbItems, count)
}
}
// 将原始数据入到tsdb发送缓存队列
func Push2OpenTsdbSendQueue(items []*dataobj.MetricValue) {
errCnt := 0
for _, item := range items {
tsdbItem := convert2OpenTsdbItem(item)
isSuccess := OpenTsdbQueue.PushFront(tsdbItem)
if !isSuccess {
errCnt += 1
}
}
stats.Counter.Set("opentsdb.queue.err", errCnt)
}
func send2OpenTsdbTask(concurrent int) {
batch := Config.OpenTsdb.Batch // 一次发送,最多batch条数据
retry := Config.OpenTsdb.MaxRetry
addr := Config.OpenTsdb.Address
sema := semaphore.NewSemaphore(concurrent)
for {
items := OpenTsdbQueue.PopBackBy(batch)
count := len(items)
if count == 0 {
time.Sleep(DefaultSendTaskSleepInterval)
continue
}
var openTsdbBuffer bytes.Buffer
for i := 0; i < count; i++ {
tsdbItem := items[i].(*dataobj.OpenTsdbItem)
openTsdbBuffer.WriteString(tsdbItem.OpenTsdbString())
openTsdbBuffer.WriteString("\n")
stats.Counter.Set("points.out.opentsdb", 1)
logger.Debug("send to opentsdb: ", tsdbItem)
}
// 同步Call + 有限并发 进行发送
sema.Acquire()
go func(addr string, openTsdbBuffer bytes.Buffer, count int) {
defer sema.Release()
var err error
sendOk := false
for i := 0; i < retry; i++ {
err = OpenTsdbConnPoolHelper.Send(openTsdbBuffer.Bytes())
if err == nil {
sendOk = true
break
}
logger.Warningf("send opentsdb %s fail: %v", addr, err)
time.Sleep(100 * time.Millisecond)
}
if !sendOk {
stats.Counter.Set("points.out.opentsdb.err", count)
for _, item := range items {
logger.Errorf("send %v to opentsdb %s fail: %v", item, addr, err)
}
} else {
logger.Debugf("send to opentsdb %s ok", addr)
}
}(addr, openTsdbBuffer, count)
}
}
func convert2OpenTsdbItem(d *dataobj.MetricValue) *dataobj.OpenTsdbItem {
t := dataobj.OpenTsdbItem{Tags: make(map[string]string)}
for k, v := range d.TagsMap {
t.Tags[k] = v
}
t.Tags["endpoint"] = d.Endpoint
t.Metric = d.Metric
t.Timestamp = d.Timestamp
t.Value = d.Value
return &t
}

View File

@ -94,7 +94,7 @@ func Parse(conf string) error {
})
viper.SetDefault("backend.influxdb", map[string]interface{}{
"enabled": true,
"enabled": false,
"batch": 200, //每次拉取文件的个数
"maxRetry": 3, //重试次数
"workerNum": 32,
@ -102,6 +102,17 @@ func Parse(conf string) error {
"timeout": 3000, //访问超时时间,单位毫秒
})
viper.SetDefault("backend.opentsdb", map[string]interface{}{
"enabled": false,
"batch": 200, //每次拉取文件的个数
"maxRetry": 3, //重试次数
"workerNum": 32,
"maxConns": 2000, //查询和推送数据的并发个数
"maxIdle": 32, //建立的连接池的最大空闲数
"connTimeout": 1000, //链接超时时间,单位毫秒
"callTimeout": 3000, //访问超时时间,单位毫秒
})
err = viper.Unmarshal(&Config)
if err != nil {
return fmt.Errorf("cannot read yml[%s]: %v", conf, err)

View File

@ -48,6 +48,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
backend.Push2InfluxdbSendQueue(items)
}
if backend.Config.OpenTsdb.Enabled {
backend.Push2OpenTsdbSendQueue(items)
}
if reply.Invalid == 0 {
reply.Msg = "ok"
}

View File

@ -0,0 +1,117 @@
// Copyright 2017 Xiaomi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pools
import (
"fmt"
"net"
"time"
connp "github.com/toolkits/pkg/pool"
)
type OpenTsdbClient struct {
cli *struct{ net.Conn }
name string
}
func (t OpenTsdbClient) Name() string {
return t.name
}
func (t OpenTsdbClient) Closed() bool {
return t.cli.Conn == nil
}
func (t OpenTsdbClient) Close() error {
if t.cli != nil {
err := t.cli.Close()
t.cli.Conn = nil
return err
}
return nil
}
func newOpenTsdbConnPool(address string, maxConns int, maxIdle int, connTimeout int) *connp.ConnPool {
pool := connp.NewConnPool("opentsdb", address, maxConns, maxIdle)
pool.New = func(name string) (connp.NConn, error) {
_, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", address, time.Duration(connTimeout)*time.Millisecond)
if err != nil {
return nil, err
}
return OpenTsdbClient{
cli: &struct{ net.Conn }{conn},
name: name,
}, nil
}
return pool
}
type OpenTsdbConnPoolHelper struct {
p *connp.ConnPool
maxConns int
maxIdle int
connTimeout int
callTimeout int
address string
}
func NewOpenTsdbConnPoolHelper(address string, maxConns, maxIdle, connTimeout, callTimeout int) *OpenTsdbConnPoolHelper {
return &OpenTsdbConnPoolHelper{
p: newOpenTsdbConnPool(address, maxConns, maxIdle, connTimeout),
maxConns: maxConns,
maxIdle: maxIdle,
connTimeout: connTimeout,
callTimeout: callTimeout,
address: address,
}
}
func (t *OpenTsdbConnPoolHelper) Send(data []byte) (err error) {
conn, err := t.p.Fetch()
if err != nil {
return fmt.Errorf("get connection fail: err %v. proc: %s", err, t.p.Proc())
}
cli := conn.(OpenTsdbClient).cli
done := make(chan error, 1)
go func() {
_, err = cli.Write(data)
done <- err
}()
select {
case <-time.After(time.Duration(t.callTimeout) * time.Millisecond):
t.p.ForceClose(conn)
return fmt.Errorf("%s, call timeout", t.address)
case err = <-done:
if err != nil {
t.p.ForceClose(conn)
err = fmt.Errorf("%s, call failed, err %v. proc: %s", t.address, err, t.p.Proc())
} else {
t.p.Release(conn)
}
return err
}
}