scrape semple /metrics

This commit is contained in:
Ulric Qin 2022-04-28 22:19:40 +08:00
parent 17b16a7589
commit 3699e1b4a5
5 changed files with 233 additions and 1 deletions

View File

@ -12,6 +12,8 @@ urls = [
# username = ""
# password = ""
headers = ["X-From", "categraf"]
# # interval = global.interval * interval_times
# interval_times = 1

2
go.mod
View File

@ -14,7 +14,9 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.32.1
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/shirou/gopsutil/v3 v3.22.3

2
go.sum
View File

@ -224,6 +224,7 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -267,6 +268,7 @@ github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrb
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=

View File

@ -2,13 +2,17 @@ package prometheus
import (
"errors"
"io"
"log"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/parser/prometheus"
"flashcat.cloud/categraf/pkg/filter"
"flashcat.cloud/categraf/pkg/tls"
"flashcat.cloud/categraf/types"
@ -16,6 +20,7 @@ import (
)
const inputName = "prometheus"
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
type Instance struct {
URLs []string `toml:"urls"`
@ -26,6 +31,7 @@ type Instance struct {
Password string `toml:"password"`
Timeout config.Duration `toml:"timeout"`
IgnoreMetrics []string `toml:"ignore_metrics"`
Headers []string `toml:"headers"`
ignoreMetricsFilter filter.Filter
tls.ClientConfig
@ -135,5 +141,83 @@ func (p *Prometheus) gatherOnce(slist *list.SafeList, ins *Instance) {
}
}
// TODO
urlwg := new(sync.WaitGroup)
defer urlwg.Wait()
for i := 0; i < len(ins.URLs); i++ {
urlwg.Add(1)
go p.gatherUrl(slist, ins, ins.URLs[i], urlwg)
}
}
func (p *Prometheus) gatherUrl(slist *list.SafeList, ins *Instance, uri string, urlwg *sync.WaitGroup) {
defer urlwg.Done()
u, err := url.Parse(uri)
if err != nil {
log.Println("E! failed to parse url:", uri, "error:", err)
return
}
if u.Path == "" {
u.Path = "/metrics"
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
log.Println("E! failed to new request for url:", u.String(), "error:", err)
return
}
ins.setHeaders(req)
labels := map[string]string{"url": u.String()}
for key, val := range ins.Labels {
labels[key] = val
}
res, err := ins.client.Do(req)
if err != nil {
slist.PushFront(inputs.NewSample("up", 0, labels))
log.Println("E! failed to query url:", u.String(), "error:", err)
return
}
if res.StatusCode != http.StatusOK {
slist.PushFront(inputs.NewSample("up", 0, labels))
log.Println("E! failed to query url:", u.String(), "status code:", res.StatusCode)
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
slist.PushFront(inputs.NewSample("up", 0, labels))
log.Println("E! failed to read response body, error:", err)
return
}
slist.PushFront(inputs.NewSample("up", 1, labels))
parser := prometheus.NewParser(labels, res.Header, ins.ignoreMetricsFilter)
if err = parser.Parse(body, slist); err != nil {
log.Println("E! failed to parse response body, url:", u.String(), "error:", err)
}
}
func (ins *Instance) setHeaders(req *http.Request) {
if ins.Username != "" && ins.Password != "" {
req.SetBasicAuth(ins.Username, ins.Password)
}
if ins.BearerToken != "" {
req.Header.Set("Authorization", "Bearer "+ins.BearerToken)
}
req.Header.Set("Accept", acceptHeader)
for i := 0; i < len(ins.Headers); i += 2 {
req.Header.Set(ins.Headers[i], ins.Headers[i+1])
}
}

142
parser/prometheus/parser.go Normal file
View File

@ -0,0 +1,142 @@
package prometheus
import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"mime"
"net/http"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/filter"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/toolkits/pkg/container/list"
)
type Parser struct {
DefaultTags map[string]string
Header http.Header
IgnoreMetricsFilter filter.Filter
}
func NewParser(defaultTags map[string]string, header http.Header, ignoreMetricsFilter filter.Filter) *Parser {
return &Parser{
DefaultTags: defaultTags,
Header: header,
IgnoreMetricsFilter: ignoreMetricsFilter,
}
}
func (p *Parser) Parse(buf []byte, slist *list.SafeList) error {
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
mediatype, params, err := mime.ParseMediaType(p.Header.Get("Content-Type"))
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
mf := &dto.MetricFamily{}
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if ierr == io.EOF {
break
}
return fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return fmt.Errorf("reading text format failed: %s", err)
}
}
// read metrics
for metricName, mf := range metricFamilies {
if p.IgnoreMetricsFilter.Match(metricName) {
continue
}
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m, p.DefaultTags)
if mf.GetType() == dto.MetricType_SUMMARY {
p.handleSummary(m, tags, metricName, slist)
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
p.handleHistogram(m, tags, metricName, slist)
} else {
fields := getNameAndValue(m, metricName)
inputs.PushSamples(slist, fields, tags)
}
}
}
return nil
}
func (p *Parser) handleSummary(m *dto.Metric, tags map[string]string, metricName string, slist *list.SafeList) {
slist.PushFront(inputs.NewSample(metricName+"_count", float64(m.GetSummary().GetSampleCount()), tags))
slist.PushFront(inputs.NewSample(metricName+"_sum", m.GetSummary().GetSampleSum(), tags))
for _, q := range m.GetSummary().Quantile {
slist.PushFront(inputs.NewSample(metricName, q.GetValue(), tags, map[string]string{"quantile": fmt.Sprint(q.GetQuantile())}))
}
}
func (p *Parser) handleHistogram(m *dto.Metric, tags map[string]string, metricName string, slist *list.SafeList) {
slist.PushFront(inputs.NewSample(metricName+"_count", float64(m.GetHistogram().GetSampleCount()), tags))
slist.PushFront(inputs.NewSample(metricName+"_sum", m.GetHistogram().GetSampleSum(), tags))
for _, b := range m.GetHistogram().Bucket {
le := fmt.Sprint(b.GetUpperBound())
value := float64(b.GetCumulativeCount())
slist.PushFront(inputs.NewSample(metricName+"_bucket", value, tags, map[string]string{"le": le}))
}
}
// Get labels from metric
func makeLabels(m *dto.Metric, defaultTags map[string]string) map[string]string {
result := map[string]string{}
for _, lp := range m.Label {
result[lp.GetName()] = lp.GetValue()
}
for key, value := range defaultTags {
result[key] = value
}
return result
}
// Get name and value from metric
func getNameAndValue(m *dto.Metric, metricName string) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields[metricName] = m.GetGauge().GetValue()
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetCounter().GetValue()) {
fields[metricName] = m.GetCounter().GetValue()
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetUntyped().GetValue()) {
fields[metricName] = m.GetUntyped().GetValue()
}
}
return fields
}