code refactor for http server

This commit is contained in:
Ulric Qin 2022-08-03 10:41:44 +08:00
parent af8f56b037
commit 17d37dc68b
16 changed files with 948 additions and 612 deletions

View File

@ -3,7 +3,6 @@ package agent
import ( import (
"log" "log"
"flashcat.cloud/categraf/api"
"flashcat.cloud/categraf/traces" "flashcat.cloud/categraf/traces"
// auto registry // auto registry
@ -53,7 +52,6 @@ type Agent struct {
InputFilters map[string]struct{} InputFilters map[string]struct{}
InputReaders map[string]*InputReader InputReaders map[string]*InputReader
TraceCollector *traces.Collector TraceCollector *traces.Collector
Server *api.Server
} }
func NewAgent(filters map[string]struct{}) *Agent { func NewAgent(filters map[string]struct{}) *Agent {
@ -75,11 +73,6 @@ func (a *Agent) Start() {
log.Println(err) log.Println(err)
} }
a.startPrometheusScrape() a.startPrometheusScrape()
err = a.startHttpAgent()
if err != nil {
log.Println(err)
}
log.Println("I! agent started") log.Println("I! agent started")
} }
@ -92,11 +85,6 @@ func (a *Agent) Stop() {
log.Println(err) log.Println(err)
} }
a.stopPrometheusScrape() a.stopPrometheusScrape()
err = a.stopHttpAgent()
if err != nil {
log.Println(err)
}
log.Println("I! agent stopped") log.Println("I! agent stopped")
} }

View File

@ -1,44 +0,0 @@
package agent
import (
"context"
"log"
"flashcat.cloud/categraf/api"
"flashcat.cloud/categraf/config"
)
func (a *Agent) startHttpAgent() (err error) {
if config.Config.HTTPServer == nil || !config.Config.HTTPServer.Enable {
return nil
}
defer func() {
if err != nil {
log.Println("E! failed to start http agent:", err)
}
}()
server := api.NewServer(api.Address(config.Config.HTTPServer.Address))
err = server.Start(context.TODO())
if err != nil {
return err
}
a.Server = server
return nil
}
func (a *Agent) stopHttpAgent() (err error) {
if config.Config.HTTPServer == nil || !config.Config.HTTPServer.Enable || a.Server == nil {
return nil
}
defer func() {
if err != nil {
log.Println("E! failed to stop http agent:", err)
}
}()
return a.Server.Stop(context.TODO())
}

View File

@ -1,6 +0,0 @@
package api
type Response struct {
Message string `json:"message,omitempty"`
Data interface{} `json:"data,omitempty"`
}

View File

@ -1,26 +0,0 @@
package api
import (
"github.com/gin-gonic/gin"
)
type Router struct {
*gin.Engine
}
func newRouter(srv *Server) *Router {
r := &Router{
Engine: srv.engine,
}
r.push()
return r
}
func (r *Router) push() {
p := push{}
g := r.Group("/api/push")
g.POST("/opentsdb", p.OpenTSDB) // 发送OpenTSDB数据
g.POST("/openfalcon", p.falcon) // 发送OpenFalcon数据
g.POST("/prometheus", p.remoteWrite) // 发送Prometheus数据
}

View File

