// Unless explicitly stated otherwise all files in this repository are licensed
|
|
// under the Apache License Version 2.0.
|
|
// This product includes software developed at Datadog (https://www.datadoghq.com/).
|
|
// Copyright 2016-present Datadog, Inc.
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
coreconfig "flashcat.cloud/categraf/config"
|
|
|
|
logsconfig "flashcat.cloud/categraf/config/logs"
|
|
"flashcat.cloud/categraf/logs/auditor"
|
|
"flashcat.cloud/categraf/logs/client"
|
|
"flashcat.cloud/categraf/logs/client/http"
|
|
"flashcat.cloud/categraf/logs/diagnostic"
|
|
"flashcat.cloud/categraf/logs/input/file"
|
|
"flashcat.cloud/categraf/logs/input/journald"
|
|
"flashcat.cloud/categraf/logs/input/listener"
|
|
"flashcat.cloud/categraf/logs/pipeline"
|
|
"flashcat.cloud/categraf/logs/restart"
|
|
"flashcat.cloud/categraf/logs/service"
|
|
logService "flashcat.cloud/categraf/logs/service"
|
|
"flashcat.cloud/categraf/logs/status"
|
|
)
|
|
|
|
// LogAgent represents the data pipeline that collects, decodes,
// processes and sends logs to the backend
// + ------------------------------------------------------ +
// |                                                        |
// | Collector -> Decoder -> Processor -> Sender -> Auditor |
// |                                                        |
// + ------------------------------------------------------ +
type LogAgent struct {
	// auditor sits at the end of the pipeline and records what has been
	// successfully sent, so tailing can resume after a restart.
	auditor auditor.Auditor
	// destinationsCtx carries the shared lifecycle for all destinations;
	// stopping it forces destinations to drain without network writes (see Stop).
	destinationsCtx *client.DestinationsContext
	// pipelineProvider supplies processor/sender pairs for the inputs.
	pipelineProvider pipeline.Provider
	// inputs are the log collectors (file scanner, listener, journald);
	// each is restartable so Start/Stop can manage them uniformly.
	inputs []restart.Restartable
	// diagnosticMessageReceiver buffers messages for diagnostic inspection.
	diagnosticMessageReceiver *diagnostic.BufferedMessageReceiver
}
|
|
|
|
// NewAgent returns a new Logs LogAgent
|
|
func NewLogAgent(sources *logsconfig.LogSources, services *service.Services, processingRules []*logsconfig.ProcessingRule, endpoints *logsconfig.Endpoints) *LogAgent {
|
|
// setup the auditor
|
|
// We pass the health handle to the auditor because it's the end of the pipeline and the most
|
|
// critical part. Arguably it could also be plugged to the destination.
|
|
auditorTTL := time.Duration(23) * time.Hour
|
|
auditor := auditor.New(coreconfig.GetLogRunPath(), auditor.DefaultRegistryFilename, auditorTTL)
|
|
destinationsCtx := client.NewDestinationsContext()
|
|
diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver()
|
|
|
|
// setup the pipeline provider that provides pairs of processor and sender
|
|
pipelineProvider := pipeline.NewProvider(logsconfig.NumberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsCtx)
|
|
|
|
validatePodContainerID := coreconfig.ValidatePodContainerID()
|
|
|
|
// setup the inputs
|
|
inputs := []restart.Restartable{
|
|
file.NewScanner(sources, coreconfig.OpenLogsLimit(), pipelineProvider, auditor,
|
|
file.DefaultSleepDuration, validatePodContainerID, time.Duration(time.Duration(coreconfig.FileScanPeriod())*time.Second)),
|
|
listener.NewLauncher(sources, coreconfig.LogFrameSize(), pipelineProvider),
|
|
journald.NewLauncher(sources, pipelineProvider, auditor),
|
|
}
|
|
|
|
return &LogAgent{
|
|
auditor: auditor,
|
|
destinationsCtx: destinationsCtx,
|
|
pipelineProvider: pipelineProvider,
|
|
inputs: inputs,
|
|
diagnosticMessageReceiver: diagnosticMessageReceiver,
|
|
}
|
|
}
|
|
|
|
// Start starts all the elements of the data pipeline
|
|
// in the right order to prevent data loss
|
|
func (a *LogAgent) Start() {
|
|
starter := restart.NewStarter(a.destinationsCtx, a.auditor, a.pipelineProvider, a.diagnosticMessageReceiver)
|
|
for _, input := range a.inputs {
|
|
starter.Add(input)
|
|
}
|
|
starter.Start()
|
|
}
|
|
|
|
// Flush flushes synchronously the pipelines managed by the Logs LogAgent.
// It delegates to the pipeline provider; ctx bounds how long the flush
// is allowed to wait.
func (a *LogAgent) Flush(ctx context.Context) {
	a.pipelineProvider.Flush(ctx)
}
|
|
|
|
// Stop stops all the elements of the data pipeline
|
|
// in the right order to prevent data loss
|
|
func (a *LogAgent) Stop() {
|
|
inputs := restart.NewParallelStopper()
|
|
for _, input := range a.inputs {
|
|
inputs.Add(input)
|
|
}
|
|
stopper := restart.NewSerialStopper(
|
|
inputs,
|
|
a.pipelineProvider,
|
|
a.auditor,
|
|
a.destinationsCtx,
|
|
a.diagnosticMessageReceiver,
|
|
)
|
|
|
|
// This will try to stop everything in order, including the potentially blocking
|
|
// parts like the sender. After StopTimeout it will just stop the last part of the
|
|
// pipeline, disconnecting it from the auditor, to make sure that the pipeline is
|
|
// flushed before stopping.
|
|
// TODO: Add this feature in the stopper.
|
|
c := make(chan struct{})
|
|
go func() {
|
|
stopper.Stop()
|
|
close(c)
|
|
}()
|
|
timeout := time.Duration(30) * time.Second
|
|
select {
|
|
case <-c:
|
|
case <-time.After(timeout):
|
|
log.Println("I! Timed out when stopping logs-agent, forcing it to stop now")
|
|
// We force all destinations to read/flush all the messages they get without
|
|
// trying to write to the network.
|
|
a.destinationsCtx.Stop()
|
|
// Wait again for the stopper to complete.
|
|
// In some situation, the stopper unfortunately never succeed to complete,
|
|
// we've already reached the grace period, give it some more seconds and
|
|
// then force quit.
|
|
timeout := time.NewTimer(5 * time.Second)
|
|
select {
|
|
case <-c:
|
|
case <-timeout.C:
|
|
log.Println("W! Force close of the Logs LogAgent, dumping the Go routines.")
|
|
}
|
|
}
|
|
}
|
|
|
|
var (
	// logAgent is the process-wide logs agent instance, created by
	// startLogAgent and shut down by stopLogAgent; nil while logs
	// collection is disabled or not yet started.
	logAgent *LogAgent
)
|
|
|
|
const (
	// intakeTrackType identifies the logs track when building intake endpoints.
	intakeTrackType = "logs"
	// AgentJSONIntakeProtocol is the intake protocol name used for endpoints.
	AgentJSONIntakeProtocol = "agent-json"
	// invalidProcessingRules is the status key under which global
	// processing-rule validation errors are reported.
	invalidProcessingRules = "invalid_global_processing_rules"
)
|
|
|
|
func (a *Agent) startLogAgent() {
|
|
if coreconfig.Config == nil ||
|
|
!coreconfig.Config.Logs.Enable ||
|
|
len(coreconfig.Config.Logs.Items) == 0 {
|
|
return
|
|
}
|
|
|
|
httpConnectivity := logsconfig.HTTPConnectivityFailure
|
|
if endpoints, err := BuildHTTPEndpoints(intakeTrackType, AgentJSONIntakeProtocol, logsconfig.DefaultIntakeOrigin); err == nil {
|
|
httpConnectivity = http.CheckConnectivity(endpoints.Main)
|
|
}
|
|
endpoints, err := BuildEndpoints(httpConnectivity, intakeTrackType, AgentJSONIntakeProtocol, logsconfig.DefaultIntakeOrigin)
|
|
processingRules, err := GlobalProcessingRules()
|
|
if err != nil {
|
|
message := fmt.Sprintf("Invalid processing rules: %v", err)
|
|
status.AddGlobalError(invalidProcessingRules, message)
|
|
log.Println("E!", errors.New(message))
|
|
return
|
|
}
|
|
|
|
sources := logsconfig.NewLogSources()
|
|
services := logService.NewServices()
|
|
log.Println("I! Starting logs-agent...")
|
|
logAgent = NewLogAgent(sources, services, processingRules, endpoints)
|
|
logAgent.Start()
|
|
|
|
// add source
|
|
for _, c := range coreconfig.Config.Logs.Items {
|
|
if c == nil {
|
|
continue
|
|
}
|
|
source := logsconfig.NewLogSource(c.Name, c)
|
|
if err := c.Validate(); err != nil {
|
|
log.Println("W! Invalid logs configuration:", err)
|
|
source.Status.Error(err)
|
|
continue
|
|
}
|
|
sources.AddSource(source)
|
|
}
|
|
}
|
|
|
|
// stopLogAgent stops the global logs agent if it was started.
// Safe to call when logs collection was never enabled (logAgent is nil).
func stopLogAgent() {
	if logAgent != nil {
		logAgent.Stop()
	}
}
|
|
|
|
// GetContainerColloectAll always returns false — presumably a
// "collect all containers" feature flag that this agent does not support.
// NOTE(review): the exported name contains a typo ("Colloect") and a
// non-idiomatic "Get" prefix; renaming would break external callers, so it
// is left as-is.
func GetContainerColloectAll() bool {
	return false
}
|
|
|
|
// GlobalProcessingRules returns the global processing rules to apply to all logs.
|
|
func GlobalProcessingRules() ([]*logsconfig.ProcessingRule, error) {
|
|
rules := coreconfig.Config.Logs.GlobalProcessingRules
|
|
err := logsconfig.ValidateProcessingRules(rules)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = logsconfig.CompileProcessingRules(rules)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return rules, nil
|
|
}
|