fix log agent exit when there are no inputs

Author: kongfei, 2022-06-02 10:57:37 +08:00
parent 589f8c1f74
commit 9fca05135d
2 changed files with 28 additions and 16 deletions

Changed file 1 of 2:

@@ -6,6 +6,7 @@ import (
     "log"
     "path"
     "strings"
+    "sync"

     "flashcat.cloud/categraf/config"
     "flashcat.cloud/categraf/inputs"
@@ -54,7 +55,20 @@ func NewAgent(filters map[string]struct{}) *Agent {
 func (a *Agent) Start() {
     log.Println("I! agent starting")
-    a.startInputs()
+    var wg sync.WaitGroup
+    wg.Add(2)
+    go func() {
+        defer wg.Done()
+        a.startLogAgent()
+    }()
+    go func() {
+        defer wg.Done()
+        err := a.startInputs()
+        if err != nil {
+            log.Println("E!", err)
+        }
+    }()
+    wg.Wait()
 }

 func (a *Agent) Stop() {
@@ -78,10 +92,6 @@ func (a *Agent) Reload() {
 }

 func (a *Agent) startInputs() error {
-    if config.LogConfig.Enable {
-        go startLogAgent()
-    }
-
     names, err := a.getInputsByDirs()
     if err != nil {
         return err
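
Net effect of the agent.go change: the log agent is no longer launched from startInputs behind a config check; Start now runs the log-agent starter and the inputs starter in two goroutines, logs any error from the inputs starter instead of returning it, and blocks on a sync.WaitGroup until both have finished starting. A minimal sketch of that pattern, with placeholder task functions standing in for a.startLogAgent and a.startInputs (not the actual categraf code):

package main

import (
    "errors"
    "log"
    "sync"
)

// startLogs and startInputs are placeholder starters standing in for
// a.startLogAgent and a.startInputs; they are not the categraf functions.
func startLogs() { log.Println("I! log agent starting") }

func startInputs() error {
    // Simulate the "no inputs configured" case from the commit title.
    return errors.New("no inputs found")
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        startLogs()
    }()

    go func() {
        defer wg.Done()
        // As in the patched Start, the inputs error is only logged with
        // the "E!" prefix instead of being returned to the caller.
        if err := startInputs(); err != nil {
            log.Println("E!", err)
        }
    }()

    // Block until both starters have returned before continuing.
    wg.Wait()
    log.Println("I! agent started")
}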

Changed file 2 of 2:

@@ -12,7 +12,7 @@ import (
     "log"
     "time"

-    coreConfig "flashcat.cloud/categraf/config"
+    coreconfig "flashcat.cloud/categraf/config"
     logsconfig "flashcat.cloud/categraf/config/logs"
     "flashcat.cloud/categraf/pkg/logs/auditor"
@@ -50,20 +50,20 @@ func NewLogAgent(sources *logsconfig.LogSources, services *service.Services, pro
     // We pass the health handle to the auditor because it's the end of the pipeline and the most
     // critical part. Arguably it could also be plugged to the destination.
     auditorTTL := time.Duration(23) * time.Hour
-    auditor := auditor.New(coreConfig.GetLogRunPath(), auditor.DefaultRegistryFilename, auditorTTL)
+    auditor := auditor.New(coreconfig.GetLogRunPath(), auditor.DefaultRegistryFilename, auditorTTL)
     destinationsCtx := client.NewDestinationsContext()
     diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver()
     // setup the pipeline provider that provides pairs of processor and sender
     pipelineProvider := pipeline.NewProvider(logsconfig.NumberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsCtx)
-    validatePodContainerID := coreConfig.ValidatePodContainerID()
+    validatePodContainerID := coreconfig.ValidatePodContainerID()
     // setup the inputs
     inputs := []restart.Restartable{
-        file.NewScanner(sources, coreConfig.OpenLogsLimit(), pipelineProvider, auditor,
-            file.DefaultSleepDuration, validatePodContainerID, time.Duration(time.Duration(coreConfig.FileScanPeriod())*time.Second)),
-        listener.NewLauncher(sources, coreConfig.LogFrameSize(), pipelineProvider),
+        file.NewScanner(sources, coreconfig.OpenLogsLimit(), pipelineProvider, auditor,
+            file.DefaultSleepDuration, validatePodContainerID, time.Duration(time.Duration(coreconfig.FileScanPeriod())*time.Second)),
+        listener.NewLauncher(sources, coreconfig.LogFrameSize(), pipelineProvider),
         journald.NewLauncher(sources, pipelineProvider, auditor),
     }
@@ -147,10 +147,12 @@ const (
     invalidProcessingRules = "invalid_global_processing_rules"
 )

-func startLogAgent() {
+func (a *Agent) startLogAgent() {
+    if !coreconfig.LogConfig.Enable {
+        return
+    }
     logSources := logsconfig.NewLogSources()
-    for _, c := range coreConfig.LogConfig.Items {
+    for _, c := range coreconfig.LogConfig.Items {
         if c == nil {
             continue
         }
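
Because Start now invokes startLogAgent unconditionally, the enable check moves out of startInputs and into startLogAgent itself as an early-return guard. A minimal sketch of that guard pattern, using a simplified stand-in for the log config (type and field names here are assumptions, not the real categraf types):

package main

import "log"

// logSettings is a simplified stand-in for categraf's coreconfig.LogConfig;
// the field names are assumptions for illustration only.
type logSettings struct {
    Enable bool
    Items  []string
}

var cfg = &logSettings{Enable: false}

// startLogAgent can be called unconditionally: it returns immediately
// when log collection is disabled, so the caller needs no enable check.
func startLogAgent() {
    if cfg == nil || !cfg.Enable {
        return
    }
    log.Println("I! starting log agent with", len(cfg.Items), "sources")
}

func main() {
    startLogAgent() // a no-op with Enable set to false above
    log.Println("I! agent continues running")
}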
@@ -163,7 +165,7 @@ func startLogAgent() {
         logSources.AddSource(source)
     }

-    if coreConfig.LogConfig != nil && len(coreConfig.LogConfig.Items) == 0 {
+    if coreconfig.LogConfig != nil && len(coreconfig.LogConfig.Items) == 0 {
         return
     }
     httpConnectivity := logsconfig.HTTPConnectivityFailure
@@ -197,7 +199,7 @@ func GetContainerColloectAll() bool {
 // GlobalProcessingRules returns the global processing rules to apply to all logs.
 func GlobalProcessingRules() ([]*logsconfig.ProcessingRule, error) {
-    rules := coreConfig.LogConfig.GlobalProcessingRules
+    rules := coreconfig.LogConfig.GlobalProcessingRules
     err := logsconfig.ValidateProcessingRules(rules)
     if err != nil {
         return nil, err