@ -1,115 +1,21 @@
package api package api
import ( import (
"encoding/json"
"fmt" "fmt"
"log"
"net/http"
"strconv" "strconv"
"strings" "strings"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/writer"
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
) )
type Metric struct {
Metric string `json:"metric"`
Timestamp int64 `json:"timestamp"`
ValueUnTyped interface{} `json:"value"`
Value float64 `json:"-"`
Tags map[string]string `json:"tags"`
}
func (m *Metric) Clean(ts int64) error {
if m.Metric == "" {
return fmt.Errorf("metric is blank")
}
switch v := m.ValueUnTyped.(type) {
case string:
if f, err := strconv.ParseFloat(v, 64); err == nil {
m.Value = f
} else {
return fmt.Errorf("unparseable value %v", v)
}
case float64:
m.Value = v
case uint64:
m.Value = float64(v)
case int64:
m.Value = float64(v)
case int:
m.Value = float64(v)
default:
return fmt.Errorf("unparseable value %v", v)
}
// if timestamp bigger than 32 bits, likely in milliseconds
if m.Timestamp > 0xffffffff {
m.Timestamp /= 1000
}
// If the timestamp is greater than 5 minutes, the current time shall prevail
diff := m.Timestamp - ts
if diff > 300 {
m.Timestamp = ts
}
return nil
}
func (m *Metric) ToProm() (*prompb.TimeSeries, error) {
pt := &prompb.TimeSeries{}
pt.Samples = append(pt.Samples, prompb.Sample{
// use ms
Timestamp: m.Timestamp * 1000,
Value: m.Value,
})
if strings.IndexByte(m.Metric, '.') != -1 {
m.Metric = strings.ReplaceAll(m.Metric, ".", "_")
}
if strings.IndexByte(m.Metric, '-') != -1 {
m.Metric = strings.ReplaceAll(m.Metric, "-", "_")
}
if !model.MetricNameRE.MatchString(m.Metric) {
return nil, fmt.Errorf("invalid metric name: %s", m.Metric)
}
pt.Labels = append(pt.Labels, prompb.Label{
Name: model.MetricNameLabel,
Value: m.Metric,
})
if _, exists := m.Tags["ident"]; !exists {
// rename tag key
host, has := m.Tags["host"]
if has {
delete(m.Tags, "host")
m.Tags["ident"] = host
}
}
for key, value := range m.Tags {
if strings.IndexByte(key, '.') != -1 {
key = strings.ReplaceAll(key, ".", "_")
}
if strings.IndexByte(key, '-') != -1 {
key = strings.ReplaceAll(key, "-", "_")
}
if !model.LabelNameRE.MatchString(key) {
return nil, fmt.Errorf("invalid tag name: %s", key)
}
pt.Labels = append(pt.Labels, prompb.Label{
Name: key,
Value: value,
})
}
return pt, nil
}
type FalconMetric struct { type FalconMetric struct {
Metric string `json:"metric"` Metric string `json:"metric"`
Endpoint string `json:"endpoint"` Endpoint string `json:"endpoint"`
@ -228,3 +134,86 @@ func (m *FalconMetric) ToProm() (*prompb.TimeSeries, string, error) {
return pt, ident, nil return pt, ident, nil
} }
func openFalcon(c *gin.Context) {
var (
err error
bytes []byte
)
bytes, err = readerGzipBody(c.GetHeader("Content-Encoding"), c.Request)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
var arr []FalconMetric
if bytes[0] == '[' {
err = json.Unmarshal(bytes, &arr)
} else {
var one FalconMetric
err = json.Unmarshal(bytes, &one)
arr = []FalconMetric{one}
}
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
var (
fail int
succ int
msg = "data pushed to queue"
ts = time.Now().Unix()
)
ignoreHostname := c.GetBool("ignore_hostname")
ignoreGlobalLabels := c.GetBool("ignore_global_labels")
count := len(arr)
series := make([]prompb.TimeSeries, 0, count)
for i := 0; i < count; i++ {
if err := arr[i].Clean(ts); err != nil {
fail++
continue
}
pt, _, err := arr[i].ToProm()
if err != nil {
fail++
continue
}
tags := make(map[string]string)
for _, label := range pt.Labels {
tags[label.Name] = label.Value
}
// add global labels
if !ignoreGlobalLabels {
for k, v := range config.Config.Global.Labels {
if _, has := tags[k]; has {
continue
}
pt.Labels = append(pt.Labels, prompb.Label{Name: k, Value: v})
}
}
// add label: agent_hostname
if _, has := tags[agentHostnameLabelKey]; !has && !ignoreHostname {
pt.Labels = append(pt.Labels, prompb.Label{Name: agentHostnameLabelKey, Value: config.Config.GetHostname()})
}
series = append(series, *pt)
succ++
}
if fail > 0 {
log.Printf("falconmetric msg process error , msg is : %s\n", string(bytes))
}
writer.PostTimeSeries(series)
c.JSON(200, gin.H{
"succ": succ,
"fail": fail,
"msg": msg,
})
}

59
api/router_func.go Normal file
View File

@ -0,0 +1,59 @@
package api
import (
"compress/gzip"
"errors"
"io"
"io/ioutil"
"net/http"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
)
const agentHostnameLabelKey = "agent_hostname"
func readerGzipBody(contentEncoding string, request *http.Request) (bytes []byte, err error) {
if contentEncoding == "gzip" {
var (
r *gzip.Reader
)
r, err = gzip.NewReader(request.Body)
if err != nil {
return nil, err
}
defer r.Close()
bytes, err = ioutil.ReadAll(r)
} else {
defer request.Body.Close()
bytes, err = ioutil.ReadAll(request.Body)
}
if err != nil || len(bytes) == 0 {
return nil, errors.New("request parameter error")
}
return bytes, nil
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
compressed, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}

204
api/router_opentsdb.go Normal file
View File

@ -0,0 +1,204 @@
package api
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/writer"
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
type Metric struct {
Metric string `json:"metric"`
Timestamp int64 `json:"timestamp"`
ValueUnTyped interface{} `json:"value"`
Value float64 `json:"-"`
Tags map[string]string `json:"tags"`
}
func (m *Metric) Clean(ts int64) error {
if m.Metric == "" {
return fmt.Errorf("metric is blank")
}
switch v := m.ValueUnTyped.(type) {
case string:
if f, err := strconv.ParseFloat(v, 64); err == nil {
m.Value = f
} else {
return fmt.Errorf("unparseable value %v", v)
}
case float64:
m.Value = v
case uint64:
m.Value = float64(v)
case int64:
m.Value = float64(v)
case int:
m.Value = float64(v)
default:
return fmt.Errorf("unparseable value %v", v)
}
// if timestamp bigger than 32 bits, likely in milliseconds
if m.Timestamp > 0xffffffff {
m.Timestamp /= 1000
}
// If the timestamp is greater than 5 minutes, the current time shall prevail
diff := m.Timestamp - ts
if diff > 300 {
m.Timestamp = ts
}
return nil
}
func (m *Metric) ToProm() (*prompb.TimeSeries, error) {
pt := &prompb.TimeSeries{}
pt.Samples = append(pt.Samples, prompb.Sample{
// use ms
Timestamp: m.Timestamp * 1000,
Value: m.Value,
})
if strings.IndexByte(m.Metric, '.') != -1 {
m.Metric = strings.ReplaceAll(m.Metric, ".", "_")
}
if strings.IndexByte(m.Metric, '-') != -1 {
m.Metric = strings.ReplaceAll(m.Metric, "-", "_")
}
if !model.MetricNameRE.MatchString(m.Metric) {
return nil, fmt.Errorf("invalid metric name: %s", m.Metric)
}
pt.Labels = append(pt.Labels, prompb.Label{
Name: model.MetricNameLabel,
Value: m.Metric,
})
if _, exists := m.Tags["ident"]; !exists {
// rename tag key
host, has := m.Tags["host"]
if has {
delete(m.Tags, "host")
m.Tags["ident"] = host
}
}
for key, value := range m.Tags {
if strings.IndexByte(key, '.') != -1 {
key = strings.ReplaceAll(key, ".", "_")
}
if strings.IndexByte(key, '-') != -1 {
key = strings.ReplaceAll(key, "-", "_")
}
if !model.LabelNameRE.MatchString(key) {
return nil, fmt.Errorf("invalid tag name: %s", key)
}
pt.Labels = append(pt.Labels, prompb.Label{
Name: key,
Value: value,
})
}
return pt, nil
}
func openTSDB(c *gin.Context) {
var (
err error
bytes []byte
)
bytes, err = readerGzipBody(c.GetHeader("Content-Encoding"), c.Request)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
var list []Metric
if bytes[0] == '[' {
err = json.Unmarshal(bytes, &list)
} else {
var openTSDBMetric Metric
err = json.Unmarshal(bytes, &openTSDBMetric)
list = []Metric{openTSDBMetric}
}
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
var (
fail int
succ int
msg = "data pushed to queue"
ts = time.Now().Unix()
)
ignoreHostname := c.GetBool("ignore_hostname")
ignoreGlobalLabels := c.GetBool("ignore_global_labels")
count := len(list)
series := make([]prompb.TimeSeries, 0, count)
for i := 0; i < len(list); i++ {
if err := list[i].Clean(ts); err != nil {
log.Printf("opentsdb msg clean error: %s\n", err.Error())
if fail == 0 {
msg = fmt.Sprintf("%s , Error clean: %s", msg, err.Error())
}
fail++
continue
}
// add global labels
if !ignoreGlobalLabels {
for k, v := range config.Config.Global.Labels {
if _, has := list[i].Tags[k]; has {
continue
}
list[i].Tags[k] = v
}
}
// add label: agent_hostname
if _, has := list[i].Tags[agentHostnameLabelKey]; !has && !ignoreHostname {
list[i].Tags[agentHostnameLabelKey] = config.Config.GetHostname()
}
pt, err := list[i].ToProm()
if err != nil {
log.Printf("opentsdb msg to tsdb error: %s\n", err.Error())
if fail == 0 {
msg = fmt.Sprintf("%s , Error toprom: %s", msg, err.Error())
}
fail++
continue
}
series = append(series, *pt)
succ++
}
if fail > 0 {
log.Printf("opentsdb msg process error , msg is : %s\n", string(bytes))
}
writer.PostTimeSeries(series)
c.JSON(200, gin.H{
"succ": succ,
"fail": fail,
"msg": msg,
})
}

View File

@ -1,323 +0,0 @@
package api
import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/writer"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
)
const agentHostnameLabelKey = "agent_hostname"
type push struct {
}
type Context struct {
*gin.Context
}
func NewContext(c *gin.Context) *Context {
return &Context{c}
}
func (c *Context) Failed(code int, err error) {
c.JSON(code, Response{
Message: err.Error(),
})
}
func (c *Context) Success(v ...any) {
r := Response{
Message: "success",
}
if len(v) > 0 {
r.Data = v[0]
}
c.JSON(http.StatusOK, r)
}
func (push *push) OpenTSDB(c *gin.Context) {
var (
err error
bytes []byte
)
cc := NewContext(c)
bytes, err = readerGzipBody(c.GetHeader("Content-Encoding"), c.Request)
if err != nil {
cc.Failed(http.StatusBadRequest, err)
return
}
var list []Metric
if bytes[0] == '[' {
err = json.Unmarshal(bytes, &list)
} else {
var openTSDBMetric Metric
err = json.Unmarshal(bytes, &openTSDBMetric)
list = []Metric{openTSDBMetric}
}
if err != nil {
cc.Failed(http.StatusBadRequest, err)
return
}
var (
fail int
success int
msg = "data pushed to queue"
ts = time.Now().Unix()
)
ignoreHostname := c.GetBool("ignore_hostname")
ignoreGlobalLabels := c.GetBool("ignore_global_labels")
count := len(list)
series := make([]prompb.TimeSeries, 0, count)
for i := 0; i < len(list); i++ {
if err := list[i].Clean(ts); err != nil {
log.Printf("opentsdb msg clean error: %s\n", err.Error())
if fail == 0 {
msg = fmt.Sprintf("%s , Error clean: %s", msg, err.Error())
}
fail++
continue
}
// add global labels
if !ignoreGlobalLabels {
for k, v := range config.Config.Global.Labels {
if _, has := list[i].Tags[k]; has {
continue
}
list[i].Tags[k] = v
}
}
// add label: agent_hostname
if _, has := list[i].Tags[agentHostnameLabelKey]; !has && !ignoreHostname {
list[i].Tags[agentHostnameLabelKey] = config.Config.GetHostname()
}
pt, err := list[i].ToProm()
if err != nil {
log.Printf("opentsdb msg to tsdb error: %s\n", err.Error())
if fail == 0 {
msg = fmt.Sprintf("%s , Error toprom: %s", msg, err.Error())
}
fail++
continue
}
series = append(series, *pt)
success++
}
if fail > 0 {
log.Printf("opentsdb msg process error , msg is : %s\n", string(bytes))
}
writer.PostTimeSeries(series)
cc.Success(map[string]interface{}{
"success": success,
"fail": fail,
"msg": msg,
})
}
func (push *push) falcon(c *gin.Context) {
var (
err error
bytes []byte
)
cc := NewContext(c)
bytes, err = readerGzipBody(c.GetHeader("Content-Encoding"), c.Request)
if err != nil {
cc.Failed(http.StatusBadRequest, err)
return
}
var arr []FalconMetric
if bytes[0] == '[' {
err = json.Unmarshal(bytes, &arr)
} else {
var one FalconMetric
err = json.Unmarshal(bytes, &one)
arr = []FalconMetric{one}
}
if err != nil {
cc.Failed(http.StatusBadRequest, err)
return
}
var (
fail int
success int
msg = "data pushed to queue"
ts = time.Now().Unix()
)
ignoreHostname := c.GetBool("ignore_hostname")
ignoreGlobalLabels := c.GetBool("ignore_global_labels")
count := len(arr)
series := make([]prompb.TimeSeries, 0, count)
for i := 0; i < count; i++ {
if err := arr[i].Clean(ts); err != nil {
fail++
continue
}
pt, _, err := arr[i].ToProm()
if err != nil {
fail++
continue
}
tags := make(map[string]string)
for _, label := range pt.Labels {
tags[label.Name] = label.Value
}
// add global labels
if !ignoreGlobalLabels {
for k, v := range config.Config.Global.Labels {
if _, has := tags[k]; has {
continue
}
pt.Labels = append(pt.Labels, prompb.Label{Name: k, Value: v})
}
}
// add label: agent_hostname
if _, has := tags[agentHostnameLabelKey]; !has && !ignoreHostname {
pt.Labels = append(pt.Labels, prompb.Label{Name: agentHostnameLabelKey, Value: config.Config.GetHostname()})
}
series = append(series, *pt)
success++
}
if fail > 0 {
log.Printf("falconmetric msg process error , msg is : %s\n", string(bytes))
}
writer.PostTimeSeries(series)
cc.Success(map[string]interface{}{
"success": success,
"fail": fail,
"msg": msg,
})
}
func (push *push) remoteWrite(c *gin.Context) {
cc := NewContext(c)
req, err := DecodeWriteRequest(c.Request.Body)
if err != nil {
cc.Failed(http.StatusBadRequest, err)
return
}
count := len(req.Timeseries)
if count == 0 {
cc.Success()
return
}
ignoreHostname := c.GetBool("ignore_hostname")
ignoreGlobalLabels := c.GetBool("ignore_global_labels")
for i := 0; i < count; i++ {
// 去除重复的数据
if duplicateLabelKey(req.Timeseries[i]) {
continue
}
tags := make(map[string]string)
for _, label := range req.Timeseries[i].Labels {
tags[label.Name] = label.Value
}
// add global labels
if !ignoreGlobalLabels {
for k, v := range config.Config.Global.Labels {
if _, has := tags[k]; has {
continue
}
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{Name: k, Value: v})
}
}
// add label: agent_hostname
if _, has := tags[agentHostnameLabelKey]; !has && !ignoreHostname {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{Name: agentHostnameLabelKey, Value: config.Config.GetHostname()})
}
}
writer.PostTimeSeries(req.Timeseries)
cc.Success()
}
func duplicateLabelKey(series prompb.TimeSeries) bool {
labelKeys := make(map[string]struct{})
for j := 0; j < len(series.Labels); j++ {
if _, has := labelKeys[series.Labels[j].Name]; has {
return true
} else {
labelKeys[series.Labels[j].Name] = struct{}{}
}
}
return false
}
func readerGzipBody(contentEncoding string, request *http.Request) (bytes []byte, err error) {
if contentEncoding == "gzip" {
var (
r *gzip.Reader
)
r, err = gzip.NewReader(request.Body)
if err != nil {
return nil, err
}
defer r.Close()
bytes, err = ioutil.ReadAll(r)
} else {
defer request.Body.Close()
bytes, err = ioutil.ReadAll(request.Body)
}
if err != nil || len(bytes) == 0 {
return nil, errors.New("request parameter error")
}
return bytes, nil
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
compressed, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}

