feat: add write_relabel action before n9e remote writing to multi tsdb (#1098)
* add write relabel config * change parse relabel Regex field time when config loaded
This commit is contained in:
parent
fd93fd7182
commit
c1e92b56b9
|
@ -190,6 +190,12 @@ KeepAlive = 30000
|
|||
MaxConnsPerHost = 0
|
||||
MaxIdleConns = 100
|
||||
MaxIdleConnsPerHost = 100
|
||||
[[Writers.WriteRelabels]]
|
||||
Action = "replace"
|
||||
SourceLabels = ["__address__"]
|
||||
Regex = "([^:]+)(?::\\d+)?"
|
||||
Replacement = "$1:80"
|
||||
TargetLabel = "__address__"
|
||||
|
||||
# [[Writers]]
|
||||
# Url = "http://127.0.0.1:7201/api/v1/prom/remote/write"
|
||||
|
|
42
go.mod
42
go.mod
|
@ -33,54 +33,12 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect
|
||||
github.com/BurntSushi/toml v0.3.1 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/fatih/camelcase v1.0.0 // indirect
|
||||
github.com/fatih/structs v1.1.0 // indirect
|
||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||
github.com/go-asn1-ber/asn1-ber v1.5.1 // indirect
|
||||
github.com/go-playground/locales v0.13.0 // indirect
|
||||
github.com/go-playground/universal-translator v0.17.0 // indirect
|
||||
github.com/go-playground/validator/v10 v10.4.1 // indirect
|
||||
github.com/go-sql-driver/mysql v1.6.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
|
||||
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
|
||||
github.com/jackc/pgconn v1.10.0 // indirect
|
||||
github.com/jackc/pgio v1.0.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgproto3/v2 v2.1.1 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||
github.com/jackc/pgtype v1.8.1 // indirect
|
||||
github.com/jackc/pgx/v4 v4.13.0 // indirect
|
||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||
github.com/jinzhu/now v1.1.2 // indirect
|
||||
github.com/leodido/go-urn v1.2.0 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/pquerna/cachecontrol v0.1.0 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/procfs v0.7.3 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.0.1 // indirect
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/tidwall/match v1.1.1 // indirect
|
||||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
github.com/ugorji/go/codec v1.1.7 // indirect
|
||||
go.uber.org/automaxprocs v1.4.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
|
||||
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
google.golang.org/appengine v1.6.6 // indirect
|
||||
google.golang.org/genproto v0.0.0-20211007155348-82e027067bd4 // indirect
|
||||
google.golang.org/grpc v1.41.0 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
||||
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
)
|
||||
|
|
4
go.sum
4
go.sum
|
@ -50,6 +50,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
|
|||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
|
||||
|
@ -201,6 +202,7 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
|
|||
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
|
||||
github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
|
||||
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
|
||||
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
|
||||
|
@ -221,6 +223,7 @@ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5W
|
|||
github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
|
||||
github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
|
||||
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
|
||||
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
|
||||
|
@ -383,6 +386,7 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
|
|||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
github.com/toolkits/pkg v1.2.9 h1:zGlrJDl+2sMBoxBRIoMtAwvKmW5wctuji2+qHCecMKk=
|
||||
github.com/toolkits/pkg v1.2.9/go.mod h1:ZUsQAOoaR99PSbes+RXSirvwmtd6+XIUvizCmrjfUYc=
|
||||
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
package models
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
const (
|
||||
Replace Action = "replace"
|
||||
Keep Action = "keep"
|
||||
Drop Action = "drop"
|
||||
HashMod Action = "hashmod"
|
||||
LabelMap Action = "labelmap"
|
||||
LabelDrop Action = "labeldrop"
|
||||
LabelKeep Action = "labelkeep"
|
||||
Lowercase Action = "lowercase"
|
||||
Uppercase Action = "uppercase"
|
||||
)
|
||||
|
||||
type Action string
|
||||
|
||||
type Regexp struct {
|
||||
*regexp.Regexp
|
||||
}
|
||||
|
||||
type RelabelConfig struct {
|
||||
SourceLabels model.LabelNames
|
||||
Separator string
|
||||
Regex interface{}
|
||||
Modulus uint64
|
||||
TargetLabel string
|
||||
Replacement string
|
||||
Action Action
|
||||
}
|
||||
|
||||
func Process(labels []*prompb.Label, cfgs ...*RelabelConfig) []*prompb.Label {
|
||||
for _, cfg := range cfgs {
|
||||
labels = relabel(labels, cfg)
|
||||
if labels == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func getValue(ls []*prompb.Label, name model.LabelName) string {
|
||||
for _, l := range ls {
|
||||
if l.Name == string(name) {
|
||||
return l.Value
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type LabelBuilder struct {
|
||||
LabelSet map[string]string
|
||||
}
|
||||
|
||||
func newBuilder(ls []*prompb.Label) *LabelBuilder {
|
||||
lset := make(map[string]string, len(ls))
|
||||
for _, l := range ls {
|
||||
lset[l.Name] = l.Value
|
||||
}
|
||||
return &LabelBuilder{LabelSet: lset}
|
||||
}
|
||||
|
||||
func (l *LabelBuilder) set(k, v string) *LabelBuilder {
|
||||
if v == "" {
|
||||
return l.del(k)
|
||||
}
|
||||
|
||||
l.LabelSet[k] = v
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *LabelBuilder) del(ns ...string) *LabelBuilder {
|
||||
for _, n := range ns {
|
||||
delete(l.LabelSet, n)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *LabelBuilder) labels() []*prompb.Label {
|
||||
ls := make([]*prompb.Label, 0, len(l.LabelSet))
|
||||
if len(l.LabelSet) == 0 {
|
||||
return ls
|
||||
}
|
||||
|
||||
for k, v := range l.LabelSet {
|
||||
ls = append(ls, &prompb.Label{
|
||||
Name: k,
|
||||
Value: v,
|
||||
})
|
||||
}
|
||||
|
||||
sort.Slice(ls, func(i, j int) bool {
|
||||
return ls[i].Name > ls[j].Name
|
||||
})
|
||||
return ls
|
||||
}
|
||||
|
||||
func relabel(lset []*prompb.Label, cfg *RelabelConfig) []*prompb.Label {
|
||||
values := make([]string, 0, len(cfg.SourceLabels))
|
||||
for _, ln := range cfg.SourceLabels {
|
||||
values = append(values, getValue(lset, ln))
|
||||
}
|
||||
|
||||
regx := cfg.Regex.(Regexp)
|
||||
|
||||
val := strings.Join(values, cfg.Separator)
|
||||
lb := newBuilder(lset)
|
||||
switch cfg.Action {
|
||||
case Drop:
|
||||
if regx.MatchString(val) {
|
||||
return nil
|
||||
}
|
||||
case Keep:
|
||||
if !regx.MatchString(val) {
|
||||
return nil
|
||||
}
|
||||
case Replace:
|
||||
indexes := regx.FindStringSubmatchIndex(val)
|
||||
if indexes == nil {
|
||||
break
|
||||
}
|
||||
target := model.LabelName(regx.ExpandString([]byte{}, cfg.TargetLabel, val, indexes))
|
||||
if !target.IsValid() {
|
||||
lb.del(cfg.TargetLabel)
|
||||
break
|
||||
}
|
||||
res := regx.ExpandString([]byte{}, cfg.Replacement, val, indexes)
|
||||
if len(res) == 0 {
|
||||
lb.del(cfg.TargetLabel)
|
||||
break
|
||||
}
|
||||
lb.set(string(target), string(res))
|
||||
case Lowercase:
|
||||
lb.set(cfg.TargetLabel, strings.ToLower(val))
|
||||
case Uppercase:
|
||||
lb.set(cfg.TargetLabel, strings.ToUpper(val))
|
||||
case HashMod:
|
||||
mod := sum64(md5.Sum([]byte(val))) % cfg.Modulus
|
||||
lb.set(cfg.TargetLabel, fmt.Sprintf("%d", mod))
|
||||
case LabelMap:
|
||||
for _, l := range lset {
|
||||
if regx.MatchString(l.Name) {
|
||||
res := regx.ReplaceAllString(l.Name, cfg.Replacement)
|
||||
lb.set(res, l.Value)
|
||||
}
|
||||
}
|
||||
case LabelDrop:
|
||||
for _, l := range lset {
|
||||
if regx.MatchString(l.Name) {
|
||||
lb.del(l.Name)
|
||||
}
|
||||
}
|
||||
case LabelKeep:
|
||||
for _, l := range lset {
|
||||
if !regx.MatchString(l.Name) {
|
||||
lb.del(l.Name)
|
||||
}
|
||||
}
|
||||
default:
|
||||
panic(fmt.Errorf("relabel: unknown relabel action type %q", cfg.Action))
|
||||
}
|
||||
|
||||
return lb.labels()
|
||||
}
|
||||
|
||||
func sum64(hash [md5.Size]byte) uint64 {
|
||||
var s uint64
|
||||
|
||||
for i, b := range hash {
|
||||
shift := uint64((md5.Size - i - 1) * 8)
|
||||
|
||||
s |= uint64(b) << shift
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func NewRegexp(s string) (Regexp, error) {
|
||||
regex, err := regexp.Compile("^(?:" + s + ")$")
|
||||
return Regexp{Regexp: regex}, err
|
||||
}
|
||||
|
||||
func MustNewRegexp(s string) Regexp {
|
||||
re, err := NewRegexp(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return re
|
||||
}
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/gin-gonic/gin"
|
||||
"github.com/koding/multiconfig"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/notifier"
|
||||
"github.com/didi/nightingale/v5/src/pkg/httpx"
|
||||
"github.com/didi/nightingale/v5/src/pkg/logx"
|
||||
|
@ -143,6 +144,33 @@ func MustLoad(fpaths ...string) {
|
|||
C.WriterOpt.QueueCount = 100
|
||||
}
|
||||
|
||||
for _, write := range C.Writers {
|
||||
for _, relabel := range write.WriteRelabels {
|
||||
regex, ok := relabel.Regex.(string)
|
||||
if !ok {
|
||||
log.Println("Regex field must be a string")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if regex == "" {
|
||||
regex = "(.*)"
|
||||
}
|
||||
relabel.Regex = models.MustNewRegexp(regex)
|
||||
|
||||
if relabel.Separator == "" {
|
||||
relabel.Separator = ";"
|
||||
}
|
||||
|
||||
if relabel.Action == "" {
|
||||
relabel.Action = "replace"
|
||||
}
|
||||
|
||||
if relabel.Replacement == "" {
|
||||
relabel.Replacement = "$1"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("heartbeat.ip:", C.Heartbeat.IP)
|
||||
fmt.Printf("heartbeat.interval: %dms\n", C.Heartbeat.Interval)
|
||||
})
|
||||
|
@ -206,6 +234,8 @@ type WriterOptions struct {
|
|||
MaxIdleConnsPerHost int
|
||||
|
||||
Headers []string
|
||||
|
||||
WriteRelabels []*models.RelabelConfig
|
||||
}
|
||||
|
||||
type WriterGlobalOpt struct {
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/server/config"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
|
@ -24,11 +25,28 @@ type WriterType struct {
|
|||
Client api.Client
|
||||
}
|
||||
|
||||
func (w WriterType) writeRelabel(items []*prompb.TimeSeries) []*prompb.TimeSeries {
|
||||
ritems := make([]*prompb.TimeSeries, 0, len(items))
|
||||
for _, item := range items {
|
||||
lbls := models.Process(item.Labels, w.Opts.WriteRelabels...)
|
||||
if len(lbls) == 0 {
|
||||
continue
|
||||
}
|
||||
ritems = append(ritems, item)
|
||||
}
|
||||
return ritems
|
||||
}
|
||||
|
||||
func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[string]string) {
|
||||
if len(items) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
items = w.writeRelabel(items)
|
||||
if len(items) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
promstat.ForwardDuration.WithLabelValues(config.C.ClusterName, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
|
||||
|
|
Loading…
Reference in New Issue