274 lines
7.8 KiB
Go
274 lines
7.8 KiB
Go
package timer
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/didi/nightingale/v5/cache"
|
|
"github.com/didi/nightingale/v5/models"
|
|
|
|
"github.com/toolkits/pkg/logger"
|
|
)
|
|
|
|
func SyncCollectRules() {
|
|
err := syncCollectRules()
|
|
if err != nil {
|
|
fmt.Println("timer: sync collect rules fail:", err)
|
|
exit(1)
|
|
}
|
|
|
|
go loopSyncCollectRules()
|
|
}
|
|
|
|
func loopSyncCollectRules() {
|
|
randtime := rand.Intn(10000)
|
|
fmt.Printf("timer: sync collect rules: random sleep %dms\n", randtime)
|
|
time.Sleep(time.Duration(randtime) * time.Millisecond)
|
|
|
|
interval := time.Duration(60) * time.Second
|
|
|
|
for {
|
|
time.Sleep(interval)
|
|
err := syncCollectRules()
|
|
if err != nil {
|
|
logger.Warning("timer: sync collect rules fail:", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func syncCollectRules() error {
|
|
start := time.Now()
|
|
|
|
collectRules, err := models.CollectRuleGetAll()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// ident -> collect_rule1, collect_rule2 ...
|
|
collectRulesMap := make(map[string][]*models.CollectRule)
|
|
// classpath prefix -> classpaths
|
|
prefixClasspath := make(map[string][]models.Classpath)
|
|
|
|
for i := range collectRules {
|
|
classpathAndRes, exists := cache.ClasspathRes.Get(collectRules[i].ClasspathId)
|
|
if !exists {
|
|
continue
|
|
}
|
|
|
|
err := changeCollectRule(collectRules[i])
|
|
if err != nil {
|
|
logger.Errorf("change collect:%+v err:%v", collectRules[i], err)
|
|
continue
|
|
}
|
|
|
|
if collectRules[i].PrefixMatch == 0 {
|
|
// 我这个采集规则所关联的节点下面直接挂载的那些资源,都关联本采集规则
|
|
for _, ident := range classpathAndRes.Res {
|
|
if _, exists := collectRulesMap[ident]; !exists {
|
|
collectRulesMap[ident] = []*models.CollectRule{collectRules[i]}
|
|
} else {
|
|
collectRulesMap[ident] = append(collectRulesMap[ident], collectRules[i])
|
|
}
|
|
}
|
|
} else {
|
|
// 我这个采集规则关联的节点下面的所有的子节点,这个计算量有点大,可能是个问题
|
|
cps, exists := prefixClasspath[classpathAndRes.Classpath.Path]
|
|
if !exists {
|
|
cps, err = models.ClasspathGetsByPrefix(classpathAndRes.Classpath.Path)
|
|
if err != nil {
|
|
logger.Errorf("collectRule %+v get classpath err:%v", collectRules[i], err)
|
|
continue
|
|
}
|
|
prefixClasspath[classpathAndRes.Classpath.Path] = cps
|
|
}
|
|
|
|
for j := range cps {
|
|
classpathAndRes, exists := cache.ClasspathRes.Get(cps[j].Id)
|
|
if !exists {
|
|
continue
|
|
}
|
|
|
|
for _, ident := range classpathAndRes.Res {
|
|
if _, exists := collectRulesMap[ident]; !exists {
|
|
collectRulesMap[ident] = []*models.CollectRule{collectRules[i]}
|
|
} else {
|
|
collectRulesMap[ident] = append(collectRulesMap[ident], collectRules[i])
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
cache.CollectRulesOfIdent.SetAll(collectRulesMap)
|
|
logger.Debugf("timer: sync collect rules done, cost: %dms", time.Since(start).Milliseconds())
|
|
|
|
return nil
|
|
}
|
|
|
|
// 将服务端collect rule转换为agent需要的格式
|
|
func changeCollectRule(rule *models.CollectRule) error {
|
|
switch rule.Type {
|
|
case "port":
|
|
var conf models.PortConfig
|
|
err := json.Unmarshal([]byte(rule.Data), &conf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
config := PortCollectFormat{
|
|
Instances: []struct {
|
|
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
|
|
Tags []string `json:"tags,omitempty"`
|
|
Protocol string `json:"protocol" description:"udp or tcp"`
|
|
Port int `json:"port"`
|
|
Timeout int `json:"timeout"`
|
|
}{{
|
|
MinCollectionInterval: rule.Step,
|
|
Tags: strings.Fields(strings.Replace(rule.AppendTags, "=", ":", 1)),
|
|
Protocol: conf.Protocol,
|
|
Port: conf.Port,
|
|
Timeout: conf.Timeout,
|
|
}},
|
|
}
|
|
|
|
data, err := json.Marshal(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rule.Data = string(data)
|
|
|
|
case "script":
|
|
var conf models.ScriptConfig
|
|
err := json.Unmarshal([]byte(rule.Data), &conf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
config := ScriptCollectFormat{
|
|
Instances: []struct {
|
|
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
|
|
FilePath string `json:"file_path"`
|
|
Root string `json:"root"`
|
|
Params string `json:"params"`
|
|
Env map[string]string `json:"env"`
|
|
Stdin string `json:"stdin"`
|
|
Timeout int `json:"timeout"`
|
|
}{{
|
|
MinCollectionInterval: rule.Step,
|
|
FilePath: conf.Path,
|
|
Params: conf.Params,
|
|
Env: conf.Env,
|
|
Stdin: conf.Stdin,
|
|
Timeout: conf.Timeout,
|
|
}},
|
|
}
|
|
|
|
data, err := json.Marshal(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rule.Data = string(data)
|
|
case "log":
|
|
var conf models.LogConfig
|
|
err := json.Unmarshal([]byte(rule.Data), &conf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
config := LogCollectFormat{
|
|
Instances: []struct {
|
|
MetricName string `json:"metric_name"` //
|
|
FilePath string `json:"file_path"`
|
|
Pattern string `json:"pattern"`
|
|
TagsPattern map[string]string `json:"tags_pattern"`
|
|
Func string `json:"func"`
|
|
}{{
|
|
MetricName: rule.Name,
|
|
FilePath: conf.FilePath,
|
|
Pattern: conf.Pattern,
|
|
TagsPattern: conf.TagsPattern,
|
|
Func: conf.Func,
|
|
}},
|
|
}
|
|
|
|
data, err := json.Marshal(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rule.Data = string(data)
|
|
case "process":
|
|
var conf models.ProcConfig
|
|
err := json.Unmarshal([]byte(rule.Data), &conf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
config := ProcCollectFormat{
|
|
Instances: []struct {
|
|
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
|
|
Tags []string `json:"tags,omitempty"`
|
|
Target string `json:"target"`
|
|
CollectMethod string `json:"collect_method" description:"name or cmdline"`
|
|
}{{
|
|
MinCollectionInterval: rule.Step,
|
|
Tags: strings.Fields(strings.Replace(rule.AppendTags, "=", ":", 1)),
|
|
Target: conf.Param,
|
|
CollectMethod: conf.Method,
|
|
}},
|
|
}
|
|
|
|
data, err := json.Marshal(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rule.Data = string(data)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type ScriptCollectFormat struct {
|
|
Instances []struct {
|
|
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
|
|
FilePath string `json:"file_path"`
|
|
Root string `json:"root"`
|
|
Params string `json:"params"`
|
|
Env map[string]string `json:"env"`
|
|
Stdin string `json:"stdin"`
|
|
Timeout int `json:"timeout"`
|
|
} `json:"instances"`
|
|
}
|
|
|
|
type PortCollectFormat struct {
|
|
Instances []struct {
|
|
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
|
|
Tags []string `json:"tags,omitempty"`
|
|
Protocol string `json:"protocol" description:"udp or tcp"`
|
|
Port int `json:"port"`
|
|
Timeout int `json:"timeout"`
|
|
} `json:"instances"`
|
|
}
|
|
|
|
type LogCollectFormat struct {
|
|
Instances []struct {
|
|
MetricName string `json:"metric_name"` //
|
|
FilePath string `json:"file_path"` //
|
|
Pattern string `json:"pattern"` //
|
|
TagsPattern map[string]string `json:"tags_pattern"` //
|
|
Func string `json:"func"` // count(c), histogram(h)
|
|
} `json:"instances"`
|
|
}
|
|
|
|
type ProcCollectFormat struct {
|
|
Instances []struct {
|
|
MinCollectionInterval int `json:"min_collection_interval,omitempty"`
|
|
Tags []string `json:"tags,omitempty"`
|
|
Target string `json:"target"`
|
|
CollectMethod string `json:"collect_method" description:"name or cmdline"`
|
|
} `json:"instances"`
|
|
}
|