68
api/router_remotewrite.go Normal file
View File

@ -0,0 +1,68 @@
package api
import (
"net/http"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/writer"
"github.com/gin-gonic/gin"
"github.com/prometheus/prometheus/prompb"
)
func remoteWrite(c *gin.Context) {
req, err := DecodeWriteRequest(c.Request.Body)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
count := len(req.Timeseries)
if count == 0 {
c.String(http.StatusBadRequest, "payload empty")
return
}
ignoreHostname := c.GetBool("ignore_hostname")
ignoreGlobalLabels := c.GetBool("ignore_global_labels")
for i := 0; i < count; i++ {
// 去除重复的数据
if duplicateLabelKey(req.Timeseries[i]) {
continue
}
tags := make(map[string]string)
for _, label := range req.Timeseries[i].Labels {
tags[label.Name] = label.Value
}
// add global labels
if !ignoreGlobalLabels {
for k, v := range config.Config.Global.Labels {
if _, has := tags[k]; has {
continue
}
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{Name: k, Value: v})
}
}
// add label: agent_hostname
if _, has := tags[agentHostnameLabelKey]; !has && !ignoreHostname {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{Name: agentHostnameLabelKey, Value: config.Config.GetHostname()})
}
}
writer.PostTimeSeries(req.Timeseries)
c.String(200, "forwarding...")
}
func duplicateLabelKey(series prompb.TimeSeries) bool {
labelKeys := make(map[string]struct{})
for j := 0; j < len(series.Labels); j++ {
if _, has := labelKeys[series.Labels[j].Name]; has {
return true
} else {
labelKeys[series.Labels[j].Name] = struct{}{}
}
}
return false
}

