// nightingale1/timer/collect_rule.go
//
// 300 lines
// 8.5 KiB
// Go

package timer
import (
"encoding/json"
"fmt"
"math/rand"
"strings"
"time"
"github.com/didi/nightingale/v5/cache"
"github.com/didi/nightingale/v5/models"
"github.com/toolkits/pkg/logger"
)
// SyncCollectRules runs one collect-rule sync in the calling goroutine and
// then starts loopSyncCollectRules in the background to keep refreshing it.
// If the initial sync fails, the error is printed to stdout and exit(1) is
// called before the background loop is started.
func SyncCollectRules() {
	if err := syncCollectRules(); err != nil {
		fmt.Println("timer: sync collect rules fail:", err)
		exit(1)
	}

	go loopSyncCollectRules()
}
// loopSyncCollectRules re-runs syncCollectRules forever. It first sleeps for
// a random 0-9999ms (printed to stdout), then sleeps 60 seconds before every
// sync. A failed sync is logged as a warning and the loop carries on.
func loopSyncCollectRules() {
	jitterMs := rand.Intn(10000)
	fmt.Printf("timer: sync collect rules: random sleep %dms\n", jitterMs)
	time.Sleep(time.Duration(jitterMs) * time.Millisecond)

	const period = 60 * time.Second
	for {
		// Sleep first: the initial sync was already done by SyncCollectRules.
		time.Sleep(period)
		if err := syncCollectRules(); err != nil {
			logger.Warning("timer: sync collect rules fail:", err)
		}
	}
}
// syncCollectRules loads every collect rule, works out which resource idents
// each rule applies to, and passes the resulting ident -> rules map to
// cache.CollectRulesOfIdent.SetAll.
//
// A rule is skipped when its classpath is missing from cache.ClasspathRes,
// when changeCollectRule fails on it (logged), or when the prefix lookup of
// its classpaths fails (logged). The only error returned is a failure of
// models.CollectRuleGetAll.
func syncCollectRules() error {
	begin := time.Now()

	rules, err := models.CollectRuleGetAll()
	if err != nil {
		return err
	}

	// ident -> collect_rule1, collect_rule2 ...
	rulesByIdent := make(map[string][]*models.CollectRule)
	// classpath prefix -> classpaths
	classpathsByPrefix := make(map[string][]models.Classpath)

	// bind attaches rule to ident; append copes with a not-yet-present key.
	bind := func(ident string, rule *models.CollectRule) {
		rulesByIdent[ident] = append(rulesByIdent[ident], rule)
	}

	for _, rule := range rules {
		node, ok := cache.ClasspathRes.Get(rule.ClasspathId)
		if !ok {
			continue
		}

		if err := changeCollectRule(rule); err != nil {
			logger.Errorf("change collect:%+v err:%v", rule, err)
			continue
		}

		if rule.PrefixMatch == 0 {
			// Only the resources mounted directly under the node this rule
			// is associated with are bound to the rule.
			for _, ident := range node.Res {
				bind(ident, rule)
			}
			continue
		}

		// Prefix match: the rule covers every classpath under the associated
		// node. This is fairly expensive and may become a problem, so the
		// prefix lookups are memoized for the duration of this sync.
		prefix := node.Classpath.Path
		children, cached := classpathsByPrefix[prefix]
		if !cached {
			children, err = models.ClasspathGetsByPrefix(prefix)
			if err != nil {
				logger.Errorf("collectRule %+v get classpath err:%v", rule, err)
				continue
			}
			classpathsByPrefix[prefix] = children
		}

		for i := range children {
			child, ok := cache.ClasspathRes.Get(children[i].Id)
			if !ok {
				continue
			}
			for _, ident := range child.Res {
				bind(ident, rule)
			}
		}
	}

	cache.CollectRulesOfIdent.SetAll(rulesByIdent)
	logger.Debugf("timer: sync collect rules done, cost: %dms", time.Since(begin).Milliseconds())
	return nil
}
// changeCollectRule converts a server-side collect rule into the format the
// agent needs. For the rule types "port", "script", "log" and "process" it
// decodes rule.Data into the matching models config, builds the corresponding
// *CollectFormat value with a single instance, and stores that value's JSON
// encoding back into rule.Data. Rules of any other type are left untouched.
//
// A JSON decode or encode error is returned as-is; in that case rule.Data is
// not modified.
func changeCollectRule(rule *models.CollectRule) error {
	// decode parses the rule's current Data into dst.
	decode := func(dst interface{}) error {
		return json.Unmarshal([]byte(rule.Data), dst)
	}
	// agentTags splits AppendTags on whitespace and rewrites the first "="
	// of each tag to ":".
	agentTags := func() []string {
		fields := strings.Fields(rule.AppendTags)
		for idx, field := range fields {
			fields[idx] = strings.Replace(field, "=", ":", 1)
		}
		return fields
	}
	// store marshals the agent payload and overwrites rule.Data with it.
	store := func(payload interface{}) error {
		encoded, err := json.Marshal(payload)
		if err != nil {
			return err
		}
		rule.Data = string(encoded)
		return nil
	}

	switch rule.Type {
	case "port":
		var src models.PortConfig
		if err := decode(&src); err != nil {
			return err
		}
		return store(PortCollectFormat{
			Instances: []struct {
				MinCollectionInterval int      `json:"min_collection_interval,omitempty"`
				Tags                  []string `json:"tags,omitempty"`
				Protocol              string   `json:"protocol" description:"udp or tcp"`
				Port                  int      `json:"port"`
				Timeout               int      `json:"timeout"`
			}{{
				MinCollectionInterval: rule.Step,
				Tags:                  agentTags(),
				Protocol:              src.Protocol,
				Port:                  src.Port,
				Timeout:               src.Timeout,
			}},
		})

	case "script":
		var src models.ScriptConfig
		if err := decode(&src); err != nil {
			return err
		}
		// Root is intentionally left at its zero value, as no source field
		// is mapped to it here.
		return store(ScriptCollectFormat{
			Instances: []struct {
				MinCollectionInterval int               `json:"min_collection_interval,omitempty"`
				FilePath              string            `json:"file_path"`
				Root                  string            `json:"root"`
				Params                string            `json:"params"`
				Env                   map[string]string `json:"env"`
				Stdin                 string            `json:"stdin"`
				Timeout               int               `json:"timeout"`
				Tags                  []string          `json:"tags,omitempty"`
			}{{
				MinCollectionInterval: rule.Step,
				FilePath:              src.Path,
				Params:                src.Params,
				Env:                   src.Env,
				Stdin:                 src.Stdin,
				Timeout:               src.Timeout,
				Tags:                  agentTags(),
			}},
		})

	case "log":
		var src models.LogConfig
		if err := decode(&src); err != nil {
			return err
		}
		return store(LogCollectFormat{
			Instances: []struct {
				MetricName  string            `json:"metric_name"`
				FilePath    string            `json:"file_path"`
				Pattern     string            `json:"pattern"`
				TagsPattern map[string]string `json:"tags_pattern"`
				Func        string            `json:"func"`
				Tags        []string          `json:"tags,omitempty"`
			}{{
				MetricName:  rule.Name,
				FilePath:    src.FilePath,
				Pattern:     src.Pattern,
				TagsPattern: src.TagsPattern,
				Func:        src.Func,
				Tags:        agentTags(),
			}},
		})

	case "process":
		var src models.ProcConfig
		if err := decode(&src); err != nil {
			return err
		}
		return store(ProcCollectFormat{
			Instances: []struct {
				MinCollectionInterval int      `json:"min_collection_interval,omitempty"`
				Tags                  []string `json:"tags,omitempty"`
				Target                string   `json:"target"`
				CollectMethod         string   `json:"collect_method" description:"name or cmdline"`
			}{{
				MinCollectionInterval: rule.Step,
				Tags:                  agentTags(),
				Target:                src.Param,
				CollectMethod:         src.Method,
			}},
		})
	}

	return nil
}
// ScriptCollectFormat is the payload that changeCollectRule marshals into
// rule.Data for rules of type "script". Instances is serialized under the
// "instances" key; changeCollectRule fills it with exactly one element.
//
// The anonymous element type must stay identical (fields, order and tags) to
// the composite literal in changeCollectRule, otherwise that literal no
// longer has this type.
type ScriptCollectFormat struct {
Instances []struct {
// MinCollectionInterval is set from the rule's Step; omitted when zero.
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
// FilePath is set from ScriptConfig.Path.
FilePath string `json:"file_path"`
// Root is not populated by changeCollectRule, so it marshals as "".
Root string `json:"root"`
Params string `json:"params"`
Env map[string]string `json:"env"`
Stdin string `json:"stdin"`
Timeout int `json:"timeout"`
// Tags holds the rule's AppendTags with the first "=" of each tag
// replaced by ":"; omitted when empty.
Tags []string `json:"tags,omitempty"`
} `json:"instances"`
}
// PortCollectFormat is the payload that changeCollectRule marshals into
// rule.Data for rules of type "port". Instances is serialized under the
// "instances" key; changeCollectRule fills it with exactly one element.
//
// The anonymous element type must stay identical (fields, order and tags) to
// the composite literal in changeCollectRule, otherwise that literal no
// longer has this type.
type PortCollectFormat struct {
Instances []struct {
// MinCollectionInterval is set from the rule's Step; omitted when zero.
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
// Tags holds the rule's AppendTags with the first "=" of each tag
// replaced by ":"; omitted when empty.
Tags []string `json:"tags,omitempty"`
Protocol string `json:"protocol" description:"udp or tcp"`
Port int `json:"port"`
Timeout int `json:"timeout"`
} `json:"instances"`
}
// LogCollectFormat is the payload that changeCollectRule marshals into
// rule.Data for rules of type "log". Instances is serialized under the
// "instances" key; changeCollectRule fills it with exactly one element.
// Unlike the other formats it carries no min_collection_interval field.
//
// The anonymous element type must stay identical (fields, order and tags) to
// the composite literal in changeCollectRule, otherwise that literal no
// longer has this type.
type LogCollectFormat struct {
Instances []struct {
MetricName string `json:"metric_name"` // set from the rule's Name
FilePath string `json:"file_path"` // set from LogConfig.FilePath
Pattern string `json:"pattern"` // set from LogConfig.Pattern
TagsPattern map[string]string `json:"tags_pattern"` // set from LogConfig.TagsPattern
Func string `json:"func"` // count(c), histogram(h)
// Tags holds the rule's AppendTags with the first "=" of each tag
// replaced by ":"; omitted when empty.
Tags []string `json:"tags,omitempty"`
} `json:"instances"`
}
// ProcCollectFormat is the payload that changeCollectRule marshals into
// rule.Data for rules of type "process". Instances is serialized under the
// "instances" key; changeCollectRule fills it with exactly one element.
//
// The anonymous element type must stay identical (fields, order and tags) to
// the composite literal in changeCollectRule, otherwise that literal no
// longer has this type.
type ProcCollectFormat struct {
Instances []struct {
// MinCollectionInterval is set from the rule's Step; omitted when zero.
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
// Tags holds the rule's AppendTags with the first "=" of each tag
// replaced by ":"; omitted when empty.
Tags []string `json:"tags,omitempty"`
// Target is set from ProcConfig.Param.
Target string `json:"target"`
// CollectMethod is set from ProcConfig.Method.
CollectMethod string `json:"collect_method" description:"name or cmdline"`
} `json:"instances"`
}