add clickhouse as output plugin, beta version

This commit is contained in:
Ulric Qin 2022-06-17 19:04:02 +08:00
parent 1363664dfd
commit 52e372adf3
10 changed files with 354 additions and 37 deletions

View File

@ -9,6 +9,7 @@ import (
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/house"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"flashcat.cloud/categraf/writer"
@ -19,6 +20,8 @@ import (
const agentHostnameLabelKey = "agent_hostname"
var metricReplacer = strings.NewReplacer("-", "_", ".", "_")
type Reader struct {
Instance inputs.Input
QuitChan chan struct{}
@ -87,12 +90,34 @@ func (r *Reader) gatherOnce() {
}
if len(r.Instance.Prefix()) > 0 {
s.Metric = r.Instance.Prefix() + "_" + strings.ReplaceAll(s.Metric, "-", "_")
s.Metric = r.Instance.Prefix() + "_" + metricReplacer.Replace(s.Metric)
} else {
s.Metric = strings.ReplaceAll(s.Metric, "-", "_")
s.Metric = metricReplacer.Replace(s.Metric)
}
if s.Labels == nil {
s.Labels = make(map[string]string)
}
// add label: agent_hostname
if _, has := s.Labels[agentHostnameLabelKey]; !has {
if !config.Config.Global.OmitHostname {
s.Labels[agentHostnameLabelKey] = config.Config.GetHostname()
}
}
// add global labels
for k, v := range config.Config.Global.Labels {
if _, has := s.Labels[k]; !has {
s.Labels[k] = v
}
}
// write to remote write queue
r.Queue <- s
// write to clickhouse queue
house.MetricsHouse.Push(s)
}
}
@ -179,24 +204,6 @@ func postSeries(series []*prompb.TimeSeries) {
}
func convert(item *types.Sample) *prompb.TimeSeries {
if item.Labels == nil {
item.Labels = make(map[string]string)
}
// add label: agent_hostname
if _, has := item.Labels[agentHostnameLabelKey]; !has {
if !config.Config.Global.OmitHostname {
item.Labels[agentHostnameLabelKey] = config.Config.GetHostname()
}
}
// add global labels
for k, v := range config.Config.Global.Labels {
if _, has := item.Labels[k]; !has {
item.Labels[k] = v
}
}
pt := &prompb.TimeSeries{}
timestamp := item.Timestamp.UnixMilli()

15
conf/clickhouse.toml Normal file
View File

@ -0,0 +1,15 @@
# clickhouse output plugin, sending metrics to clickhouse
[metricshouse]
enable = false
debug = false
endpoints = ["10.1.1.7:9000", "10.1.1.8:9000"]
database = "default"
table = ""
username = ""
password = ""
dial_timeout = "1s"
max_open_conns = 10
max_idle_conns = 5
conn_max_lifetime = "1h"
queue_size = 100000
batch_size = 10000

17
config/clickhouse.go Normal file
View File

@ -0,0 +1,17 @@
package config
type MetricsHouse struct {
Enable bool `toml:"enable"`
Debug bool `toml:"debug"`
Endpoints []string `toml:"endpoints"`
Database string `toml:"database"`
Table string `toml:"table"`
Username string `toml:"username"`
Password string `toml:"password"`
DialTimeout Duration `toml:"dial_timeout"`
MaxOpenConns int `toml:"max_open_conns"`
MaxIdleConns int `toml:"max_idle_conns"`
ConnMaxLifetime Duration `toml:"conn_max_lifetime"`
QueueSize int `toml:"queue_size"`
BatchSize int `toml:"batch_size"`
}

View File

@ -50,10 +50,11 @@ type ConfigType struct {
TestMode bool
// from config.toml
Global Global `toml:"global"`
WriterOpt WriterOpt `toml:"writer_opt"`
Writers []WriterOption `toml:"writers"`
Logs Logs `toml:"logs"`
Global Global `toml:"global"`
WriterOpt WriterOpt `toml:"writer_opt"`
Writers []WriterOption `toml:"writers"`
Logs Logs `toml:"logs"`
MetricsHouse MetricsHouse `toml:"metricshouse"`
}
var Config *ConfigType

14
go.mod
View File

@ -3,6 +3,7 @@ module flashcat.cloud/categraf
go 1.18
require (
github.com/ClickHouse/clickhouse-go/v2 v2.0.15
github.com/chai2010/winsvc v0.0.0-20200705094454-db7ec320025c
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/docker/docker v20.10.16+incompatible
@ -27,11 +28,11 @@ require (
github.com/prometheus/common v0.32.1
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/shirou/gopsutil/v3 v3.22.3
github.com/stretchr/testify v1.7.1
github.com/stretchr/testify v1.7.2
github.com/toolkits/pkg v1.3.0
github.com/ulricqin/gosnmp v0.0.1
golang.org/x/net v0.0.0-20210525063256-abc453219eb5
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32
golang.org/x/text v0.3.7
)
@ -54,7 +55,7 @@ require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/godror/knownpb v0.1.0 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.12.0 // indirect
@ -71,20 +72,25 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/paulmach/orb v0.7.1 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
google.golang.org/grpc v1.33.1 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.2.0 // indirect
)

44
go.sum
View File

@ -37,8 +37,12 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/ClickHouse/clickhouse-go/v2 v2.0.15 h1:lLAZliqrZEygkxosLaW1qHyeTb4Ho7fVCZ0WKCpLocU=
github.com/ClickHouse/clickhouse-go/v2 v2.0.15/go.mod h1:Z21o82zD8FFqefOQDg93c0XITlxGbTsWQuRm588Azkk=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@ -56,6 +60,7 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@ -66,6 +71,7 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU=
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
@ -119,8 +125,11 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 h1:dhy9OQKGBh4zVXbjwbxxHjRxMJtLXj3zfgpBYQaR4Q4=
@ -131,6 +140,7 @@ github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@ -201,10 +211,13 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.13.0 h1:2hnLQ0GjQvw7f3O61jMO8gbasZviZTrt9R8WzgiirHc=
@ -249,6 +262,7 @@ github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod
github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY=
github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE=
github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@ -279,6 +293,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
@ -293,6 +308,7 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
@ -308,6 +324,7 @@ github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eI
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mkevac/debugcharts v0.0.0-20191222103121-ae1c48aa8615/go.mod h1:Ad7oeElCZqA1Ufj0U9/liOF4BtVepxRcTvr2ey7zTvM=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -334,7 +351,13 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/paulmach/orb v0.7.1 h1:Zha++Z5OX/l168sqHK3k4z18LDvr+YAO/VjK0ReQ9rU=
github.com/paulmach/orb v0.7.1/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A=
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -374,8 +397,13 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v2.19.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.22.3 h1:UebRzEomgMpv61e3hgD1tGooqX5trFbdU/ehphbHd00=
github.com/shirou/gopsutil/v3 v3.22.3/go.mod h1:D01hZJ4pVHPpCTZ3m3T2+wDF2YAGfd+H4ifUguaQzHM=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@ -389,8 +417,9 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
@ -412,6 +441,10 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0=
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@ -526,6 +559,7 @@ golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220220014-0732a990476f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -557,8 +591,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 h1:XDXtA5hveEEV8JB2l7nhMTp3t3cHp9ZpwcdjqyEWLlo=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 h1:Js08h5hqB5xyWR789+QqueR6sDE8mk+YvpETZ+F6X9Y=
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -721,8 +756,9 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.2.0 h1:I0DwBVMGAx26dttAj1BtJLAkVGncrkkUXfJLC4Flt/I=
gotest.tools/v3 v3.2.0/go.mod h1:Mcr9QNxkg0uMvy/YElmo4SpXgJKWgQvYrT7Kw5RzJ1A=

194
house/house.go Normal file
View File

@ -0,0 +1,194 @@
package house
// refactor from [lishijiang](https://github.com/lishijiang)'s version
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/types"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
type MetricsHouseType struct {
Opts config.MetricsHouse
Chan chan *types.Sample
Conns map[string]driver.Conn
}
var MetricsHouse *MetricsHouseType
func InitMetricsHouse() error {
opts := config.Config.MetricsHouse
if !opts.Enable {
return nil
}
if opts.Database == "" {
return errors.New("configuration database is blank")
}
if opts.Table == "" {
return errors.New("configuration table is blank")
}
if len(opts.Endpoints) == 0 {
return errors.New("configuration endpoints is empty")
}
if opts.BatchSize <= 0 {
opts.BatchSize = 10000
}
MetricsHouse = &MetricsHouseType{
Opts: opts,
Chan: make(chan *types.Sample, opts.QueueSize),
Conns: make(map[string]driver.Conn),
}
if err := MetricsHouse.connect(); err != nil {
return err
}
if err := MetricsHouse.createTable(); err != nil {
return err
}
go MetricsHouse.consume()
return nil
}
func (mh *MetricsHouseType) consume() {
batch := mh.Opts.BatchSize
series := make([]*types.Sample, 0, batch)
var count int
for {
select {
case item, open := <-mh.Chan:
if !open {
// queue closed
return
}
if item == nil {
continue
}
series = append(series, item)
count++
if count >= batch {
mh.postSeries(series)
count = 0
series = make([]*types.Sample, 0, batch)
}
default:
if len(series) > 0 {
mh.postSeries(series)
count = 0
series = make([]*types.Sample, 0, batch)
}
time.Sleep(time.Second * 5)
}
}
}
func (mh *MetricsHouseType) postSeries(series []*types.Sample) {
count := len(mh.Opts.Endpoints)
for _, i := range rand.Perm(count) {
ep := mh.Opts.Endpoints[i]
conn := mh.Conns[ep]
if err := mh.post(conn, series); err == nil {
return
} else {
log.Println("E! failed to post series to clickhouse:", ep, "error:", err)
}
}
}
func (mh *MetricsHouseType) post(conn driver.Conn, series []*types.Sample) error {
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO "+mh.Opts.Table)
if err != nil {
return err
}
for _, e := range series {
err := batch.Append(
e.Timestamp, //会自动转换时间格式
e.Timestamp, //会自动转换时间格式
e.Metric,
convertTags(e),
e.Value,
)
if err != nil {
return err
}
}
return batch.Send()
}
func (mh *MetricsHouseType) Push(s *types.Sample) {
mh.Chan <- s
}
const SQLCreateTable = `
CREATE TABLE IF NOT EXISTS %s (
event_time DateTime
, event_date Date
, metric LowCardinality(String)
, tags String
, value Float64
) ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_time, metric, tags)
TTL event_date + toIntervalDay(365)
SETTINGS index_granularity = 8192
`
func (mh *MetricsHouseType) createTable() error {
for _, conn := range mh.Conns {
if err := conn.Exec(context.Background(), fmt.Sprintf(SQLCreateTable, mh.Opts.Table)); err != nil {
return fmt.Errorf("failed to create table: %v", err)
}
}
return nil
}
func (mh *MetricsHouseType) connect() error {
for _, endpoint := range mh.Opts.Endpoints {
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{endpoint},
Auth: clickhouse.Auth{
Database: mh.Opts.Database,
Username: mh.Opts.Username,
Password: mh.Opts.Password,
},
Debug: mh.Opts.Debug,
DialTimeout: time.Duration(mh.Opts.DialTimeout),
MaxOpenConns: mh.Opts.MaxOpenConns,
MaxIdleConns: mh.Opts.MaxIdleConns,
ConnMaxLifetime: time.Duration(mh.Opts.ConnMaxLifetime),
})
if err != nil {
return fmt.Errorf("failed to open clickhouse(%s) connection: %v", endpoint, err)
}
if _, has := mh.Conns[endpoint]; has {
return fmt.Errorf("duplicate endpoint: %s", endpoint)
}
mh.Conns[endpoint] = conn
}
return nil
}

