resolve conflicts

Commit f3d496099a by yushuangyu, 2022-07-03 22:14:09 +08:00
11 changed files with 2067 additions and 58 deletions


@ -1,6 +1,6 @@
# Categraf
![Release](https://github.com/flashcatcloud/categraf/workflows/Release/badge.svg)
[![Powered By Wine](https://img.shields.io/badge/Powered%20By-Flashcat-red)](https://flashcat.cloud/)
[![Powered By Flashcat](https://img.shields.io/badge/Powered%20By-Flashcat-red)](https://flashcat.cloud/)
Categraf is a monitoring agent for nightingale / prometheus / m3db / victoriametrics / thanos / influxdb / tdengine.
@ -102,7 +102,7 @@ Click on the links to see the README of each plugin.
- [ ] rocketmq
- [ ] activemq
- [ ] kafka
- [ ] elasticsearch
- [x] [elasticsearch](inputs/elasticsearch)
- [x] windows
- [ ] mssql
- [ ] iis


@ -23,29 +23,28 @@ http_timeout = "5s"
local = true
## Set cluster_health to true when you want to obtain cluster health stats
cluster_health = false
cluster_health = true
## Adjust cluster_health_level when you want to obtain detailed health stats
## The options are
## - indices (default)
## - cluster
# cluster_health_level = "indices"
cluster_health_level = "cluster"
## Set cluster_stats to true when you want to obtain cluster stats.
cluster_stats = false
cluster_stats = true
## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index names that end with a changing value, like a date.
indices_include = ["_all"]
# indices_include = ["zipkin*"]
## One of "shards", "cluster", "indices"
## Currently only "shards" is implemented
indices_level = "shards"
## use "shards" or blank string for indices level
indices_level = ""
## node_stats is a list of sub-stats that you want to have gathered. Valid options
## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
## "breaker". Per default, all stats are gathered.
# node_stats = ["jvm", "http"]
node_stats = ["jvm", "breaker", "process", "os", "fs", "indices"]
## HTTP Basic Authentication username and password.
username = "elastic"
@ -63,4 +62,4 @@ password = "password"
## Each 'indices_include' entry ending with a wildcard (*) or glob matching pattern will group together all indices that match it, and
## sort them by the date or number after the wildcard. Metrics then are gathered for only the 'num_most_recent_indices' amount of most
## recent indices.
# num_most_recent_indices = 0
num_most_recent_indices = 1
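
For reference, the options shown in this diff map onto a plain Go struct. The sketch below is not Categraf's actual config loader; it simply decodes the same keys with the github.com/BurntSushi/toml library, and the struct and field names are hypothetical.

```go
package main

import (
	"fmt"
	"log"

	"github.com/BurntSushi/toml"
)

// ESInstance is a hypothetical struct; field names mirror the TOML keys above.
type ESInstance struct {
	Local                bool     `toml:"local"`
	ClusterHealth        bool     `toml:"cluster_health"`
	ClusterHealthLevel   string   `toml:"cluster_health_level"`
	ClusterStats         bool     `toml:"cluster_stats"`
	IndicesInclude       []string `toml:"indices_include"`
	IndicesLevel         string   `toml:"indices_level"`
	NodeStats            []string `toml:"node_stats"`
	Username             string   `toml:"username"`
	Password             string   `toml:"password"`
	NumMostRecentIndices int      `toml:"num_most_recent_indices"`
}

func main() {
	raw := `
local = true
cluster_health = true
cluster_health_level = "cluster"
cluster_stats = true
indices_include = ["_all"]
indices_level = ""
node_stats = ["jvm", "breaker", "process", "os", "fs", "indices"]
username = "elastic"
password = "password"
num_most_recent_indices = 1
`
	var ins ESInstance
	if _, err := toml.Decode(raw, &ins); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", ins)
}
```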


@ -125,6 +125,7 @@ func (mh *MetricsHouseType) post(conn driver.Conn, series []*types.Sample) error
e.Timestamp, // the driver converts the time format automatically
e.Timestamp, // the driver converts the time format automatically
e.Metric,
config.Config.GetHostname(),
convertTags(e),
e.Value,
)
@ -147,6 +148,7 @@ CREATE TABLE IF NOT EXISTS %s (
event_time DateTime
, event_date Date
, metric LowCardinality(String)
, hostname LowCardinality(String)
, tags String
, value Float64
) ENGINE = MergeTree
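
A rough sketch of how the writer above might append one row per sample, including the new hostname column, using the clickhouse-go v2 batch API. The function name, the literal table name "metrics", and the hostname parameter are placeholders (the real code derives the table name from configuration and calls config.Config.GetHostname()); assumed imports are context, the clickhouse-go v2 driver package, and the package's own types and convertTags helper.

```go
// postBatch is illustrative only, mirroring the snippet above.
func postBatch(ctx context.Context, conn driver.Conn, hostname string, series []*types.Sample) error {
	batch, err := conn.PrepareBatch(ctx, "INSERT INTO metrics")
	if err != nil {
		return err
	}
	for _, e := range series {
		if err := batch.Append(
			e.Timestamp,    // event_time; the driver converts the time format
			e.Timestamp,    // event_date
			e.Metric,       // metric
			hostname,       // hostname (the newly added column)
			convertTags(e), // tags
			e.Value,        // value
		); err != nil {
			return err
		}
	}
	return batch.Send()
}
```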


@ -1,2 +1,14 @@
# elasticsearch
Forked from telegraf/elasticsearch.
## Changes
- Arrays in JSON responses are no longer flattened into metrics
- Some invalid metric names have been fixed
- The cluster_stats_only_from_master option has been removed from the configuration
- The default configuration no longer collects per-index metrics, and node_stats no longer collects http stats
## Dashboard
The dashboard.json file in the same directory as this README can be imported directly into Nightingale.

File diff suppressed because it is too large.


@ -242,7 +242,7 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
// Gather node ID
if info.nodeID, err = ins.gatherNodeID(s + "/_nodes/_local/name"); err != nil {
slist.PushFront(types.NewSample("up", 0, ins.Labels))
slist.PushFront(types.NewSample("up", 0, map[string]string{"address": s}, ins.Labels))
log.Println("E! failed to gather node id:", err)
return
}
@ -250,12 +250,12 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
if info.masterID, err = ins.getCatMaster(s + "/_cat/master"); err != nil {
slist.PushFront(types.NewSample("up", 0, ins.Labels))
slist.PushFront(types.NewSample("up", 0, map[string]string{"address": s}, ins.Labels))
log.Println("E! failed to get cat master:", err)
return
}
slist.PushFront(types.NewSample("up", 1, ins.Labels))
slist.PushFront(types.NewSample("up", 1, map[string]string{"address": s}, ins.Labels))
ins.serverInfoMutex.Lock()
ins.serverInfo[s] = info
ins.serverInfoMutex.Unlock()
@ -273,7 +273,7 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
url := ins.nodeStatsURL(s)
// Always gather node stats
if err := ins.gatherNodeStats(url, slist); err != nil {
if err := ins.gatherNodeStats(url, s, slist); err != nil {
log.Println("E! failed to gather node stats:", err)
return
}
@ -283,14 +283,14 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
if ins.ClusterHealthLevel != "" {
url = url + "?level=" + ins.ClusterHealthLevel
}
if err := ins.gatherClusterHealth(url, slist); err != nil {
if err := ins.gatherClusterHealth(url, s, slist); err != nil {
log.Println("E! failed to gather cluster health:", err)
return
}
}
if ins.ClusterStats && (ins.serverInfo[s].isMaster() || !ins.Local) {
if err := ins.gatherClusterStats(s+"/_cluster/stats", slist); err != nil {
if err := ins.gatherClusterStats(s+"/_cluster/stats", s, slist); err != nil {
log.Println("E! failed to gather cluster stats:", err)
return
}
@ -298,12 +298,12 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
if len(ins.IndicesInclude) > 0 && (ins.serverInfo[s].isMaster() || !ins.Local) {
if ins.IndicesLevel != "shards" {
if err := ins.gatherIndicesStats(s+"/"+strings.Join(ins.IndicesInclude, ",")+"/_stats", slist); err != nil {
if err := ins.gatherIndicesStats(s+"/"+strings.Join(ins.IndicesInclude, ",")+"/_stats", s, slist); err != nil {
log.Println("E! failed to gather indices stats:", err)
return
}
} else {
if err := ins.gatherIndicesStats(s+"/"+strings.Join(ins.IndicesInclude, ",")+"/_stats?level=shards", slist); err != nil {
if err := ins.gatherIndicesStats(s+"/"+strings.Join(ins.IndicesInclude, ",")+"/_stats?level=shards", s, slist); err != nil {
log.Println("E! failed to gather indices stats:", err)
return
}
@ -315,7 +315,7 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
wg.Wait()
}
func (ins *Instance) gatherIndicesStats(url string, slist *list.SafeList) error {
func (ins *Instance) gatherIndicesStats(url string, address string, slist *list.SafeList) error {
indicesStats := &struct {
Shards map[string]interface{} `json:"_shards"`
All map[string]interface{} `json:"_all"`
@ -326,9 +326,11 @@ func (ins *Instance) gatherIndicesStats(url string, slist *list.SafeList) error
return err
}
addrTag := map[string]string{"address": address}
// Total Shards Stats
for k, v := range indicesStats.Shards {
slist.PushFront(types.NewSample("indices_stats_shards_total_"+k, v, ins.Labels))
slist.PushFront(types.NewSample("indices_stats_shards_total_"+k, v, addrTag, ins.Labels))
}
// All Stats
@ -340,16 +342,16 @@ func (ins *Instance) gatherIndicesStats(url string, slist *list.SafeList) error
return err
}
for key, val := range jsonParser.Fields {
slist.PushFront(types.NewSample("indices_stats_"+m+"_"+key, val, map[string]string{"index_name": "_all"}, ins.Labels))
slist.PushFront(types.NewSample("indices_stats_"+m+"_"+key, val, map[string]string{"index_name": "_all"}, addrTag, ins.Labels))
}
}
// Gather stats for each index.
return ins.gatherIndividualIndicesStats(indicesStats.Indices, slist)
return ins.gatherIndividualIndicesStats(indicesStats.Indices, addrTag, slist)
}
// gatherSortedIndicesStats gathers stats for all indices in no particular order.
func (ins *Instance) gatherIndividualIndicesStats(indices map[string]indexStat, slist *list.SafeList) error {
func (ins *Instance) gatherIndividualIndicesStats(indices map[string]indexStat, addrTag map[string]string, slist *list.SafeList) error {
// Sort indices into buckets based on their configured prefix, if any matches.
categorizedIndexNames := ins.categorizeIndices(indices)
for _, matchingIndices := range categorizedIndexNames {
@ -369,7 +371,7 @@ func (ins *Instance) gatherIndividualIndicesStats(indices map[string]indexStat,
for i := indicesCount - 1; i >= indicesCount-indicesToTrackCount; i-- {
indexName := matchingIndices[i]
err := ins.gatherSingleIndexStats(indexName, indices[indexName], slist)
err := ins.gatherSingleIndexStats(indexName, indices[indexName], addrTag, slist)
if err != nil {
return err
}
@ -379,7 +381,7 @@ func (ins *Instance) gatherIndividualIndicesStats(indices map[string]indexStat,
return nil
}
func (ins *Instance) gatherSingleIndexStats(name string, index indexStat, slist *list.SafeList) error {
func (ins *Instance) gatherSingleIndexStats(name string, index indexStat, addrTag map[string]string, slist *list.SafeList) error {
indexTag := map[string]string{"index_name": name}
stats := map[string]interface{}{
"primaries": index.Primaries,
@ -393,7 +395,7 @@ func (ins *Instance) gatherSingleIndexStats(name string, index indexStat, slist
return err
}
for key, val := range f.Fields {
slist.PushFront(types.NewSample("indices_stats_"+m+"_"+key, val, indexTag, ins.Labels))
slist.PushFront(types.NewSample("indices_stats_"+m+"_"+key, val, indexTag, addrTag, ins.Labels))
}
}
@ -436,7 +438,7 @@ func (ins *Instance) gatherSingleIndexStats(name string, index indexStat, slist
}
for key, val := range flattened.Fields {
slist.PushFront(types.NewSample("indices_stats_shards_"+key, val, shardTags, ins.Labels))
slist.PushFront(types.NewSample("indices_stats_shards_"+key, val, shardTags, addrTag, ins.Labels))
}
}
}
@ -475,16 +477,17 @@ func (ins *Instance) categorizeIndices(indices map[string]indexStat) map[string]
return categorizedIndexNames
}
func (ins *Instance) gatherClusterStats(url string, slist *list.SafeList) error {
func (ins *Instance) gatherClusterStats(url string, address string, slist *list.SafeList) error {
clusterStats := &clusterStats{}
if err := ins.gatherJSONData(url, clusterStats); err != nil {
return err
}
tags := map[string]string{
"node_name": clusterStats.NodeName,
"cluster_name": clusterStats.ClusterName,
// "node_name": clusterStats.NodeName,
// "status": clusterStats.Status,
"cluster_name": clusterStats.ClusterName,
"address": address,
}
stats := map[string]interface{}{
@ -508,12 +511,14 @@ func (ins *Instance) gatherClusterStats(url string, slist *list.SafeList) error
return nil
}
func (ins *Instance) gatherClusterHealth(url string, slist *list.SafeList) error {
func (ins *Instance) gatherClusterHealth(url string, address string, slist *list.SafeList) error {
healthStats := &clusterHealth{}
if err := ins.gatherJSONData(url, healthStats); err != nil {
return err
}
addrTag := map[string]string{"address": address}
clusterFields := map[string]interface{}{
"cluster_health_active_primary_shards": healthStats.ActivePrimaryShards,
"cluster_health_active_shards": healthStats.ActiveShards,
@ -531,7 +536,7 @@ func (ins *Instance) gatherClusterHealth(url string, slist *list.SafeList) error
"cluster_health_unassigned_shards": healthStats.UnassignedShards,
}
types.PushSamples(slist, clusterFields, map[string]string{"name": healthStats.ClusterName}, ins.Labels)
types.PushSamples(slist, clusterFields, map[string]string{"cluster_name": healthStats.ClusterName}, addrTag, ins.Labels)
for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{
@ -544,13 +549,13 @@ func (ins *Instance) gatherClusterHealth(url string, slist *list.SafeList) error
"cluster_health_indices_status_code": mapHealthStatusToCode(health.Status),
"cluster_health_indices_unassigned_shards": health.UnassignedShards,
}
types.PushSamples(slist, indexFields, map[string]string{"index": name, "name": healthStats.ClusterName}, ins.Labels)
types.PushSamples(slist, indexFields, map[string]string{"index": name, "name": healthStats.ClusterName}, addrTag, ins.Labels)
}
return nil
}
func (ins *Instance) gatherNodeStats(url string, slist *list.SafeList) error {
func (ins *Instance) gatherNodeStats(url string, address string, slist *list.SafeList) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*nodeStat `json:"nodes"`
@ -560,6 +565,8 @@ func (ins *Instance) gatherNodeStats(url string, slist *list.SafeList) error {
return err
}
addrTag := map[string]string{"address": address}
for id, n := range nodeStats.Nodes {
// sort.Strings(n.Roles)
tags := map[string]string{
@ -571,7 +578,7 @@ func (ins *Instance) gatherNodeStats(url string, slist *list.SafeList) error {
}
for k, v := range n.Attributes {
slist.PushFront(types.NewSample("node_attribute_"+k, v, tags, ins.Labels))
slist.PushFront(types.NewSample("node_attribute_"+k, v, tags, addrTag, ins.Labels))
}
stats := map[string]interface{}{
@ -600,7 +607,7 @@ func (ins *Instance) gatherNodeStats(url string, slist *list.SafeList) error {
}
for key, val := range f.Fields {
slist.PushFront(types.NewSample(p+"_"+key, val, tags, ins.Labels))
slist.PushFront(types.NewSample(p+"_"+key, val, tags, addrTag, ins.Labels))
}
}
}
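
The recurring change in this file is that every emitted sample now carries an address tag identifying the server it came from. A rough illustration of the effect (the Servers field name is assumed, not taken from this diff):

```go
// With two configured servers, the "up" metric now yields two distinct series
// instead of one overwriting the other:
//   up{address="http://es-1:9200", ...} 1
//   up{address="http://es-2:9200", ...} 1
for _, s := range ins.Servers { // "Servers" is an assumed field name
	slist.PushFront(types.NewSample("up", 1, map[string]string{"address": s}, ins.Labels))
}
```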


@ -36,3 +36,7 @@ ips = [
## Custom OIDs
The `[[instances.customs]]` section can be repeated to define custom OIDs. By default the plugin only collects per-interface metrics for each network port plus CPU and memory utilization; to collect any other OID, use this customization feature.
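
Purely as an illustration, one way such entries could be modeled in Go is sketched below. The field names are hypothetical and not taken from this plugin's actual config struct.

```go
// CustomOID is a hypothetical shape for one [[instances.customs]] block.
type CustomOID struct {
	Metric string `toml:"metric"` // hypothetical: metric name to report
	OID    string `toml:"oid"`    // hypothetical: the OID to poll
}

// SnmpInstance shows how repeated [[instances.customs]] blocks would decode
// into a slice alongside the ips list shown above.
type SnmpInstance struct {
	IPs     []string    `toml:"ips"`
	Customs []CustomOID `toml:"customs"`
}
```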
## Dashboard
A community contributor has built a dashboard; it sits in the same directory as this README and can be imported directly into Nightingale.


@ -0,0 +1,145 @@
{
"name": "通用交换机",
"tags": "",
"configs": {
"var": [
{
"name": "IP",
"definition": "label_values(switch_legacy_if_in,ip)",
"multi": false
},
{
"definition": "label_values(switch_legacy_if_in,ifname)",
"name": "Interface"
}
],
"panels": [
{
"targets": [
{
"refId": "A",
"expr": "switch_legacy_cpu_util{ip=\"$IP\"}",
"legend": "cpu_util"
}
],
"name": "CPU",
"options": {
"tooltip": {
"mode": "all",
"sort": "none"
},
"legend": {
"displayMode": "list",
"placement": "bottom"
},
"standardOptions": {
"util": "percent"
},
"thresholds": {}
},
"custom": {
"drawStyle": "lines",
"lineInterpolation": "smooth",
"fillOpacity": 0.5,
"stack": "off"
},
"version": "2.0.0",
"type": "timeseries",
"layout": {
"h": 4,
"w": 11,
"x": 1,
"y": 0,
"i": "23220dc3-ab9a-40f9-b1d3-135bb3bbb734",
"isResizable": true
},
"id": "23220dc3-ab9a-40f9-b1d3-135bb3bbb734"
},
{
"targets": [
{
"refId": "A",
"expr": "switch_legacy_mem_util{ip=\"$IP\"}",
"legend": "mem_util"
}
],
"name": "内存",
"options": {
"tooltip": {
"mode": "all",
"sort": "none"
},
"legend": {
"displayMode": "list",
"placement": "bottom"
},
"standardOptions": {
"util": "percent"
},
"thresholds": {}
},
"custom": {
"drawStyle": "lines",
"lineInterpolation": "smooth",
"fillOpacity": 0.5,
"stack": "off"
},
"version": "2.0.0",
"type": "timeseries",
"layout": {
"h": 4,
"w": 11,
"x": 12,
"y": 0,
"i": "7f36ea1c-fd51-43bf-93ab-2787d630c530",
"isResizable": true
},
"id": "7f36ea1c-fd51-43bf-93ab-2787d630c530"
},
{
"targets": [
{
"refId": "A",
"expr": "switch_legacy_if_in{ip=\"$IP\"}",
"legend": "入流量"
},
{
"expr": "switch_legacy_if_out{ip=\"$IP\"}",
"refId": "B",
"legend": "出流量"
}
],
"name": "端口流量",
"options": {
"tooltip": {
"mode": "all",
"sort": "none"
},
"legend": {
"displayMode": "hidden"
},
"standardOptions": {},
"thresholds": {}
},
"custom": {
"drawStyle": "lines",
"lineInterpolation": "smooth",
"fillOpacity": 0.5,
"stack": "off"
},
"version": "2.0.0",
"type": "timeseries",
"layout": {
"h": 4,
"w": 22,
"x": 1,
"y": 4,
"i": "eec57d38-9a9f-4530-a0b5-e524d867759d",
"isResizable": true
},
"id": "eec57d38-9a9f-4530-a0b5-e524d867759d"
}
],
"version": "2.0.0"
}
}


@ -84,8 +84,7 @@ func (p *Parser) Parse(buf []byte, slist *list.SafeList) error {
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
p.HandleHistogram(m, tags, metricName, slist)
} else {
fields := getNameAndValue(m, metricName)
types.PushSamples(slist, fields, tags)
p.handleGaugeCounter(m, tags, metricName, slist)
}
}
}
@ -114,6 +113,13 @@ func (p *Parser) HandleHistogram(m *dto.Metric, tags map[string]string, metricNa
}
}
func (p *Parser) handleGaugeCounter(m *dto.Metric, tags map[string]string, metricName string, slist *list.SafeList) {
fields := getNameAndValue(m, metricName)
for metric, value := range fields {
slist.PushFront(types.NewSample(prom.BuildMetric(p.NamePrefix, metric, ""), value, tags))
}
}
// Get labels from metric
func (p *Parser) makeLabels(m *dto.Metric) map[string]string {
result := map[string]string{}
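
For context, the new handleGaugeCounter path relies on getNameAndValue, which is not shown in this diff. A typical implementation for the prometheus client_model (dto) types looks roughly like the following sketch; it is based on how such helpers are usually written, not necessarily Categraf's exact code, and assumes the math and dto imports.

```go
// getNameAndValue-style helper: extract the scalar value of a gauge, counter,
// or untyped sample, skipping NaN values.
func getNameAndValue(m *dto.Metric, metricName string) map[string]interface{} {
	fields := make(map[string]interface{})
	if m.Gauge != nil {
		if !math.IsNaN(m.GetGauge().GetValue()) {
			fields[metricName] = m.GetGauge().GetValue()
		}
	} else if m.Counter != nil {
		if !math.IsNaN(m.GetCounter().GetValue()) {
			fields[metricName] = m.GetCounter().GetValue()
		}
	} else if m.Untyped != nil {
		if !math.IsNaN(m.GetUntyped().GetValue()) {
			fields[metricName] = m.GetUntyped().GetValue()
		}
	}
	return fields
}
```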


@ -2,7 +2,6 @@ package jsonx
import (
"fmt"
"strconv"
)
type JSONFlattener struct {
@ -45,16 +44,17 @@ func (f *JSONFlattener) FullFlattenJSON(
}
}
case []interface{}:
for i, v := range t {
fieldkey := strconv.Itoa(i)
if fieldname != "" {
fieldkey = fieldname + "_" + fieldkey
}
err := f.FullFlattenJSON(fieldkey, v, convertString, convertBool)
if err != nil {
return err
}
}
// for i, v := range t {
// fieldkey := strconv.Itoa(i)
// if fieldname != "" {
// fieldkey = fieldname + "_" + fieldkey
// }
// err := f.FullFlattenJSON(fieldkey, v, convertString, convertBool)
// if err != nil {
// return err
// }
// }
return nil
case float64:
f.Fields[fieldname] = t
case string:
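
With the array branch commented out, array elements no longer produce fields. A small usage sketch of the effect is below; the import path, the exported Fields map type, and the exact key shapes are assumptions based on the surrounding diffs.

```go
package main

import (
	"encoding/json"
	"fmt"

	"flashcat.cloud/categraf/pkg/jsonx" // assumed import path
)

func main() {
	var doc map[string]interface{}
	_ = json.Unmarshal([]byte(`{"heap": {"used_bytes": 1024}, "roles": ["master", "data"]}`), &doc)

	f := jsonx.JSONFlattener{Fields: map[string]interface{}{}}
	if err := f.FullFlattenJSON("", doc, true, true); err != nil {
		panic(err)
	}

	// Before this change the array would also be flattened (roughly roles_0,
	// roles_1); now only scalar leaves such as heap_used_bytes remain.
	fmt.Println(f.Fields)
}
```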


@ -1,8 +1,6 @@
package types
import (
"log"
"reflect"
"time"
"flashcat.cloud/categraf/pkg/conv"
@ -19,7 +17,6 @@ type Sample struct {
func NewSample(metric string, value interface{}, labels ...map[string]string) *Sample {
floatValue, err := conv.ToFloat64(value)
if err != nil {
log.Printf("E! can not convert value type %v to float: %v\n", reflect.TypeOf(value), err)
return nil
}
@ -58,10 +55,6 @@ func NewSamples(fields map[string]interface{}, labels ...map[string]string) []*S
func PushSamples(slist *list.SafeList, fields map[string]interface{}, labels ...map[string]string) {
for metric, value := range fields {
floatValue, err := conv.ToFloat64(value)
if err != nil {
continue
}
slist.PushFront(NewSample(metric, floatValue, labels...))
slist.PushFront(NewSample(metric, value, labels...))
}
}
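
One side effect worth noting: NewSample now returns nil silently when a value cannot be converted to float64, and PushSamples above no longer checks the conversion before pushing. A caller that constructs samples directly may therefore want a guard like the sketch below (illustrative only; the metric name and variables are placeholders, not part of this commit).

```go
// Guard against the nil that NewSample now returns on conversion failure.
if s := types.NewSample("some_metric", someValue, labels); s != nil {
	slist.PushFront(s)
}
```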