add job and node filter

This commit is contained in:
lsy1990 2022-07-30 22:24:34 +08:00
parent 93b2eb3558
commit 77ed17bbe4
1 changed files with 24 additions and 2 deletions

View File

@ -14,6 +14,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/filter"
"flashcat.cloud/categraf/pkg/tls"
"flashcat.cloud/categraf/types"
)
@ -64,6 +65,11 @@ type Instance struct {
MaxSubJobPerLayer int `toml:"max_subjob_per_layer"`
JobExclude []string `toml:"job_exclude"`
JobInclude []string `toml:"job_include"`
jobFilter filter.Filter
NodeExclude []string `toml:"node_exclude"`
NodeInclude []string `toml:"node_include"`
nodeFilter filter.Filter
semaphore chan struct{}
}
@ -135,6 +141,16 @@ func (ins *Instance) initialize(client *http.Client) error {
}
ins.Source = u.Hostname()
// init filters
ins.jobFilter, err = filter.NewIncludeExcludeFilter(ins.JobInclude, ins.JobExclude)
if err != nil {
return fmt.Errorf("error compiling job filters[%s]: %v", ins.URL, err)
}
ins.nodeFilter, err = filter.NewIncludeExcludeFilter(ins.NodeInclude, ins.NodeExclude)
if err != nil {
return fmt.Errorf("error compiling node filters[%s]: %v", ins.URL, err)
}
// init tcp pool with default value
if ins.MaxConnections <= 0 {
ins.MaxConnections = 5
@ -157,7 +173,10 @@ func (ins *Instance) gatherNodeData(n node, slist *types.SampleList) error {
if n.DisplayName == "" {
return fmt.Errorf("error empty node name")
}
// filter out excluded or not included node_name
if !ins.nodeFilter.Match(tags["node_name"]) {
return nil
}
tags["node_name"] = n.DisplayName
monitorData := n.MonitorData
@ -247,7 +266,10 @@ func (ins *Instance) getJobDetail(jr jobRequest, slist *types.SampleList) error
if ins.MaxSubJobDepth > 0 && jr.layer == ins.MaxSubJobDepth {
return nil
}
// filter out excluded or not included jobs
if !ins.jobFilter.Match(jr.hierarchyName()) {
return nil
}
js, err := ins.client.getJobs(context.Background(), &jr)
if err != nil {
return err