View File

@ -1,104 +1,68 @@
package api package api
import ( import (
"context" "crypto/tls"
"errors"
"log" "log"
"net"
"net/http" "net/http"
"strings"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/pkg/aop"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
// ServerOption is an HTTP server option. func Start() {
type ServerOption func(*Server) conf := config.Config.HTTP
if !conf.Enable {
return
}
// Network with server network. gin.SetMode(conf.RunMode)
func Network(network string) ServerOption {
return func(s *Server) { if strings.ToLower(conf.RunMode) == "release" {
s.network = network aop.DisableConsoleColor()
}
r := gin.New()
r.Use(aop.Recovery())
if conf.PrintAccess {
r.Use(aop.Logger())
}
configRoutes(r)
srv := &http.Server{
Addr: conf.Address,
Handler: r,
ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second,
IdleTimeout: time.Duration(conf.IdleTimeout) * time.Second,
}
log.Println("I! http server listening on:", conf.Address)
var err error
if conf.CertFile != "" && conf.KeyFile != "" {
srv.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
err = srv.ListenAndServeTLS(conf.CertFile, conf.KeyFile)
} else {
err = srv.ListenAndServe()
}
if err != nil && err != http.ErrServerClosed {
panic(err)
} }
} }
// Address with server address. func configRoutes(r *gin.Engine) {
func Address(addr string) ServerOption { r.GET("/ping", func(c *gin.Context) {
return func(s *Server) { c.String(200, "pong")
s.address = addr })
}
} g := r.Group("/api/push")
g.POST("/opentsdb", openTSDB)
// Listener with server lis g.POST("/openfalcon", openFalcon)
func Listener(lis net.Listener) ServerOption { g.POST("/remotewrite", remoteWrite)
return func(s *Server) {
s.lis = lis
}
}
type Server struct {
*http.Server
engine *gin.Engine
lis net.Listener
network string
address string
}
func NewServer(opts ...ServerOption) *Server {
srv := &Server{
network: "tcp",
address: ":0",
}
for _, o := range opts {
o(srv)
}
srv.engine = gin.New()
srv.engine.Use(gin.Recovery(), gin.Logger())
srv.Server = &http.Server{
Handler: srv.engine,
}
newRouter(srv)
return srv
}
// ServeHTTP should write reply headers and data to the ResponseWriter and then return.
func (s *Server) ServeHTTP(res http.ResponseWriter, req *http.Request) {
s.Handler.ServeHTTP(res, req)
}
func (s *Server) listenAndEndpoint() error {
if s.lis == nil {
lis, err := net.Listen(s.network, s.address)
if err != nil {
return err
}
s.lis = lis
}
return nil
}
// Start start the HTTP server.
func (s *Server) Start(ctx context.Context) error {
if err := s.listenAndEndpoint(); err != nil {
return err
}
s.BaseContext = func(net.Listener) context.Context {
return ctx
}
log.Printf("[HTTP] server listening on: %s]\n", s.lis.Addr().String())
go func() {
err := s.Serve(s.lis)
if !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("[HTTP] server error: %v", err)
}
}()
return nil
}
// Stop stop the HTTP server.
func (s *Server) Stop(ctx context.Context) error {
log.Println("[HTTP] server stopping")
return s.Shutdown(ctx)
} }

