From c1e92b56b98c0aa362e124580e581a8ff262e59e Mon Sep 17 00:00:00 2001 From: Resurgence <70791908+resurgence72@users.noreply.github.com> Date: Wed, 10 Aug 2022 16:50:52 +0800 Subject: [PATCH] feat: add write_relabel action before n9e remote writing to multi tsdb (#1098) * add write relabel config * change parse relabel Regex field time when config loaded --- etc/server.conf | 6 ++ go.mod | 42 -------- go.sum | 4 + src/models/relabel.go | 198 ++++++++++++++++++++++++++++++++++++ src/server/config/config.go | 30 ++++++ src/server/writer/writer.go | 18 ++++ 6 files changed, 256 insertions(+), 42 deletions(-) create mode 100644 src/models/relabel.go diff --git a/etc/server.conf b/etc/server.conf index 07d4c9a4..f1726ad3 100644 --- a/etc/server.conf +++ b/etc/server.conf @@ -190,6 +190,12 @@ KeepAlive = 30000 MaxConnsPerHost = 0 MaxIdleConns = 100 MaxIdleConnsPerHost = 100 +[[Writers.WriteRelabels]] +Action = "replace" +SourceLabels = ["__address__"] +Regex = "([^:]+)(?::\\d+)?" +Replacement = "$1:80" +TargetLabel = "__address__" # [[Writers]] # Url = "http://127.0.0.1:7201/api/v1/prom/remote/write" diff --git a/go.mod b/go.mod index 93df18e9..204b6409 100644 --- a/go.mod +++ b/go.mod @@ -33,54 +33,12 @@ require ( ) require ( - github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect - github.com/BurntSushi/toml v0.3.1 // indirect - github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/camelcase v1.0.0 // indirect github.com/fatih/structs v1.1.0 // indirect - github.com/gin-contrib/sse v0.1.0 // indirect - github.com/go-asn1-ber/asn1-ber v1.5.1 // indirect - github.com/go-playground/locales v0.13.0 // indirect - github.com/go-playground/universal-translator v0.17.0 // indirect - github.com/go-playground/validator/v10 v10.4.1 // indirect - github.com/go-sql-driver/mysql v1.6.0 // indirect - github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect - github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.10.0 // indirect - github.com/jackc/pgio v1.0.0 // indirect - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgproto3/v2 v2.1.1 // indirect - github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect - github.com/jackc/pgtype v1.8.1 // indirect - github.com/jackc/pgx/v4 v4.13.0 // indirect - github.com/jinzhu/inflection v1.0.0 // indirect - github.com/jinzhu/now v1.1.2 // indirect - github.com/leodido/go-urn v1.2.0 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pquerna/cachecontrol v0.1.0 // indirect - github.com/prometheus/client_model v0.2.0 // indirect - github.com/prometheus/procfs v0.7.3 // indirect - github.com/russross/blackfriday/v2 v2.0.1 // indirect - github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect - github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/tidwall/match v1.1.1 // indirect - github.com/tidwall/pretty v1.2.0 // indirect - github.com/ugorji/go/codec v1.1.7 // indirect - go.uber.org/automaxprocs v1.4.0 // indirect - golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect - golang.org/x/text v0.3.7 // indirect - google.golang.org/appengine v1.6.6 // indirect google.golang.org/genproto v0.0.0-20211007155348-82e027067bd4 // indirect google.golang.org/grpc v1.41.0 // indirect - google.golang.org/protobuf v1.27.1 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 401103a0..60bba1f2 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -201,6 +202,7 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -221,6 +223,7 @@ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5W github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= @@ -383,6 +386,7 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/toolkits/pkg v1.2.9 h1:zGlrJDl+2sMBoxBRIoMtAwvKmW5wctuji2+qHCecMKk= github.com/toolkits/pkg v1.2.9/go.mod h1:ZUsQAOoaR99PSbes+RXSirvwmtd6+XIUvizCmrjfUYc= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= diff --git a/src/models/relabel.go b/src/models/relabel.go new file mode 100644 index 00000000..59e71675 --- /dev/null +++ b/src/models/relabel.go @@ -0,0 +1,198 @@ +package models + +import ( + "crypto/md5" + "fmt" + "regexp" + "sort" + "strings" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" +) + +const ( + Replace Action = "replace" + Keep Action = "keep" + Drop Action = "drop" + HashMod Action = "hashmod" + LabelMap Action = "labelmap" + LabelDrop Action = "labeldrop" + LabelKeep Action = "labelkeep" + Lowercase Action = "lowercase" + Uppercase Action = "uppercase" +) + +type Action string + +type Regexp struct { + *regexp.Regexp +} + +type RelabelConfig struct { + SourceLabels model.LabelNames + Separator string + Regex interface{} + Modulus uint64 + TargetLabel string + Replacement string + Action Action +} + +func Process(labels []*prompb.Label, cfgs ...*RelabelConfig) []*prompb.Label { + for _, cfg := range cfgs { + labels = relabel(labels, cfg) + if labels == nil { + return nil + } + } + return labels +} + +func getValue(ls []*prompb.Label, name model.LabelName) string { + for _, l := range ls { + if l.Name == string(name) { + return l.Value + } + } + return "" +} + +type LabelBuilder struct { + LabelSet map[string]string +} + +func newBuilder(ls []*prompb.Label) *LabelBuilder { + lset := make(map[string]string, len(ls)) + for _, l := range ls { + lset[l.Name] = l.Value + } + return &LabelBuilder{LabelSet: lset} +} + +func (l *LabelBuilder) set(k, v string) *LabelBuilder { + if v == "" { + return l.del(k) + } + + l.LabelSet[k] = v + return l +} + +func (l *LabelBuilder) del(ns ...string) *LabelBuilder { + for _, n := range ns { + delete(l.LabelSet, n) + } + return l +} + +func (l *LabelBuilder) labels() []*prompb.Label { + ls := make([]*prompb.Label, 0, len(l.LabelSet)) + if len(l.LabelSet) == 0 { + return ls + } + + for k, v := range l.LabelSet { + ls = append(ls, &prompb.Label{ + Name: k, + Value: v, + }) + } + + sort.Slice(ls, func(i, j int) bool { + return ls[i].Name > ls[j].Name + }) + return ls +} + +func relabel(lset []*prompb.Label, cfg *RelabelConfig) []*prompb.Label { + values := make([]string, 0, len(cfg.SourceLabels)) + for _, ln := range cfg.SourceLabels { + values = append(values, getValue(lset, ln)) + } + + regx := cfg.Regex.(Regexp) + + val := strings.Join(values, cfg.Separator) + lb := newBuilder(lset) + switch cfg.Action { + case Drop: + if regx.MatchString(val) { + return nil + } + case Keep: + if !regx.MatchString(val) { + return nil + } + case Replace: + indexes := regx.FindStringSubmatchIndex(val) + if indexes == nil { + break + } + target := model.LabelName(regx.ExpandString([]byte{}, cfg.TargetLabel, val, indexes)) + if !target.IsValid() { + lb.del(cfg.TargetLabel) + break + } + res := regx.ExpandString([]byte{}, cfg.Replacement, val, indexes) + if len(res) == 0 { + lb.del(cfg.TargetLabel) + break + } + lb.set(string(target), string(res)) + case Lowercase: + lb.set(cfg.TargetLabel, strings.ToLower(val)) + case Uppercase: + lb.set(cfg.TargetLabel, strings.ToUpper(val)) + case HashMod: + mod := sum64(md5.Sum([]byte(val))) % cfg.Modulus + lb.set(cfg.TargetLabel, fmt.Sprintf("%d", mod)) + case LabelMap: + for _, l := range lset { + if regx.MatchString(l.Name) { + res := regx.ReplaceAllString(l.Name, cfg.Replacement) + lb.set(res, l.Value) + } + } + case LabelDrop: + for _, l := range lset { + if regx.MatchString(l.Name) { + lb.del(l.Name) + } + } + case LabelKeep: + for _, l := range lset { + if !regx.MatchString(l.Name) { + lb.del(l.Name) + } + } + default: + panic(fmt.Errorf("relabel: unknown relabel action type %q", cfg.Action)) + } + + return lb.labels() +} + +func sum64(hash [md5.Size]byte) uint64 { + var s uint64 + + for i, b := range hash { + shift := uint64((md5.Size - i - 1) * 8) + + s |= uint64(b) << shift + } + return s +} + +func NewRegexp(s string) (Regexp, error) { + regex, err := regexp.Compile("^(?:" + s + ")$") + return Regexp{Regexp: regex}, err +} + +func MustNewRegexp(s string) Regexp { + re, err := NewRegexp(s) + if err != nil { + panic(err) + } + return re +} diff --git a/src/server/config/config.go b/src/server/config/config.go index 2cf280d8..fd88504a 100644 --- a/src/server/config/config.go +++ b/src/server/config/config.go @@ -14,6 +14,7 @@ import ( "github.com/gin-gonic/gin" "github.com/koding/multiconfig" + "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/notifier" "github.com/didi/nightingale/v5/src/pkg/httpx" "github.com/didi/nightingale/v5/src/pkg/logx" @@ -143,6 +144,33 @@ func MustLoad(fpaths ...string) { C.WriterOpt.QueueCount = 100 } + for _, write := range C.Writers { + for _, relabel := range write.WriteRelabels { + regex, ok := relabel.Regex.(string) + if !ok { + log.Println("Regex field must be a string") + os.Exit(1) + } + + if regex == "" { + regex = "(.*)" + } + relabel.Regex = models.MustNewRegexp(regex) + + if relabel.Separator == "" { + relabel.Separator = ";" + } + + if relabel.Action == "" { + relabel.Action = "replace" + } + + if relabel.Replacement == "" { + relabel.Replacement = "$1" + } + } + } + fmt.Println("heartbeat.ip:", C.Heartbeat.IP) fmt.Printf("heartbeat.interval: %dms\n", C.Heartbeat.Interval) }) @@ -206,6 +234,8 @@ type WriterOptions struct { MaxIdleConnsPerHost int Headers []string + + WriteRelabels []*models.RelabelConfig } type WriterGlobalOpt struct { diff --git a/src/server/writer/writer.go b/src/server/writer/writer.go index 3869ec3e..7d8b8723 100644 --- a/src/server/writer/writer.go +++ b/src/server/writer/writer.go @@ -9,6 +9,7 @@ import ( "net/http" "time" + "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/server/config" "github.com/golang/protobuf/proto" "github.com/golang/snappy" @@ -24,11 +25,28 @@ type WriterType struct { Client api.Client } +func (w WriterType) writeRelabel(items []*prompb.TimeSeries) []*prompb.TimeSeries { + ritems := make([]*prompb.TimeSeries, 0, len(items)) + for _, item := range items { + lbls := models.Process(item.Labels, w.Opts.WriteRelabels...) + if len(lbls) == 0 { + continue + } + ritems = append(ritems, item) + } + return ritems +} + func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[string]string) { if len(items) == 0 { return } + items = w.writeRelabel(items) + if len(items) == 0 { + return + } + start := time.Now() defer func() { promstat.ForwardDuration.WithLabelValues(config.C.ClusterName, fmt.Sprint(index)).Observe(time.Since(start).Seconds())