commit
4c5d4252f5
|
@ -5,3 +5,4 @@
|
|||
/vendor
|
||||
docker/conf
|
||||
docker/categraf
|
||||
/build
|
||||
|
|
|
@ -34,11 +34,13 @@ import (
|
|||
_ "flashcat.cloud/categraf/inputs/system"
|
||||
_ "flashcat.cloud/categraf/inputs/tomcat"
|
||||
_ "flashcat.cloud/categraf/inputs/zookeeper"
|
||||
"flashcat.cloud/categraf/traces"
|
||||
)
|
||||
|
||||
type Agent struct {
|
||||
InputFilters map[string]struct{}
|
||||
InputReaders map[string]*InputReader
|
||||
InputFilters map[string]struct{}
|
||||
InputReaders map[string]*InputReader
|
||||
TraceCollector *traces.Collector
|
||||
}
|
||||
|
||||
func NewAgent(filters map[string]struct{}) *Agent {
|
||||
|
@ -52,6 +54,7 @@ func (a *Agent) Start() {
|
|||
log.Println("I! agent starting")
|
||||
a.startLogAgent()
|
||||
a.startMetricsAgent()
|
||||
a.startTracesAgent()
|
||||
log.Println("I! agent started")
|
||||
}
|
||||
|
||||
|
@ -59,6 +62,7 @@ func (a *Agent) Stop() {
|
|||
log.Println("I! agent stopping")
|
||||
a.stopLogAgent()
|
||||
a.stopMetricsAgent()
|
||||
a.stopTracesAgent()
|
||||
log.Println("I! agent stopped")
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"flashcat.cloud/categraf/config"
|
||||
"flashcat.cloud/categraf/traces"
|
||||
)
|
||||
|
||||
func (a *Agent) startTracesAgent() (err error) {
|
||||
if config.Config.Traces == nil || !config.Config.Traces.Enable {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.Println("E! failed to start tracing agent:", err)
|
||||
}
|
||||
}()
|
||||
|
||||
col, err := traces.New(config.Config.Traces)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = col.Run(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.TraceCollector = col
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) stopTracesAgent() (err error) {
|
||||
if config.Config.Traces == nil || !config.Config.Traces.Enable {
|
||||
return nil
|
||||
}
|
||||
|
||||
if a.TraceCollector == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.Println("E! failed to stop tracing agent:", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return a.TraceCollector.Shutdown(context.Background())
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
# This is an example:
|
||||
# receive spans from jaeger, send to jaeger and zipkin, with three extensions enabled.
|
||||
# See factories we already supported:
|
||||
# ./config/traces/components.go
|
||||
# For more details, see the OpenTelemetry official docs:
|
||||
# https://opentelemetry.io/docs/collector/configuration/
|
||||
traces:
|
||||
enable: true
|
||||
extensions:
|
||||
health_check:
|
||||
pprof:
|
||||
endpoint: 0.0.0.0:1777
|
||||
zpages:
|
||||
endpoint: 0.0.0.0:55679
|
||||
|
||||
receivers:
|
||||
jaeger:
|
||||
protocols:
|
||||
thrift_http:
|
||||
endpoint: 0.0.0.0:14268
|
||||
|
||||
processors:
|
||||
batch:
|
||||
|
||||
exporters:
|
||||
jaeger:
|
||||
endpoint: "127.0.0.1:14250"
|
||||
tls:
|
||||
insecure: true
|
||||
|
||||
zipkin:
|
||||
endpoint: "http://127.0.0.1:9411/api/v2/spans"
|
||||
|
||||
service:
|
||||
pipelines:
|
||||
traces:
|
||||
receivers: [jaeger]
|
||||
processors: [batch]
|
||||
exporters: [jaeger,zipkin]
|
||||
|
||||
extensions: [health_check, pprof, zpages]
|
|
@ -9,6 +9,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"flashcat.cloud/categraf/config/traces"
|
||||
"flashcat.cloud/categraf/pkg/cfg"
|
||||
"github.com/toolkits/pkg/file"
|
||||
)
|
||||
|
@ -55,6 +56,7 @@ type ConfigType struct {
|
|||
Writers []WriterOption `toml:"writers"`
|
||||
Logs Logs `toml:"logs"`
|
||||
MetricsHouse MetricsHouse `toml:"metricshouse"`
|
||||
Traces *traces.Config `toml:"traces"`
|
||||
}
|
||||
|
||||
var Config *ConfigType
|
||||
|
@ -83,6 +85,10 @@ func InitConfig(configDir string, debugMode bool, testMode bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := traces.Parse(Config.Traces); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if Config.Global.PrintConfigs {
|
||||
bs, _ := json.MarshalIndent(Config, "", " ")
|
||||
fmt.Println(string(bs))
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
package traces
|
||||
|
||||
import (
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/exporter/otlpexporter"
|
||||
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
|
||||
"go.opentelemetry.io/collector/extension/ballastextension"
|
||||
"go.opentelemetry.io/collector/extension/zpagesextension"
|
||||
"go.opentelemetry.io/collector/processor/batchprocessor"
|
||||
"go.opentelemetry.io/collector/processor/memorylimiterprocessor"
|
||||
"go.opentelemetry.io/collector/receiver/otlpreceiver"
|
||||
)
|
||||
|
||||
// Add more factories here if you need
|
||||
func components() (component.Factories, error) {
|
||||
extensions, err := component.MakeExtensionFactoryMap(
|
||||
oauth2clientauthextension.NewFactory(),
|
||||
healthcheckextension.NewFactory(),
|
||||
ballastextension.NewFactory(),
|
||||
zpagesextension.NewFactory(),
|
||||
pprofextension.NewFactory(),
|
||||
)
|
||||
if err != nil {
|
||||
return component.Factories{}, err
|
||||
}
|
||||
|
||||
receivers, err := component.MakeReceiverFactoryMap(
|
||||
jaegerreceiver.NewFactory(),
|
||||
zipkinreceiver.NewFactory(),
|
||||
otlpreceiver.NewFactory(),
|
||||
kafkareceiver.NewFactory(),
|
||||
)
|
||||
if err != nil {
|
||||
return component.Factories{}, err
|
||||
}
|
||||
|
||||
exporters, err := component.MakeExporterFactoryMap(
|
||||
otlpexporter.NewFactory(),
|
||||
otlphttpexporter.NewFactory(),
|
||||
jaegerexporter.NewFactory(),
|
||||
zipkinexporter.NewFactory(),
|
||||
kafkaexporter.NewFactory(),
|
||||
alibabacloudlogserviceexporter.NewFactory(),
|
||||
)
|
||||
if err != nil {
|
||||
return component.Factories{}, err
|
||||
}
|
||||
|
||||
processors, err := component.MakeProcessorFactoryMap(
|
||||
batchprocessor.NewFactory(),
|
||||
memorylimiterprocessor.NewFactory(),
|
||||
attributesprocessor.NewFactory(),
|
||||
tailsamplingprocessor.NewFactory(),
|
||||
)
|
||||
if err != nil {
|
||||
return component.Factories{}, err
|
||||
}
|
||||
|
||||
return component.Factories{
|
||||
Extensions: extensions,
|
||||
Receivers: receivers,
|
||||
Processors: processors,
|
||||
Exporters: exporters,
|
||||
}, nil
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
package traces
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
|
||||
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
|
||||
"go.opentelemetry.io/collector/service"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Config defines the OpenTelemetry Collector configuration.
|
||||
// Enable: enable tracing or not.
|
||||
// UnParsed: loaded as map[string]interface{} from the raw config file.
|
||||
// Parsed: retrieved and validated from the UnParsed contents.
|
||||
// Factories: struct holds in a single type all component factories that can be handled by the Config.
|
||||
// We only create the needed factories as default, if you need more, import and init these by components.go
|
||||
type Config struct {
|
||||
Enable bool `toml:"enable" yaml:"enable" json:"enable"`
|
||||
UnParsed map[string]interface{} `toml:",inline" yaml:",inline" json:",inline"`
|
||||
Parsed *config.Config `toml:"-" yaml:"-" json:"parsed"`
|
||||
Factories component.Factories `toml:"-" yaml:"-" json:"-"`
|
||||
}
|
||||
|
||||
// Parse parse the UnParsed contents to Parsed
|
||||
func Parse(c *Config) error {
|
||||
if c == nil || len(c.UnParsed) == 0 || !c.Enable {
|
||||
log.Println("I! tracing disabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
ymlCfg, err := yaml.Marshal(c.UnParsed)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to marshal trace config, %v", err)
|
||||
}
|
||||
|
||||
provider, err := service.NewConfigProvider(service.ConfigProviderSettings{
|
||||
Locations: []string{"yaml:" + string(ymlCfg)},
|
||||
MapProviders: makeMapProvidersMap(yamlprovider.New()),
|
||||
MapConverters: []confmap.Converter{expandconverter.New()},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to new config provider: %v", err)
|
||||
}
|
||||
|
||||
c.Factories, err = components()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to init otlp factories: %v", err)
|
||||
}
|
||||
|
||||
c.Parsed, err = provider.Get(context.Background(), c.Factories)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse trace config: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeMapProvidersMap(providers ...confmap.Provider) map[string]confmap.Provider {
|
||||
ret := make(map[string]confmap.Provider, len(providers))
|
||||
for _, provider := range providers {
|
||||
ret[provider.Scheme()] = provider
|
||||
}
|
||||
return ret
|
||||
}
|
144
go.mod
144
go.mod
|
@ -4,7 +4,7 @@ go 1.18
|
|||
|
||||
require (
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.0.15
|
||||
github.com/Shopify/sarama v1.34.0
|
||||
github.com/Shopify/sarama v1.34.1
|
||||
github.com/chai2010/winsvc v0.0.0-20200705094454-db7ec320025c
|
||||
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
|
||||
github.com/docker/docker v20.10.16+incompatible
|
||||
|
@ -22,26 +22,56 @@ require (
|
|||
github.com/jmoiron/sqlx v1.3.5
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
|
||||
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.54.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.54.0
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/prometheus/client_golang v1.12.1
|
||||
github.com/prometheus/client_golang v1.12.2
|
||||
github.com/prometheus/client_model v0.2.0
|
||||
github.com/prometheus/common v0.32.1
|
||||
github.com/prometheus/common v0.35.0
|
||||
github.com/prometheus/prometheus v2.5.0+incompatible
|
||||
github.com/shirou/gopsutil/v3 v3.22.3
|
||||
github.com/stretchr/testify v1.7.2
|
||||
github.com/shirou/gopsutil/v3 v3.22.5
|
||||
github.com/stretchr/testify v1.7.4
|
||||
github.com/toolkits/pkg v1.3.0
|
||||
github.com/ulricqin/gosnmp v0.0.1
|
||||
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2
|
||||
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32
|
||||
go.opentelemetry.io/collector v0.54.0
|
||||
go.opentelemetry.io/otel/metric v0.30.0
|
||||
go.opentelemetry.io/otel/trace v1.7.0
|
||||
go.uber.org/multierr v1.8.0
|
||||
go.uber.org/zap v1.21.0
|
||||
golang.org/x/net v0.0.0-20220607020251-c690dde0001d
|
||||
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d
|
||||
golang.org/x/text v0.3.7
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go/compute v1.6.1 // indirect
|
||||
contrib.go.opencensus.io/exporter/prometheus v0.4.1 // indirect
|
||||
github.com/BurntSushi/toml v1.1.0 // indirect
|
||||
github.com/HdrHistogram/hdrhistogram-go v1.1.0 // indirect
|
||||
github.com/Microsoft/go-winio v0.5.2 // indirect
|
||||
github.com/VividCortex/gohistogram v1.0.0 // indirect
|
||||
github.com/aliyun/aliyun-log-go-sdk v0.1.36 // indirect
|
||||
github.com/alouca/gologger v0.0.0-20120904114645-7d4b7291de9c // indirect
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
|
||||
github.com/antonmedv/expr v1.9.0 // indirect
|
||||
github.com/apache/thrift v0.16.0 // indirect
|
||||
github.com/armon/go-metrics v0.3.10 // indirect
|
||||
github.com/aws/aws-sdk-go v1.44.38 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
|
||||
github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
|
@ -52,60 +82,120 @@ require (
|
|||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
|
||||
github.com/eapache/queue v1.1.0 // indirect
|
||||
github.com/fatih/camelcase v1.0.0 // indirect
|
||||
github.com/fatih/color v1.9.0 // indirect
|
||||
github.com/fatih/color v1.13.0 // indirect
|
||||
github.com/fatih/structs v1.1.0 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.2 // indirect
|
||||
github.com/freedomkk-qfeng/go-fastping v0.0.0-20160109021039-d7bb493dee3e // indirect
|
||||
github.com/fsnotify/fsnotify v1.5.4 // indirect
|
||||
github.com/go-kit/kit v0.11.0 // indirect
|
||||
github.com/go-kit/log v0.2.0 // indirect
|
||||
github.com/go-logfmt/logfmt v0.5.1 // indirect
|
||||
github.com/go-logr/logr v1.2.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/godror/knownpb v0.1.0 // indirect
|
||||
github.com/gogo/googleapis v1.4.1 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/gorilla/handlers v1.5.1 // indirect
|
||||
github.com/gorilla/mux v1.8.0 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.0.0 // indirect
|
||||
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
|
||||
github.com/hashicorp/go-hclog v0.12.0 // indirect
|
||||
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
|
||||
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
||||
github.com/hashicorp/go-hclog v1.2.0 // indirect
|
||||
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
|
||||
github.com/hashicorp/go-uuid v1.0.2 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.1 // indirect
|
||||
github.com/hashicorp/serf v0.9.6 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/hashicorp/serf v0.9.7 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/jaegertracing/jaeger v1.35.2 // indirect
|
||||
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
|
||||
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
|
||||
github.com/jcmturner/gofork v1.0.0 // indirect
|
||||
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
|
||||
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
|
||||
github.com/klauspost/compress v1.15.0 // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.15.6 // indirect
|
||||
github.com/knadh/koanf v1.4.2 // indirect
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
|
||||
github.com/mattn/go-colorable v0.1.6 // indirect
|
||||
github.com/mattn/go-isatty v0.0.12 // indirect
|
||||
github.com/magiconair/properties v1.8.6 // indirect
|
||||
github.com/mattn/go-colorable v0.1.12 // indirect
|
||||
github.com/mattn/go-isatty v0.0.14 // indirect
|
||||
github.com/mitchellh/copystructure v1.2.0 // indirect
|
||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
||||
github.com/mitchellh/mapstructure v1.1.2 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/mitchellh/reflectwalk v1.0.2 // indirect
|
||||
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/mostynb/go-grpc-compression v1.1.16 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.54.0 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.54.0 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.54.0 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.54.0 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/opencontainers/image-spec v1.0.2 // indirect
|
||||
github.com/opentracing/opentracing-go v1.2.0 // indirect
|
||||
github.com/openzipkin/zipkin-go v0.4.0 // indirect
|
||||
github.com/paulmach/orb v0.7.1 // indirect
|
||||
github.com/pelletier/go-toml v1.9.4 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
|
||||
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.14 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
|
||||
github.com/prometheus/procfs v0.7.3 // indirect
|
||||
github.com/prometheus/statsd_exporter v0.21.0 // indirect
|
||||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
|
||||
github.com/rs/cors v1.8.2 // indirect
|
||||
github.com/shopspring/decimal v1.3.1 // indirect
|
||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||
github.com/stretchr/objx v0.1.1 // indirect
|
||||
github.com/spf13/afero v1.8.2 // indirect
|
||||
github.com/spf13/cast v1.5.0 // indirect
|
||||
github.com/spf13/cobra v1.4.0 // indirect
|
||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/spf13/viper v1.11.0 // indirect
|
||||
github.com/stretchr/objx v0.4.0 // indirect
|
||||
github.com/subosito/gotenv v1.2.0 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.10 // indirect
|
||||
github.com/tklauser/numcpus v0.4.0 // indirect
|
||||
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
|
||||
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||
github.com/xdg-go/scram v1.1.1 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.3 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||
go.opencensus.io v0.23.0 // indirect
|
||||
go.opentelemetry.io/collector/pdata v0.54.0 // indirect
|
||||
go.opentelemetry.io/collector/semconv v0.54.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0 // indirect
|
||||
go.opentelemetry.io/contrib/zpages v0.32.0 // indirect
|
||||
go.opentelemetry.io/otel v1.7.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.7.0 // indirect
|
||||
go.uber.org/automaxprocs v1.4.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.30.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.7.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v0.30.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/automaxprocs v1.5.1 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
|
||||
google.golang.org/grpc v1.33.1 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac // indirect
|
||||
google.golang.org/grpc v1.47.0 // indirect
|
||||
google.golang.org/protobuf v1.28.0 // indirect
|
||||
gopkg.in/ini.v1 v1.66.4 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
gotest.tools/v3 v3.2.0 // indirect
|
||||
)
|
||||
|
||||
replace go.opentelemetry.io/collector => github.com/flashcatcloud/opentelemetry-collector v0.54.1-0.20220628041301-3b8dabd1bcd0
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
package traces
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"flashcat.cloud/categraf/config"
|
||||
"flashcat.cloud/categraf/config/traces"
|
||||
"go.opentelemetry.io/collector/component"
|
||||
)
|
||||
|
||||
// Collector simply wrapped the OpenTelemetry Collector, which means you can get a full support
|
||||
// for recving data from and exporting to popular trace vendors (eg. Jaeger or Zipkin).
|
||||
// For more details, see the official docs:
|
||||
// https://opentelemetry.io/docs/collector/getting-started
|
||||
// https://github.com/open-telemetry/opentelemetry-collector
|
||||
type Collector struct {
|
||||
srv *service
|
||||
cfg *traces.Config
|
||||
}
|
||||
|
||||
// New make a Collector instance
|
||||
func New(cfg *traces.Config) (*Collector, error) {
|
||||
buildInfo := component.BuildInfo{
|
||||
Command: "otelcol-categraf",
|
||||
Description: "OpenTelemetry Collector for categraf.",
|
||||
Version: config.Version,
|
||||
}
|
||||
|
||||
s, err := newService(&settings{
|
||||
Config: cfg.Parsed,
|
||||
Factories: cfg.Factories,
|
||||
BuildInfo: buildInfo,
|
||||
AsyncErrorChannel: make(chan error),
|
||||
LoggingOptions: nil,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create trace service: %v", err)
|
||||
}
|
||||
|
||||
return &Collector{
|
||||
srv: s,
|
||||
cfg: cfg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run starts the collector
|
||||
func (c *Collector) Run(ctx context.Context) error {
|
||||
err := c.srv.Start(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start trace service: %v", err)
|
||||
}
|
||||
|
||||
go c.wait(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown stops the collector
|
||||
func (c *Collector) Shutdown(ctx context.Context) error {
|
||||
err := c.srv.Shutdown(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to shutdown trace service: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Collector) wait(ctx context.Context) {
|
||||
log.Println("I! Everything is ready for traces, begin tracing.")
|
||||
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case err := <-c.srv.host.asyncErrorChannel:
|
||||
log.Println("E! Asynchronous error received, terminating tracing:", err)
|
||||
break LOOP
|
||||
case <-ctx.Done():
|
||||
log.Println("E! Context done, terminating tracing:", ctx.Err())
|
||||
_ = c.Shutdown(context.Background())
|
||||
}
|
||||
}
|
||||
|
||||
_ = c.Shutdown(ctx)
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package traces
|
||||
|
||||
import (
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/service/pkg/extensions"
|
||||
"go.opentelemetry.io/collector/service/pkg/pipelines"
|
||||
)
|
||||
|
||||
var _ component.Host = (*serviceHost)(nil)
|
||||
|
||||
type serviceHost struct {
|
||||
asyncErrorChannel chan error
|
||||
factories component.Factories
|
||||
buildInfo component.BuildInfo
|
||||
|
||||
pipelines *pipelines.Pipelines
|
||||
builtExtensions *extensions.BuiltExtensions
|
||||
}
|
||||
|
||||
// ReportFatalError is used to report to the host that the receiver encountered
|
||||
// a fatal error (i.e.: an error that the instance can't recover from) after
|
||||
// its start function has already returned.
|
||||
func (host *serviceHost) ReportFatalError(err error) {
|
||||
host.asyncErrorChannel <- err
|
||||
}
|
||||
|
||||
func (host *serviceHost) GetFactory(kind component.Kind, componentType config.Type) component.Factory {
|
||||
switch kind {
|
||||
case component.KindReceiver:
|
||||
return host.factories.Receivers[componentType]
|
||||
case component.KindProcessor:
|
||||
return host.factories.Processors[componentType]
|
||||
case component.KindExporter:
|
||||
return host.factories.Exporters[componentType]
|
||||
case component.KindExtension:
|
||||
return host.factories.Extensions[componentType]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (host *serviceHost) GetExtensions() map[config.ComponentID]component.Extension {
|
||||
return host.builtExtensions.GetExtensions()
|
||||
}
|
||||
|
||||
func (host *serviceHost) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
|
||||
return host.pipelines.GetExporters()
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
package traces
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/service/pkg/extensions"
|
||||
"go.opentelemetry.io/collector/service/pkg/pipelines"
|
||||
"go.opentelemetry.io/collector/service/pkg/telemetrylogs"
|
||||
"go.opentelemetry.io/otel/metric/nonrecording"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/multierr"
|
||||
)
|
||||
|
||||
type service struct {
|
||||
buildInfo component.BuildInfo
|
||||
config *config.Config
|
||||
telemetry component.TelemetrySettings
|
||||
host *serviceHost
|
||||
}
|
||||
|
||||
func newService(set *settings) (srv *service, err error) {
|
||||
srv = &service{
|
||||
buildInfo: set.BuildInfo,
|
||||
config: set.Config,
|
||||
telemetry: component.TelemetrySettings{
|
||||
TracerProvider: trace.NewNoopTracerProvider(),
|
||||
MeterProvider: nonrecording.NewNoopMeterProvider(),
|
||||
},
|
||||
host: &serviceHost{
|
||||
factories: set.Factories,
|
||||
buildInfo: set.BuildInfo,
|
||||
asyncErrorChannel: set.AsyncErrorChannel,
|
||||
},
|
||||
}
|
||||
|
||||
srv.telemetry.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get logger: %w", err)
|
||||
}
|
||||
|
||||
srv.host.builtExtensions, err = extensions.Build(context.Background(),
|
||||
srv.telemetry, srv.buildInfo, srv.config.Extensions, srv.config.Service.Extensions, srv.host.factories.Extensions)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot build tracing extensions: %w", err)
|
||||
}
|
||||
|
||||
srv.host.pipelines, err = pipelines.Build(context.Background(),
|
||||
srv.telemetry, srv.buildInfo, srv.config, srv.host.factories)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot build tracing pipelines: %w", err)
|
||||
}
|
||||
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
func (srv *service) Start(ctx context.Context) error {
|
||||
if err := srv.host.builtExtensions.StartAll(ctx, srv.host); err != nil {
|
||||
return fmt.Errorf("failed to start tracing extensions: %w", err)
|
||||
}
|
||||
|
||||
if err := srv.host.pipelines.StartAll(ctx, srv.host); err != nil {
|
||||
return fmt.Errorf("cannot start tracing pipelines: %w", err)
|
||||
}
|
||||
|
||||
return srv.host.builtExtensions.NotifyPipelineReady()
|
||||
}
|
||||
|
||||
func (srv *service) Shutdown(ctx context.Context) error {
|
||||
// Accumulate errors and proceed with shutting down remaining components.
|
||||
var errs error
|
||||
|
||||
if err := srv.host.builtExtensions.NotifyPipelineNotReady(); err != nil {
|
||||
errs = multierr.Append(errs, fmt.Errorf("failed to notify that tracing pipeline is not ready: %w", err))
|
||||
}
|
||||
|
||||
if err := srv.host.pipelines.ShutdownAll(ctx); err != nil {
|
||||
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown tracing pipelines: %w", err))
|
||||
}
|
||||
|
||||
if err := srv.host.builtExtensions.ShutdownAll(ctx); err != nil {
|
||||
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown tracing extensions: %w", err))
|
||||
}
|
||||
|
||||
// TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger.
|
||||
return errs
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package traces
|
||||
|
||||
import (
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// settings holds configuration for building a new service.
|
||||
type settings struct {
|
||||
// Factories component factories.
|
||||
Factories component.Factories
|
||||
|
||||
// BuildInfo provides collector start information.
|
||||
BuildInfo component.BuildInfo
|
||||
|
||||
// Config represents the configuration of the service.
|
||||
Config *config.Config
|
||||
|
||||
// AsyncErrorChannel is the channel that is used to report fatal errors.
|
||||
AsyncErrorChannel chan error
|
||||
|
||||
// LoggingOptions provides a way to change behavior of zap logging.
|
||||
LoggingOptions []zap.Option
|
||||
}
|
Loading…
Reference in New Issue