diff --git a/agent/agent.go b/agent/agent.go index 63e4d39..ce84ee4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -3,7 +3,6 @@ package agent import ( "log" - "flashcat.cloud/categraf/api" "flashcat.cloud/categraf/traces" // auto registry @@ -53,7 +52,6 @@ type Agent struct { InputFilters map[string]struct{} InputReaders map[string]*InputReader TraceCollector *traces.Collector - Server *api.Server } func NewAgent(filters map[string]struct{}) *Agent { @@ -75,11 +73,6 @@ func (a *Agent) Start() { log.Println(err) } a.startPrometheusScrape() - err = a.startHttpAgent() - if err != nil { - log.Println(err) - } - log.Println("I! agent started") } @@ -92,11 +85,6 @@ func (a *Agent) Stop() { log.Println(err) } a.stopPrometheusScrape() - err = a.stopHttpAgent() - if err != nil { - log.Println(err) - } - log.Println("I! agent stopped") } diff --git a/agent/http_agent.go b/agent/http_agent.go deleted file mode 100644 index 3ef500d..0000000 --- a/agent/http_agent.go +++ /dev/null @@ -1,44 +0,0 @@ -package agent - -import ( - "context" - "log" - - "flashcat.cloud/categraf/api" - "flashcat.cloud/categraf/config" -) - -func (a *Agent) startHttpAgent() (err error) { - if config.Config.HTTPServer == nil || !config.Config.HTTPServer.Enable { - return nil - } - - defer func() { - if err != nil { - log.Println("E! failed to start http agent:", err) - } - }() - - server := api.NewServer(api.Address(config.Config.HTTPServer.Address)) - err = server.Start(context.TODO()) - if err != nil { - return err - } - - a.Server = server - return nil -} - -func (a *Agent) stopHttpAgent() (err error) { - if config.Config.HTTPServer == nil || !config.Config.HTTPServer.Enable || a.Server == nil { - return nil - } - - defer func() { - if err != nil { - log.Println("E! failed to stop http agent:", err) - } - }() - - return a.Server.Stop(context.TODO()) -} diff --git a/api/response.go b/api/response.go deleted file mode 100644 index f2b048e..0000000 --- a/api/response.go +++ /dev/null @@ -1,6 +0,0 @@ -package api - -type Response struct { - Message string `json:"message,omitempty"` - Data interface{} `json:"data,omitempty"` -} diff --git a/api/router.go b/api/router.go deleted file mode 100644 index 5ed2277..0000000 --- a/api/router.go +++ /dev/null @@ -1,26 +0,0 @@ -package api - -import ( - "github.com/gin-gonic/gin" -) - -type Router struct { - *gin.Engine -} - -func newRouter(srv *Server) *Router { - r := &Router{ - Engine: srv.engine, - } - - r.push() - return r -} - -func (r *Router) push() { - p := push{} - g := r.Group("/api/push") - g.POST("/opentsdb", p.OpenTSDB) // 发送OpenTSDB数据 - g.POST("/openfalcon", p.falcon) // 发送OpenFalcon数据 - g.POST("/prometheus", p.remoteWrite) // 发送Prometheus数据 -} diff --git a/api/metric.go b/api/router_falcon.go similarity index 56% rename from api/metric.go rename to api/router_falcon.go index f7f8c08..3c0af75 100644 --- a/api/metric.go +++ b/api/router_falcon.go @@ -1,115 +1,21 @@ package api import ( + "encoding/json" "fmt" + "log" + "net/http" "strconv" "strings" + "time" + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/writer" + "github.com/gin-gonic/gin" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" ) -type Metric struct { - Metric string `json:"metric"` - Timestamp int64 `json:"timestamp"` - ValueUnTyped interface{} `json:"value"` - Value float64 `json:"-"` - Tags map[string]string `json:"tags"` -} - -func (m *Metric) Clean(ts int64) error { - if m.Metric == "" { - return fmt.Errorf("metric is blank") - } - - switch v := m.ValueUnTyped.(type) { - case string: - if f, err := strconv.ParseFloat(v, 64); err == nil { - m.Value = f - } else { - return fmt.Errorf("unparseable value %v", v) - } - case float64: - m.Value = v - case uint64: - m.Value = float64(v) - case int64: - m.Value = float64(v) - case int: - m.Value = float64(v) - default: - return fmt.Errorf("unparseable value %v", v) - } - - // if timestamp bigger than 32 bits, likely in milliseconds - if m.Timestamp > 0xffffffff { - m.Timestamp /= 1000 - } - - // If the timestamp is greater than 5 minutes, the current time shall prevail - diff := m.Timestamp - ts - if diff > 300 { - m.Timestamp = ts - } - return nil -} - -func (m *Metric) ToProm() (*prompb.TimeSeries, error) { - pt := &prompb.TimeSeries{} - pt.Samples = append(pt.Samples, prompb.Sample{ - // use ms - Timestamp: m.Timestamp * 1000, - Value: m.Value, - }) - - if strings.IndexByte(m.Metric, '.') != -1 { - m.Metric = strings.ReplaceAll(m.Metric, ".", "_") - } - - if strings.IndexByte(m.Metric, '-') != -1 { - m.Metric = strings.ReplaceAll(m.Metric, "-", "_") - } - - if !model.MetricNameRE.MatchString(m.Metric) { - return nil, fmt.Errorf("invalid metric name: %s", m.Metric) - } - - pt.Labels = append(pt.Labels, prompb.Label{ - Name: model.MetricNameLabel, - Value: m.Metric, - }) - - if _, exists := m.Tags["ident"]; !exists { - // rename tag key - host, has := m.Tags["host"] - if has { - delete(m.Tags, "host") - m.Tags["ident"] = host - } - } - - for key, value := range m.Tags { - if strings.IndexByte(key, '.') != -1 { - key = strings.ReplaceAll(key, ".", "_") - } - - if strings.IndexByte(key, '-') != -1 { - key = strings.ReplaceAll(key, "-", "_") - } - - if !model.LabelNameRE.MatchString(key) { - return nil, fmt.Errorf("invalid tag name: %s", key) - } - - pt.Labels = append(pt.Labels, prompb.Label{ - Name: key, - Value: value, - }) - } - - return pt, nil -} - type FalconMetric struct { Metric string `json:"metric"` Endpoint string `json:"endpoint"` @@ -228,3 +134,86 @@ func (m *FalconMetric) ToProm() (*prompb.TimeSeries, string, error) { return pt, ident, nil } + +func openFalcon(c *gin.Context) { + var ( + err error + bytes []byte + ) + + bytes, err = readerGzipBody(c.GetHeader("Content-Encoding"), c.Request) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + var arr []FalconMetric + if bytes[0] == '[' { + err = json.Unmarshal(bytes, &arr) + } else { + var one FalconMetric + err = json.Unmarshal(bytes, &one) + arr = []FalconMetric{one} + } + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + var ( + fail int + succ int + msg = "data pushed to queue" + ts = time.Now().Unix() + ) + + ignoreHostname := c.GetBool("ignore_hostname") + ignoreGlobalLabels := c.GetBool("ignore_global_labels") + count := len(arr) + series := make([]prompb.TimeSeries, 0, count) + for i := 0; i < count; i++ { + if err := arr[i].Clean(ts); err != nil { + fail++ + continue + } + + pt, _, err := arr[i].ToProm() + if err != nil { + fail++ + continue + } + + tags := make(map[string]string) + for _, label := range pt.Labels { + tags[label.Name] = label.Value + } + // add global labels + if !ignoreGlobalLabels { + for k, v := range config.Config.Global.Labels { + if _, has := tags[k]; has { + continue + } + pt.Labels = append(pt.Labels, prompb.Label{Name: k, Value: v}) + } + } + // add label: agent_hostname + if _, has := tags[agentHostnameLabelKey]; !has && !ignoreHostname { + pt.Labels = append(pt.Labels, prompb.Label{Name: agentHostnameLabelKey, Value: config.Config.GetHostname()}) + } + + series = append(series, *pt) + succ++ + } + + if fail > 0 { + log.Printf("falconmetric msg process error , msg is : %s\n", string(bytes)) + } + + writer.PostTimeSeries(series) + + c.JSON(200, gin.H{ + "succ": succ, + "fail": fail, + "msg": msg, + }) +} diff --git a/api/router_func.go b/api/router_func.go new file mode 100644 index 0000000..e75a1c1 --- /dev/null +++ b/api/router_func.go @@ -0,0 +1,59 @@ +package api + +import ( + "compress/gzip" + "errors" + "io" + "io/ioutil" + "net/http" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" +) + +const agentHostnameLabelKey = "agent_hostname" + +func readerGzipBody(contentEncoding string, request *http.Request) (bytes []byte, err error) { + if contentEncoding == "gzip" { + var ( + r *gzip.Reader + ) + r, err = gzip.NewReader(request.Body) + if err != nil { + return nil, err + } + + defer r.Close() + bytes, err = ioutil.ReadAll(r) + } else { + defer request.Body.Close() + bytes, err = ioutil.ReadAll(request.Body) + } + if err != nil || len(bytes) == 0 { + return nil, errors.New("request parameter error") + } + + return bytes, nil +} + +// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling +// snappy decompression. +func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { + compressed, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +} diff --git a/api/router_opentsdb.go b/api/router_opentsdb.go new file mode 100644 index 0000000..c71e7fd --- /dev/null +++ b/api/router_opentsdb.go @@ -0,0 +1,204 @@ +package api + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "strconv" + "strings" + "time" + + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/writer" + "github.com/gin-gonic/gin" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" +) + +type Metric struct { + Metric string `json:"metric"` + Timestamp int64 `json:"timestamp"` + ValueUnTyped interface{} `json:"value"` + Value float64 `json:"-"` + Tags map[string]string `json:"tags"` +} + +func (m *Metric) Clean(ts int64) error { + if m.Metric == "" { + return fmt.Errorf("metric is blank") + } + + switch v := m.ValueUnTyped.(type) { + case string: + if f, err := strconv.ParseFloat(v, 64); err == nil { + m.Value = f + } else { + return fmt.Errorf("unparseable value %v", v) + } + case float64: + m.Value = v + case uint64: + m.Value = float64(v) + case int64: + m.Value = float64(v) + case int: + m.Value = float64(v) + default: + return fmt.Errorf("unparseable value %v", v) + } + + // if timestamp bigger than 32 bits, likely in milliseconds + if m.Timestamp > 0xffffffff { + m.Timestamp /= 1000 + } + + // If the timestamp is greater than 5 minutes, the current time shall prevail + diff := m.Timestamp - ts + if diff > 300 { + m.Timestamp = ts + } + return nil +} + +func (m *Metric) ToProm() (*prompb.TimeSeries, error) { + pt := &prompb.TimeSeries{} + pt.Samples = append(pt.Samples, prompb.Sample{ + // use ms + Timestamp: m.Timestamp * 1000, + Value: m.Value, + }) + + if strings.IndexByte(m.Metric, '.') != -1 { + m.Metric = strings.ReplaceAll(m.Metric, ".", "_") + } + + if strings.IndexByte(m.Metric, '-') != -1 { + m.Metric = strings.ReplaceAll(m.Metric, "-", "_") + } + + if !model.MetricNameRE.MatchString(m.Metric) { + return nil, fmt.Errorf("invalid metric name: %s", m.Metric) + } + + pt.Labels = append(pt.Labels, prompb.Label{ + Name: model.MetricNameLabel, + Value: m.Metric, + }) + + if _, exists := m.Tags["ident"]; !exists { + // rename tag key + host, has := m.Tags["host"] + if has { + delete(m.Tags, "host") + m.Tags["ident"] = host + } + } + + for key, value := range m.Tags { + if strings.IndexByte(key, '.') != -1 { + key = strings.ReplaceAll(key, ".", "_") + } + + if strings.IndexByte(key, '-') != -1 { + key = strings.ReplaceAll(key, "-", "_") + } + + if !model.LabelNameRE.MatchString(key) { + return nil, fmt.Errorf("invalid tag name: %s", key) + } + + pt.Labels = append(pt.Labels, prompb.Label{ + Name: key, + Value: value, + }) + } + + return pt, nil +} + +func openTSDB(c *gin.Context) { + var ( + err error + bytes []byte + ) + + bytes, err = readerGzipBody(c.GetHeader("Content-Encoding"), c.Request) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + var list []Metric + if bytes[0] == '[' { + err = json.Unmarshal(bytes, &list) + } else { + var openTSDBMetric Metric + err = json.Unmarshal(bytes, &openTSDBMetric) + list = []Metric{openTSDBMetric} + } + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + var ( + fail int + succ int + msg = "data pushed to queue" + ts = time.Now().Unix() + ) + + ignoreHostname := c.GetBool("ignore_hostname") + ignoreGlobalLabels := c.GetBool("ignore_global_labels") + count := len(list) + series := make([]prompb.TimeSeries, 0, count) + for i := 0; i < len(list); i++ { + if err := list[i].Clean(ts); err != nil { + log.Printf("opentsdb msg clean error: %s\n", err.Error()) + if fail == 0 { + msg = fmt.Sprintf("%s , Error clean: %s", msg, err.Error()) + } + fail++ + continue + } + // add global labels + if !ignoreGlobalLabels { + for k, v := range config.Config.Global.Labels { + if _, has := list[i].Tags[k]; has { + continue + } + list[i].Tags[k] = v + } + } + // add label: agent_hostname + if _, has := list[i].Tags[agentHostnameLabelKey]; !has && !ignoreHostname { + list[i].Tags[agentHostnameLabelKey] = config.Config.GetHostname() + } + + pt, err := list[i].ToProm() + if err != nil { + log.Printf("opentsdb msg to tsdb error: %s\n", err.Error()) + if fail == 0 { + msg = fmt.Sprintf("%s , Error toprom: %s", msg, err.Error()) + } + fail++ + continue + } + + series = append(series, *pt) + succ++ + } + + if fail > 0 { + log.Printf("opentsdb msg process error , msg is : %s\n", string(bytes)) + } + + writer.PostTimeSeries(series) + + c.JSON(200, gin.H{ + "succ": succ, + "fail": fail, + "msg": msg, + }) +} diff --git a/api/router_push.go b/api/router_push.go deleted file mode 100644 index 0e75060..0000000 --- a/api/router_push.go +++ /dev/null @@ -1,323 +0,0 @@ -package api - -import ( - "compress/gzip" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "log" - "net/http" - "time" - - "flashcat.cloud/categraf/config" - "flashcat.cloud/categraf/writer" - "github.com/gin-gonic/gin" - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" - "github.com/prometheus/prometheus/prompb" -) - -const agentHostnameLabelKey = "agent_hostname" - -type push struct { -} - -type Context struct { - *gin.Context -} - -func NewContext(c *gin.Context) *Context { - return &Context{c} -} - -func (c *Context) Failed(code int, err error) { - c.JSON(code, Response{ - Message: err.Error(), - }) -} - -func (c *Context) Success(v ...any) { - r := Response{ - Message: "success", - } - if len(v) > 0 { - r.Data = v[0] - } - c.JSON(http.StatusOK, r) -} - -func (push *push) OpenTSDB(c *gin.Context) { - var ( - err error - bytes []byte - ) - - cc := NewContext(c) - bytes, err = readerGzipBody(c.GetHeader("Content-Encoding"), c.Request) - if err != nil { - cc.Failed(http.StatusBadRequest, err) - return - } - - var list []Metric - if bytes[0] == '[' { - err = json.Unmarshal(bytes, &list) - } else { - var openTSDBMetric Metric - err = json.Unmarshal(bytes, &openTSDBMetric) - list = []Metric{openTSDBMetric} - } - if err != nil { - cc.Failed(http.StatusBadRequest, err) - return - } - - var ( - fail int - success int - msg = "data pushed to queue" - ts = time.Now().Unix() - ) - - ignoreHostname := c.GetBool("ignore_hostname") - ignoreGlobalLabels := c.GetBool("ignore_global_labels") - count := len(list) - series := make([]prompb.TimeSeries, 0, count) - for i := 0; i < len(list); i++ { - if err := list[i].Clean(ts); err != nil { - log.Printf("opentsdb msg clean error: %s\n", err.Error()) - if fail == 0 { - msg = fmt.Sprintf("%s , Error clean: %s", msg, err.Error()) - } - fail++ - continue - } - // add global labels - if !ignoreGlobalLabels { - for k, v := range config.Config.Global.Labels { - if _, has := list[i].Tags[k]; has { - continue - } - list[i].Tags[k] = v - } - } - // add label: agent_hostname - if _, has := list[i].Tags[agentHostnameLabelKey]; !has && !ignoreHostname { - list[i].Tags[agentHostnameLabelKey] = config.Config.GetHostname() - } - - pt, err := list[i].ToProm() - if err != nil { - log.Printf("opentsdb msg to tsdb error: %s\n", err.Error()) - if fail == 0 { - msg = fmt.Sprintf("%s , Error toprom: %s", msg, err.Error()) - } - fail++ - continue - } - - series = append(series, *pt) - success++ - } - - if fail > 0 { - log.Printf("opentsdb msg process error , msg is : %s\n", string(bytes)) - } - - writer.PostTimeSeries(series) - cc.Success(map[string]interface{}{ - "success": success, - "fail": fail, - "msg": msg, - }) -} - -func (push *push) falcon(c *gin.Context) { - var ( - err error - bytes []byte - ) - - cc := NewContext(c) - bytes, err = readerGzipBody(c.GetHeader("Content-Encoding"), c.Request) - if err != nil { - cc.Failed(http.StatusBadRequest, err) - return - } - - var arr []FalconMetric - if bytes[0] == '[' { - err = json.Unmarshal(bytes, &arr) - } else { - var one FalconMetric - err = json.Unmarshal(bytes, &one) - arr = []FalconMetric{one} - } - if err != nil { - cc.Failed(http.StatusBadRequest, err) - return - } - - var ( - fail int - success int - msg = "data pushed to queue" - ts = time.Now().Unix() - ) - - ignoreHostname := c.GetBool("ignore_hostname") - ignoreGlobalLabels := c.GetBool("ignore_global_labels") - count := len(arr) - series := make([]prompb.TimeSeries, 0, count) - for i := 0; i < count; i++ { - if err := arr[i].Clean(ts); err != nil { - fail++ - continue - } - - pt, _, err := arr[i].ToProm() - if err != nil { - fail++ - continue - } - - tags := make(map[string]string) - for _, label := range pt.Labels { - tags[label.Name] = label.Value - } - // add global labels - if !ignoreGlobalLabels { - for k, v := range config.Config.Global.Labels { - if _, has := tags[k]; has { - continue - } - pt.Labels = append(pt.Labels, prompb.Label{Name: k, Value: v}) - } - } - // add label: agent_hostname - if _, has := tags[agentHostnameLabelKey]; !has && !ignoreHostname { - pt.Labels = append(pt.Labels, prompb.Label{Name: agentHostnameLabelKey, Value: config.Config.GetHostname()}) - } - - series = append(series, *pt) - success++ - } - - if fail > 0 { - log.Printf("falconmetric msg process error , msg is : %s\n", string(bytes)) - } - - writer.PostTimeSeries(series) - - cc.Success(map[string]interface{}{ - "success": success, - "fail": fail, - "msg": msg, - }) -} - -func (push *push) remoteWrite(c *gin.Context) { - cc := NewContext(c) - req, err := DecodeWriteRequest(c.Request.Body) - if err != nil { - cc.Failed(http.StatusBadRequest, err) - return - } - - count := len(req.Timeseries) - if count == 0 { - cc.Success() - return - } - - ignoreHostname := c.GetBool("ignore_hostname") - ignoreGlobalLabels := c.GetBool("ignore_global_labels") - for i := 0; i < count; i++ { - // 去除重复的数据 - if duplicateLabelKey(req.Timeseries[i]) { - continue - } - - tags := make(map[string]string) - for _, label := range req.Timeseries[i].Labels { - tags[label.Name] = label.Value - } - // add global labels - if !ignoreGlobalLabels { - for k, v := range config.Config.Global.Labels { - if _, has := tags[k]; has { - continue - } - req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{Name: k, Value: v}) - } - } - // add label: agent_hostname - if _, has := tags[agentHostnameLabelKey]; !has && !ignoreHostname { - req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{Name: agentHostnameLabelKey, Value: config.Config.GetHostname()}) - } - } - - writer.PostTimeSeries(req.Timeseries) - - cc.Success() -} - -func duplicateLabelKey(series prompb.TimeSeries) bool { - labelKeys := make(map[string]struct{}) - - for j := 0; j < len(series.Labels); j++ { - if _, has := labelKeys[series.Labels[j].Name]; has { - return true - } else { - labelKeys[series.Labels[j].Name] = struct{}{} - } - } - - return false -} - -func readerGzipBody(contentEncoding string, request *http.Request) (bytes []byte, err error) { - if contentEncoding == "gzip" { - var ( - r *gzip.Reader - ) - r, err = gzip.NewReader(request.Body) - if err != nil { - return nil, err - } - - defer r.Close() - bytes, err = ioutil.ReadAll(r) - } else { - defer request.Body.Close() - bytes, err = ioutil.ReadAll(request.Body) - } - if err != nil || len(bytes) == 0 { - return nil, errors.New("request parameter error") - } - - return bytes, nil -} - -// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling -// snappy decompression. -func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { - compressed, err := ioutil.ReadAll(r) - if err != nil { - return nil, err - } - - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - return nil, err - } - - var req prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - return nil, err - } - - return &req, nil -} diff --git a/api/router_remotewrite.go b/api/router_remotewrite.go new file mode 100644 index 0000000..d175608 --- /dev/null +++ b/api/router_remotewrite.go @@ -0,0 +1,68 @@ +package api + +import ( + "net/http" + + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/writer" + "github.com/gin-gonic/gin" + "github.com/prometheus/prometheus/prompb" +) + +func remoteWrite(c *gin.Context) { + req, err := DecodeWriteRequest(c.Request.Body) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + count := len(req.Timeseries) + if count == 0 { + c.String(http.StatusBadRequest, "payload empty") + return + } + + ignoreHostname := c.GetBool("ignore_hostname") + ignoreGlobalLabels := c.GetBool("ignore_global_labels") + for i := 0; i < count; i++ { + // 去除重复的数据 + if duplicateLabelKey(req.Timeseries[i]) { + continue + } + + tags := make(map[string]string) + for _, label := range req.Timeseries[i].Labels { + tags[label.Name] = label.Value + } + // add global labels + if !ignoreGlobalLabels { + for k, v := range config.Config.Global.Labels { + if _, has := tags[k]; has { + continue + } + req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{Name: k, Value: v}) + } + } + // add label: agent_hostname + if _, has := tags[agentHostnameLabelKey]; !has && !ignoreHostname { + req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{Name: agentHostnameLabelKey, Value: config.Config.GetHostname()}) + } + } + + writer.PostTimeSeries(req.Timeseries) + c.String(200, "forwarding...") +} + +func duplicateLabelKey(series prompb.TimeSeries) bool { + labelKeys := make(map[string]struct{}) + + for j := 0; j < len(series.Labels); j++ { + if _, has := labelKeys[series.Labels[j].Name]; has { + return true + } else { + labelKeys[series.Labels[j].Name] = struct{}{} + } + } + + return false +} diff --git a/api/server.go b/api/server.go index 25245d9..ad79743 100644 --- a/api/server.go +++ b/api/server.go @@ -1,104 +1,68 @@ package api import ( - "context" - "errors" + "crypto/tls" "log" - "net" "net/http" + "strings" + "time" + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/pkg/aop" "github.com/gin-gonic/gin" ) -// ServerOption is an HTTP server option. -type ServerOption func(*Server) +func Start() { + conf := config.Config.HTTP + if !conf.Enable { + return + } -// Network with server network. -func Network(network string) ServerOption { - return func(s *Server) { - s.network = network + gin.SetMode(conf.RunMode) + + if strings.ToLower(conf.RunMode) == "release" { + aop.DisableConsoleColor() + } + + r := gin.New() + r.Use(aop.Recovery()) + + if conf.PrintAccess { + r.Use(aop.Logger()) + } + + configRoutes(r) + + srv := &http.Server{ + Addr: conf.Address, + Handler: r, + ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second, + WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second, + IdleTimeout: time.Duration(conf.IdleTimeout) * time.Second, + } + + log.Println("I! http server listening on:", conf.Address) + + var err error + if conf.CertFile != "" && conf.KeyFile != "" { + srv.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} + err = srv.ListenAndServeTLS(conf.CertFile, conf.KeyFile) + } else { + err = srv.ListenAndServe() + } + + if err != nil && err != http.ErrServerClosed { + panic(err) } } -// Address with server address. -func Address(addr string) ServerOption { - return func(s *Server) { - s.address = addr - } -} - -// Listener with server lis -func Listener(lis net.Listener) ServerOption { - return func(s *Server) { - s.lis = lis - } -} - -type Server struct { - *http.Server - engine *gin.Engine - lis net.Listener - network string - address string -} - -func NewServer(opts ...ServerOption) *Server { - srv := &Server{ - network: "tcp", - address: ":0", - } - for _, o := range opts { - o(srv) - } - - srv.engine = gin.New() - srv.engine.Use(gin.Recovery(), gin.Logger()) - srv.Server = &http.Server{ - Handler: srv.engine, - } - - newRouter(srv) - return srv -} - -// ServeHTTP should write reply headers and data to the ResponseWriter and then return. -func (s *Server) ServeHTTP(res http.ResponseWriter, req *http.Request) { - s.Handler.ServeHTTP(res, req) -} - -func (s *Server) listenAndEndpoint() error { - if s.lis == nil { - lis, err := net.Listen(s.network, s.address) - if err != nil { - return err - } - s.lis = lis - } - - return nil -} - -// Start start the HTTP server. -func (s *Server) Start(ctx context.Context) error { - if err := s.listenAndEndpoint(); err != nil { - return err - } - s.BaseContext = func(net.Listener) context.Context { - return ctx - } - - log.Printf("[HTTP] server listening on: %s]\n", s.lis.Addr().String()) - go func() { - err := s.Serve(s.lis) - if !errors.Is(err, http.ErrServerClosed) { - log.Fatalf("[HTTP] server error: %v", err) - } - }() - return nil -} - -// Stop stop the HTTP server. -func (s *Server) Stop(ctx context.Context) error { - log.Println("[HTTP] server stopping") - return s.Shutdown(ctx) +func configRoutes(r *gin.Engine) { + r.GET("/ping", func(c *gin.Context) { + c.String(200, "pong") + }) + + g := r.Group("/api/push") + g.POST("/opentsdb", openTSDB) + g.POST("/openfalcon", openFalcon) + g.POST("/remotewrite", remoteWrite) } diff --git a/conf/config.toml b/conf/config.toml index f85fd0e..0ccb198 100644 --- a/conf/config.toml +++ b/conf/config.toml @@ -29,10 +29,6 @@ batch = 2000 # channel(as queue) size chan_size = 10000 -[http] -enable = false -address = ":9100" - [[writers]] url = "http://127.0.0.1:19000/prometheus/v1/write" @@ -46,3 +42,9 @@ basic_auth_pass = "" timeout = 5000 dial_timeout = 2500 max_idle_conns_per_host = 100 + +[http] +enable = true +address = ":9100" +print_access = false +run_mode = "release" \ No newline at end of file diff --git a/config/config.go b/config/config.go index e28f83a..9e2f34e 100644 --- a/config/config.go +++ b/config/config.go @@ -44,9 +44,16 @@ type WriterOption struct { MaxIdleConnsPerHost int `toml:"max_idle_conns_per_host"` } -type HTTPServer struct { - Enable bool `toml:"enable"` - Address string `toml:"address"` +type HTTP struct { + Enable bool `toml:"enable"` + Address string `toml:"address"` + PrintAccess bool `toml:"print_access"` + RunMode string `toml:"run_mode"` + CertFile string `toml:"cert_file"` + KeyFile string `toml:"key_file"` + ReadTimeout int `toml:"read_timeout"` + WriteTimeout int `toml:"write_timeout"` + IdleTimeout int `toml:"idle_timeout"` } type ConfigType struct { @@ -62,7 +69,7 @@ type ConfigType struct { Logs Logs `toml:"logs"` MetricsHouse MetricsHouse `toml:"metricshouse"` Traces *traces.Config `toml:"traces"` - HTTPServer *HTTPServer `toml:"http"` + HTTP *HTTP `toml:"http"` Prometheus *Prometheus `toml:"prometheus"` } diff --git a/inputs/nfsclient/nfsclient.go b/inputs/nfsclient/nfsclient.go index dd30d5a..0d0c78b 100644 --- a/inputs/nfsclient/nfsclient.go +++ b/inputs/nfsclient/nfsclient.go @@ -173,7 +173,6 @@ func (s *NfsClient) Init() error { log.Println("D! Including these mount patterns:", s.IncludeMounts) } } else { - log.Println("Including all mounts.") if config.Config.DebugMode { log.Println("D! Including all mounts.") } @@ -215,7 +214,9 @@ func (s *NfsClient) Init() error { func (s *NfsClient) Gather(slist *types.SampleList) { file, err := os.Open(s.mountstatsPath) if err != nil { - log.Println("E! Failed opening the", file, "file:", err) + if config.Config.DebugMode { + log.Println("D! Failed opening the", file, "file:", err) + } return } defer file.Close() diff --git a/main.go b/main.go index 8a47014..1f90e7e 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "syscall" "flashcat.cloud/categraf/agent" + "flashcat.cloud/categraf/api" "flashcat.cloud/categraf/config" "flashcat.cloud/categraf/house" "flashcat.cloud/categraf/pkg/osx" @@ -66,6 +67,8 @@ func main() { initWriters() + go api.Start() + ag := agent.NewAgent(parseFilter(*inputFilters)) runAgent(ag) } diff --git a/pkg/aop/logger.go b/pkg/aop/logger.go new file mode 100644 index 0000000..1dbfb01 --- /dev/null +++ b/pkg/aop/logger.go @@ -0,0 +1,285 @@ +package aop + +import ( + "fmt" + "io" + "log" + "net/http" + "os" + "time" + + "github.com/gin-gonic/gin" + "github.com/mattn/go-isatty" +) + +type consoleColorModeValue int + +const ( + autoColor consoleColorModeValue = iota + disableColor + forceColor +) + +var ( + green = string([]byte{27, 91, 57, 55, 59, 52, 50, 109}) + white = string([]byte{27, 91, 57, 48, 59, 52, 55, 109}) + yellow = string([]byte{27, 91, 57, 48, 59, 52, 51, 109}) + red = string([]byte{27, 91, 57, 55, 59, 52, 49, 109}) + blue = string([]byte{27, 91, 57, 55, 59, 52, 52, 109}) + magenta = string([]byte{27, 91, 57, 55, 59, 52, 53, 109}) + cyan = string([]byte{27, 91, 57, 55, 59, 52, 54, 109}) + reset = string([]byte{27, 91, 48, 109}) + consoleColorMode = autoColor +) + +// LoggerConfig defines the config for Logger middleware. +type LoggerConfig struct { + // Optional. Default value is gin.defaultLogFormatter + Formatter LogFormatter + + // Output is a writer where logs are written. + // Optional. Default value is gin.DefaultWriter. + Output io.Writer + + // SkipPaths is a url path array which logs are not written. + // Optional. + SkipPaths []string +} + +// LogFormatter gives the signature of the formatter function passed to LoggerWithFormatter +type LogFormatter func(params LogFormatterParams) string + +// LogFormatterParams is the structure any formatter will be handed when time to log comes +type LogFormatterParams struct { + Request *http.Request + + // TimeStamp shows the time after the server returns a response. + TimeStamp time.Time + // StatusCode is HTTP response code. + StatusCode int + // Latency is how much time the server cost to process a certain request. + Latency time.Duration + // ClientIP equals Context's ClientIP method. + ClientIP string + // Method is the HTTP method given to the request. + Method string + // Path is a path the client requests. + Path string + // ErrorMessage is set if error has occurred in processing the request. + ErrorMessage string + // isTerm shows whether does gin's output descriptor refers to a terminal. + isTerm bool + // BodySize is the size of the Response Body + BodySize int + // Keys are the keys set on the request's context. + Keys map[string]interface{} +} + +// StatusCodeColor is the ANSI color for appropriately logging http status code to a terminal. +func (p *LogFormatterParams) StatusCodeColor() string { + code := p.StatusCode + + switch { + case code >= http.StatusOK && code < http.StatusMultipleChoices: + return green + case code >= http.StatusMultipleChoices && code < http.StatusBadRequest: + return white + case code >= http.StatusBadRequest && code < http.StatusInternalServerError: + return yellow + default: + return red + } +} + +// MethodColor is the ANSI color for appropriately logging http method to a terminal. +func (p *LogFormatterParams) MethodColor() string { + method := p.Method + + switch method { + case "GET": + return blue + case "POST": + return cyan + case "PUT": + return yellow + case "DELETE": + return red + case "PATCH": + return green + case "HEAD": + return magenta + case "OPTIONS": + return white + default: + return reset + } +} + +// ResetColor resets all escape attributes. +func (p *LogFormatterParams) ResetColor() string { + return reset +} + +// IsOutputColor indicates whether can colors be outputted to the log. +func (p *LogFormatterParams) IsOutputColor() bool { + return consoleColorMode == forceColor || (consoleColorMode == autoColor && p.isTerm) +} + +// defaultLogFormatter is the default log format function Logger middleware uses. +var defaultLogFormatter = func(param LogFormatterParams) string { + var statusColor, methodColor, resetColor string + if param.IsOutputColor() { + statusColor = param.StatusCodeColor() + methodColor = param.MethodColor() + resetColor = param.ResetColor() + } + + if param.Latency > time.Minute { + // Truncate in a golang < 1.8 safe way + param.Latency = param.Latency - param.Latency%time.Second + } + return fmt.Sprintf("[GIN] |%s %3d %s| %13v | %15s |%s %-7s %s %s\n%s", + statusColor, param.StatusCode, resetColor, + param.Latency, + param.ClientIP, + methodColor, param.Method, resetColor, + param.Path, + param.ErrorMessage, + ) +} + +// DisableConsoleColor disables color output in the console. +func DisableConsoleColor() { + consoleColorMode = disableColor +} + +// ForceConsoleColor force color output in the console. +func ForceConsoleColor() { + consoleColorMode = forceColor +} + +// ErrorLogger returns a handlerfunc for any error type. +func ErrorLogger() gin.HandlerFunc { + return ErrorLoggerT(gin.ErrorTypeAny) +} + +// ErrorLoggerT returns a handlerfunc for a given error type. +func ErrorLoggerT(typ gin.ErrorType) gin.HandlerFunc { + return func(c *gin.Context) { + c.Next() + errors := c.Errors.ByType(typ) + if len(errors) > 0 { + c.JSON(-1, errors) + } + } +} + +// Logger instances a Logger middleware that will write the logs to gin.DefaultWriter. +// By default gin.DefaultWriter = os.Stdout. +func Logger() gin.HandlerFunc { + return LoggerWithConfig(LoggerConfig{}) +} + +// LoggerWithFormatter instance a Logger middleware with the specified log format function. +func LoggerWithFormatter(f LogFormatter) gin.HandlerFunc { + return LoggerWithConfig(LoggerConfig{ + Formatter: f, + }) +} + +// LoggerWithWriter instance a Logger middleware with the specified writer buffer. +// Example: os.Stdout, a file opened in write mode, a socket... +func LoggerWithWriter(out io.Writer, notlogged ...string) gin.HandlerFunc { + return LoggerWithConfig(LoggerConfig{ + Output: out, + SkipPaths: notlogged, + }) +} + +// LoggerWithConfig instance a Logger middleware with config. +func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc { + formatter := conf.Formatter + if formatter == nil { + formatter = defaultLogFormatter + } + + out := conf.Output + if out == nil { + out = os.Stdout + } + + notlogged := conf.SkipPaths + + isTerm := true + + if w, ok := out.(*os.File); !ok || os.Getenv("TERM") == "dumb" || + (!isatty.IsTerminal(w.Fd()) && !isatty.IsCygwinTerminal(w.Fd())) { + isTerm = false + } + + var skip map[string]struct{} + + if length := len(notlogged); length > 0 { + skip = make(map[string]struct{}, length) + + for _, path := range notlogged { + skip[path] = struct{}{} + } + } + + return func(c *gin.Context) { + // Start timer + start := time.Now() + path := c.Request.URL.Path + raw := c.Request.URL.RawQuery + + // var ( + // rdr1 io.ReadCloser + // rdr2 io.ReadCloser + // ) + + // if c.Request.Method != "GET" { + // buf, _ := ioutil.ReadAll(c.Request.Body) + // rdr1 = ioutil.NopCloser(bytes.NewBuffer(buf)) + // rdr2 = ioutil.NopCloser(bytes.NewBuffer(buf)) + + // c.Request.Body = rdr2 + // } + + // Process request + c.Next() + + // Log only when path is not being skipped + if _, ok := skip[path]; !ok { + param := LogFormatterParams{ + Request: c.Request, + isTerm: isTerm, + Keys: c.Keys, + } + + // Stop timer + param.TimeStamp = time.Now() + param.Latency = param.TimeStamp.Sub(start) + + param.ClientIP = c.ClientIP() + param.Method = c.Request.Method + param.StatusCode = c.Writer.Status() + param.ErrorMessage = c.Errors.ByType(gin.ErrorTypePrivate).String() + + param.BodySize = c.Writer.Size() + + if raw != "" { + path = path + "?" + raw + } + + param.Path = path + + // fmt.Fprint(out, formatter(param)) + log.Println("I!", formatter(param)) + + // if c.Request.Method != "GET" { + // logger.Debug(readBody(rdr1)) + // } + } + } +} diff --git a/pkg/aop/recovery.go b/pkg/aop/recovery.go new file mode 100644 index 0000000..5672a2a --- /dev/null +++ b/pkg/aop/recovery.go @@ -0,0 +1,165 @@ +package aop + +// Copyright 2014 Manu Martinez-Almeida. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "net/http" + "net/http/httputil" + "os" + "runtime" + "strings" + "time" + + "github.com/gin-gonic/gin" + "github.com/toolkits/pkg/errorx" + "github.com/toolkits/pkg/i18n" +) + +var ( + dunno = []byte("???") + centerDot = []byte("·") + dot = []byte(".") + slash = []byte("/") +) + +// Recovery returns a middleware that recovers from any panics and writes a 500 if there was one. +func Recovery() gin.HandlerFunc { + return RecoveryWithWriter(gin.DefaultErrorWriter) +} + +// RecoveryWithWriter returns a middleware for a given writer that recovers from any panics and writes a 500 if there was one. +func RecoveryWithWriter(out io.Writer) gin.HandlerFunc { + var logger *log.Logger + if out != nil { + logger = log.New(out, "\n\n\x1b[31m", log.LstdFlags) + } + return func(c *gin.Context) { + defer func() { + if err := recover(); err != nil { + // custom error + if e, ok := err.(errorx.PageError); ok { + if e.Code != 200 { + c.String(e.Code, i18n.Sprintf(c.GetHeader("X-Language"), e.Message)) + } else { + c.JSON(e.Code, gin.H{"err": i18n.Sprintf(c.GetHeader("X-Language"), e.Message)}) + } + c.Abort() + return + } + + // Check for a broken connection, as it is not really a + // condition that warrants a panic stack trace. + var brokenPipe bool + if ne, ok := err.(*net.OpError); ok { + if se, ok := ne.Err.(*os.SyscallError); ok { + if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "connection reset by peer") { + brokenPipe = true + } + } + } + if logger != nil { + stack := stack(3) + httpRequest, _ := httputil.DumpRequest(c.Request, false) + headers := strings.Split(string(httpRequest), "\r\n") + for idx, header := range headers { + current := strings.Split(header, ":") + if current[0] == "Authorization" { + headers[idx] = current[0] + ": *" + } + } + if brokenPipe { + logger.Printf("%s\n%s%s", err, string(httpRequest), reset) + } else if gin.IsDebugging() { + logger.Printf("[Recovery] %s panic recovered:\n%s\n%s\n%s%s", + timeFormat(time.Now()), strings.Join(headers, "\r\n"), err, stack, reset) + } else { + logger.Printf("[Recovery] %s panic recovered:\n%s\n%s%s", + timeFormat(time.Now()), err, stack, reset) + } + } + + // If the connection is dead, we can't write a status to it. + if brokenPipe { + c.Error(err.(error)) // nolint: errcheck + c.Abort() + } else { + c.AbortWithStatus(http.StatusInternalServerError) + } + } + }() + c.Next() + } +} + +// stack returns a nicely formatted stack frame, skipping skip frames. +func stack(skip int) []byte { + buf := new(bytes.Buffer) // the returned data + // As we loop, we open files and read them. These variables record the currently + // loaded file. + var lines [][]byte + var lastFile string + for i := skip; ; i++ { // Skip the expected number of frames + pc, file, line, ok := runtime.Caller(i) + if !ok { + break + } + // Print this much at least. If we can't find the source, it won't show. + fmt.Fprintf(buf, "%s:%d (0x%x)\n", file, line, pc) + if file != lastFile { + data, err := ioutil.ReadFile(file) + if err != nil { + continue + } + lines = bytes.Split(data, []byte{'\n'}) + lastFile = file + } + fmt.Fprintf(buf, "\t%s: %s\n", function(pc), source(lines, line)) + } + return buf.Bytes() +} + +// source returns a space-trimmed slice of the n'th line. +func source(lines [][]byte, n int) []byte { + n-- // in stack trace, lines are 1-indexed but our array is 0-indexed + if n < 0 || n >= len(lines) { + return dunno + } + return bytes.TrimSpace(lines[n]) +} + +// function returns, if possible, the name of the function containing the PC. +func function(pc uintptr) []byte { + fn := runtime.FuncForPC(pc) + if fn == nil { + return dunno + } + name := []byte(fn.Name()) + // The name includes the path name to the package, which is unnecessary + // since the file name is already included. Plus, it has center dots. + // That is, we see + // runtime/debug.*T·ptrmethod + // and want + // *T.ptrmethod + // Also the package path might contains dot (e.g. code.google.com/...), + // so first eliminate the path prefix + if lastSlash := bytes.LastIndex(name, slash); lastSlash >= 0 { + name = name[lastSlash+1:] + } + if period := bytes.Index(name, dot); period >= 0 { + name = name[period+1:] + } + name = bytes.Replace(name, centerDot, dot, -1) + return name +} + +func timeFormat(t time.Time) string { + return t.Format("2006/01/02 - 15:04:05") +}