32
house/types.go Normal file
View File

@ -0,0 +1,32 @@
package house
import (
"encoding/json"
"strings"
"time"
"flashcat.cloud/categraf/types"
)
var labelReplacer = strings.NewReplacer("-", "_", ".", "_", " ", "_", "/", "_")
type Sample struct {
Timestamp time.Time `json:"timestamp"`
Metric string `json:"metric"`
Tags string `json:"tags"`
Value float64 `json:"value"`
}
func convertTags(s *types.Sample) string {
tags := make(map[string]string)
for k, v := range s.Labels {
tags[labelReplacer.Replace(k)] = v
}
bs, err := json.Marshal(tags)
if err != nil {
return ""
}
return string(bs)
}

16
main.go
View File

@ -13,6 +13,7 @@ import (
"flashcat.cloud/categraf/agent"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/house"
"flashcat.cloud/categraf/pkg/osx"
"flashcat.cloud/categraf/writer"
"github.com/chai2010/winsvc"
@ -62,15 +63,22 @@ func main() {
log.Fatalln("F! failed to init config:", err)
}
// init writers
if err := writer.Init(config.Config.Writers); err != nil {
log.Fatalln("F! failed to init writer:", err)
}
initWriters()
ag := agent.NewAgent(parseFilter(*inputFilters))
runAgent(ag)
}
func initWriters() {
if err := writer.InitWriters(); err != nil {
log.Fatalln("F! failed to init writer:", err)
}
if err := house.InitMetricsHouse(); err != nil {
log.Fatalln("F! failed to init metricshouse:", err)
}
}
func handleSignal(ag *agent.Agent) {
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGPIPE)

View File

@ -74,7 +74,8 @@ func (w WriterType) Post(req []byte) error {
var Writers = make(map[string]WriterType)
func Init(opts []config.WriterOption) error {
func InitWriters() error {
opts := config.Config.Writers
for i := 0; i < len(opts); i++ {
cli, err := api.NewClient(api.Config{
Address: opts[i].Url,