View File

@ -29,10 +29,6 @@ batch = 2000
# channel(as queue) size # channel(as queue) size
chan_size = 10000 chan_size = 10000
[http]
enable = false
address = ":9100"
[[writers]] [[writers]]
url = "http://127.0.0.1:19000/prometheus/v1/write" url = "http://127.0.0.1:19000/prometheus/v1/write"
@ -46,3 +42,9 @@ basic_auth_pass = ""
timeout = 5000 timeout = 5000
dial_timeout = 2500 dial_timeout = 2500
max_idle_conns_per_host = 100 max_idle_conns_per_host = 100
[http]
enable = true
address = ":9100"
print_access = false
run_mode = "release"

View File

@ -44,9 +44,16 @@ type WriterOption struct {
MaxIdleConnsPerHost int `toml:"max_idle_conns_per_host"` MaxIdleConnsPerHost int `toml:"max_idle_conns_per_host"`
} }
type HTTPServer struct { type HTTP struct {
Enable bool `toml:"enable"` Enable bool `toml:"enable"`
Address string `toml:"address"` Address string `toml:"address"`
PrintAccess bool `toml:"print_access"`
RunMode string `toml:"run_mode"`
CertFile string `toml:"cert_file"`
KeyFile string `toml:"key_file"`
ReadTimeout int `toml:"read_timeout"`
WriteTimeout int `toml:"write_timeout"`
IdleTimeout int `toml:"idle_timeout"`
} }
type ConfigType struct { type ConfigType struct {
@ -62,7 +69,7 @@ type ConfigType struct {
Logs Logs `toml:"logs"` Logs Logs `toml:"logs"`
MetricsHouse MetricsHouse `toml:"metricshouse"` MetricsHouse MetricsHouse `toml:"metricshouse"`
Traces *traces.Config `toml:"traces"` Traces *traces.Config `toml:"traces"`
HTTPServer *HTTPServer `toml:"http"` HTTP *HTTP `toml:"http"`
Prometheus *Prometheus `toml:"prometheus"` Prometheus *Prometheus `toml:"prometheus"`
} }

View File

@ -173,7 +173,6 @@ func (s *NfsClient) Init() error {
log.Println("D! Including these mount patterns:", s.IncludeMounts) log.Println("D! Including these mount patterns:", s.IncludeMounts)
} }
} else { } else {
log.Println("Including all mounts.")
if config.Config.DebugMode { if config.Config.DebugMode {
log.Println("D! Including all mounts.") log.Println("D! Including all mounts.")
} }
@ -215,7 +214,9 @@ func (s *NfsClient) Init() error {
func (s *NfsClient) Gather(slist *types.SampleList) { func (s *NfsClient) Gather(slist *types.SampleList) {
file, err := os.Open(s.mountstatsPath) file, err := os.Open(s.mountstatsPath)
if err != nil { if err != nil {
log.Println("E! Failed opening the", file, "file:", err) if config.Config.DebugMode {
log.Println("D! Failed opening the", file, "file:", err)
}
return return
} }
defer file.Close() defer file.Close()

View File

@ -12,6 +12,7 @@ import (
"syscall" "syscall"
"flashcat.cloud/categraf/agent" "flashcat.cloud/categraf/agent"
"flashcat.cloud/categraf/api"
"flashcat.cloud/categraf/config" "flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/house" "flashcat.cloud/categraf/house"
"flashcat.cloud/categraf/pkg/osx" "flashcat.cloud/categraf/pkg/osx"
@ -66,6 +67,8 @@ func main() {
initWriters() initWriters()
go api.Start()
ag := agent.NewAgent(parseFilter(*inputFilters)) ag := agent.NewAgent(parseFilter(*inputFilters))
runAgent(ag) runAgent(ag)
} }

285
pkg/aop/logger.go Normal file
View File

@ -0,0 +1,285 @@
package aop
import (
"fmt"
"io"
"log"
"net/http"
"os"
"time"
"github.com/gin-gonic/gin"
"github.com/mattn/go-isatty"
)
type consoleColorModeValue int
const (
autoColor consoleColorModeValue = iota
disableColor
forceColor
)
var (
green = string([]byte{27, 91, 57, 55, 59, 52, 50, 109})
white = string([]byte{27, 91, 57, 48, 59, 52, 55, 109})
yellow = string([]byte{27, 91, 57, 48, 59, 52, 51, 109})
red = string([]byte{27, 91, 57, 55, 59, 52, 49, 109})
blue = string([]byte{27, 91, 57, 55, 59, 52, 52, 109})
magenta = string([]byte{27, 91, 57, 55, 59, 52, 53, 109})
cyan = string([]byte{27, 91, 57, 55, 59, 52, 54, 109})
reset = string([]byte{27, 91, 48, 109})
consoleColorMode = autoColor
)
// LoggerConfig defines the config for Logger middleware.
type LoggerConfig struct {
// Optional. Default value is gin.defaultLogFormatter
Formatter LogFormatter
// Output is a writer where logs are written.
// Optional. Default value is gin.DefaultWriter.
Output io.Writer
// SkipPaths is a url path array which logs are not written.
// Optional.
SkipPaths []string
}
// LogFormatter gives the signature of the formatter function passed to LoggerWithFormatter
type LogFormatter func(params LogFormatterParams) string
// LogFormatterParams is the structure any formatter will be handed when time to log comes
type LogFormatterParams struct {
Request *http.Request
// TimeStamp shows the time after the server returns a response.
TimeStamp time.Time
// StatusCode is HTTP response code.
StatusCode int
// Latency is how much time the server cost to process a certain request.
Latency time.Duration
// ClientIP equals Context's ClientIP method.
ClientIP string
// Method is the HTTP method given to the request.
Method string
// Path is a path the client requests.
Path string
// ErrorMessage is set if error has occurred in processing the request.
ErrorMessage string
// isTerm shows whether does gin's output descriptor refers to a terminal.
isTerm bool
// BodySize is the size of the Response Body
BodySize int
// Keys are the keys set on the request's context.
Keys map[string]interface{}
}
// StatusCodeColor is the ANSI color for appropriately logging http status code to a terminal.
func (p *LogFormatterParams) StatusCodeColor() string {
code := p.StatusCode
switch {
case code >= http.StatusOK && code < http.StatusMultipleChoices:
return green
case code >= http.StatusMultipleChoices && code < http.StatusBadRequest:
return white
case code >= http.StatusBadRequest && code < http.StatusInternalServerError:
return yellow
default:
return red
}
}
// MethodColor is the ANSI color for appropriately logging http method to a terminal.
func (p *LogFormatterParams) MethodColor() string {
method := p.Method
switch method {
case "GET":
return blue
case "POST":
return cyan
case "PUT":
return yellow
case "DELETE":
return red
case "PATCH":
return green
case "HEAD":
return magenta
case "OPTIONS":
return white
default:
return reset
}
}
// ResetColor resets all escape attributes.
func (p *LogFormatterParams) ResetColor() string {
return reset
}
// IsOutputColor indicates whether can colors be outputted to the log.
func (p *LogFormatterParams) IsOutputColor() bool {
return consoleColorMode == forceColor || (consoleColorMode == autoColor && p.isTerm)
}
// defaultLogFormatter is the default log format function Logger middleware uses.
var defaultLogFormatter = func(param LogFormatterParams) string {
var statusColor, methodColor, resetColor string
if param.IsOutputColor() {
statusColor = param.StatusCodeColor()
methodColor = param.MethodColor()
resetColor = param.ResetColor()
}
if param.Latency > time.Minute {
// Truncate in a golang < 1.8 safe way
param.Latency = param.Latency - param.Latency%time.Second
}
return fmt.Sprintf("[GIN] |%s %3d %s| %13v | %15s |%s %-7s %s %s\n%s",
statusColor, param.StatusCode, resetColor,
param.Latency,
param.ClientIP,
methodColor, param.Method, resetColor,
param.Path,
param.ErrorMessage,
)
}
// DisableConsoleColor disables color output in the console.
func DisableConsoleColor() {
consoleColorMode = disableColor
}
// ForceConsoleColor force color output in the console.
func ForceConsoleColor() {
consoleColorMode = forceColor
}
// ErrorLogger returns a handlerfunc for any error type.
func ErrorLogger() gin.HandlerFunc {
return ErrorLoggerT(gin.ErrorTypeAny)
}
// ErrorLoggerT returns a handlerfunc for a given error type.
func ErrorLoggerT(typ gin.ErrorType) gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
errors := c.Errors.ByType(typ)
if len(errors) > 0 {
c.JSON(-1, errors)
}
}
}
// Logger instances a Logger middleware that will write the logs to gin.DefaultWriter.
// By default gin.DefaultWriter = os.Stdout.
func Logger() gin.HandlerFunc {
return LoggerWithConfig(LoggerConfig{})
}
// LoggerWithFormatter instance a Logger middleware with the specified log format function.
func LoggerWithFormatter(f LogFormatter) gin.HandlerFunc {
return LoggerWithConfig(LoggerConfig{
Formatter: f,
})
}
// LoggerWithWriter instance a Logger middleware with the specified writer buffer.
// Example: os.Stdout, a file opened in write mode, a socket...
func LoggerWithWriter(out io.Writer, notlogged ...string) gin.HandlerFunc {
return LoggerWithConfig(LoggerConfig{
Output: out,
SkipPaths: notlogged,
})
}
// LoggerWithConfig instance a Logger middleware with config.
func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
formatter := conf.Formatter
if formatter == nil {
formatter = defaultLogFormatter
}
out := conf.Output
if out == nil {
out = os.Stdout
}
notlogged := conf.SkipPaths
isTerm := true
if w, ok := out.(*os.File); !ok || os.Getenv("TERM") == "dumb" ||
(!isatty.IsTerminal(w.Fd()) && !isatty.IsCygwinTerminal(w.Fd())) {
isTerm = false
}
var skip map[string]struct{}
if length := len(notlogged); length > 0 {
skip = make(map[string]struct{}, length)
for _, path := range notlogged {
skip[path] = struct{}{}
}
}
return func(c *gin.Context) {
// Start timer
start := time.Now()
path := c.Request.URL.Path
raw := c.Request.URL.RawQuery
// var (
// rdr1 io.ReadCloser
// rdr2 io.ReadCloser
// )
// if c.Request.Method != "GET" {
// buf, _ := ioutil.ReadAll(c.Request.Body)
// rdr1 = ioutil.NopCloser(bytes.NewBuffer(buf))
// rdr2 = ioutil.NopCloser(bytes.NewBuffer(buf))
// c.Request.Body = rdr2
// }
// Process request
c.Next()
// Log only when path is not being skipped
if _, ok := skip[path]; !ok {
param := LogFormatterParams{
Request: c.Request,
isTerm: isTerm,
Keys: c.Keys,
}
// Stop timer
param.TimeStamp = time.Now()
param.Latency = param.TimeStamp.Sub(start)
param.ClientIP = c.ClientIP()
param.Method = c.Request.Method
param.StatusCode = c.Writer.Status()
param.ErrorMessage = c.Errors.ByType(gin.ErrorTypePrivate).String()
param.BodySize = c.Writer.Size()
if raw != "" {
path = path + "?" + raw
}
param.Path = path
// fmt.Fprint(out, formatter(param))
log.Println("I!", formatter(param))
// if c.Request.Method != "GET" {
// logger.Debug(readBody(rdr1))
// }
}
}
}

165
pkg/aop/recovery.go Normal file
View File

@ -0,0 +1,165 @@
package aop
// Copyright 2014 Manu Martinez-Almeida. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httputil"
"os"
"runtime"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/errorx"
"github.com/toolkits/pkg/i18n"
)
var (
dunno = []byte("???")
centerDot = []byte("·")
dot = []byte(".")
slash = []byte("/")
)
// Recovery returns a middleware that recovers from any panics and writes a 500 if there was one.
func Recovery() gin.HandlerFunc {
return RecoveryWithWriter(gin.DefaultErrorWriter)
}
// RecoveryWithWriter returns a middleware for a given writer that recovers from any panics and writes a 500 if there was one.
func RecoveryWithWriter(out io.Writer) gin.HandlerFunc {
var logger *log.Logger
if out != nil {
logger = log.New(out, "\n\n\x1b[31m", log.LstdFlags)
}
return func(c *gin.Context) {
defer func() {
if err := recover(); err != nil {
// custom error
if e, ok := err.(errorx.PageError); ok {
if e.Code != 200 {
c.String(e.Code, i18n.Sprintf(c.GetHeader("X-Language"), e.Message))
} else {
c.JSON(e.Code, gin.H{"err": i18n.Sprintf(c.GetHeader("X-Language"), e.Message)})
}
c.Abort()
return
}
// Check for a broken connection, as it is not really a
// condition that warrants a panic stack trace.
var brokenPipe bool
if ne, ok := err.(*net.OpError); ok {
if se, ok := ne.Err.(*os.SyscallError); ok {
if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "connection reset by peer") {
brokenPipe = true
}
}
}
if logger != nil {
stack := stack(3)
httpRequest, _ := httputil.DumpRequest(c.Request, false)
headers := strings.Split(string(httpRequest), "\r\n")
for idx, header := range headers {
current := strings.Split(header, ":")
if current[0] == "Authorization" {
headers[idx] = current[0] + ": *"
}
}
if brokenPipe {
logger.Printf("%s\n%s%s", err, string(httpRequest), reset)
} else if gin.IsDebugging() {
logger.Printf("[Recovery] %s panic recovered:\n%s\n%s\n%s%s",
timeFormat(time.Now()), strings.Join(headers, "\r\n"), err, stack, reset)
} else {
logger.Printf("[Recovery] %s panic recovered:\n%s\n%s%s",
timeFormat(time.Now()), err, stack, reset)
}
}
// If the connection is dead, we can't write a status to it.
if brokenPipe {
c.Error(err.(error)) // nolint: errcheck
c.Abort()
} else {
c.AbortWithStatus(http.StatusInternalServerError)
}
}
}()
c.Next()
}
}
// stack returns a nicely formatted stack frame, skipping skip frames.
func stack(skip int) []byte {
buf := new(bytes.Buffer) // the returned data
// As we loop, we open files and read them. These variables record the currently
// loaded file.
var lines [][]byte
var lastFile string
for i := skip; ; i++ { // Skip the expected number of frames
pc, file, line, ok := runtime.Caller(i)
if !ok {
break
}
// Print this much at least. If we can't find the source, it won't show.
fmt.Fprintf(buf, "%s:%d (0x%x)\n", file, line, pc)
if file != lastFile {
data, err := ioutil.ReadFile(file)
if err != nil {
continue
}
lines = bytes.Split(data, []byte{'\n'})
lastFile = file
}
fmt.Fprintf(buf, "\t%s: %s\n", function(pc), source(lines, line))
}
return buf.Bytes()
}
// source returns a space-trimmed slice of the n'th line.
func source(lines [][]byte, n int) []byte {
n-- // in stack trace, lines are 1-indexed but our array is 0-indexed
if n < 0 || n >= len(lines) {
return dunno
}
return bytes.TrimSpace(lines[n])
}
// function returns, if possible, the name of the function containing the PC.
func function(pc uintptr) []byte {
fn := runtime.FuncForPC(pc)
if fn == nil {
return dunno
}
name := []byte(fn.Name())
// The name includes the path name to the package, which is unnecessary
// since the file name is already included. Plus, it has center dots.
// That is, we see
// runtime/debug.*T·ptrmethod
// and want
// *T.ptrmethod
// Also the package path might contains dot (e.g. code.google.com/...),
// so first eliminate the path prefix
if lastSlash := bytes.LastIndex(name, slash); lastSlash >= 0 {
name = name[lastSlash+1:]
}
if period := bytes.Index(name, dot); period >= 0 {
name = name[period+1:]
}
name = bytes.Replace(name, centerDot, dot, -1)
return name
}
func timeFormat(t time.Time) string {
return t.Format("2006/01/02 - 15:04:05")
}