add es plugin, beta version
This commit is contained in:
parent
d13ef2d3d9
commit
d88c8fccb4
|
@ -11,8 +11,6 @@
|
|||
# labels = { cluster="cloud-n9e-es" }
|
||||
|
||||
## specify a list of one or more Elasticsearch servers
|
||||
## you can add username and password to your url to use basic authentication:
|
||||
## servers = ["http://user:pass@localhost:9200"]
|
||||
servers = ["http://localhost:9200"]
|
||||
|
||||
## Timeout for HTTP requests to the elastic search server(s)
|
||||
|
@ -35,9 +33,6 @@ cluster_health = false
|
|||
## Set cluster_stats to true when you want to obtain cluster stats.
|
||||
cluster_stats = false
|
||||
|
||||
## Only gather cluster_stats from the master node. To work this require local = true
|
||||
cluster_stats_only_from_master = true
|
||||
|
||||
## Indices to collect; can be one or more indices names or _all
|
||||
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index names that end with a changing value, like a date.
|
||||
indices_include = ["_all"]
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
# elasticsearch
|
||||
|
|
@ -0,0 +1,721 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"flashcat.cloud/categraf/config"
|
||||
"flashcat.cloud/categraf/inputs"
|
||||
"flashcat.cloud/categraf/pkg/filter"
|
||||
"flashcat.cloud/categraf/pkg/jsonx"
|
||||
"flashcat.cloud/categraf/pkg/tls"
|
||||
"flashcat.cloud/categraf/types"
|
||||
"github.com/toolkits/pkg/container/list"
|
||||
)
|
||||
|
||||
const inputName = "elasticsearch"
|
||||
|
||||
// Nodestats are always generated, so simply define a constant for these endpoints
|
||||
const statsPath = "/_nodes/stats"
|
||||
const statsPathLocal = "/_nodes/_local/stats"
|
||||
|
||||
type nodeStat struct {
|
||||
Host string `json:"host"`
|
||||
Name string `json:"name"`
|
||||
Roles []string `json:"roles"`
|
||||
Attributes map[string]string `json:"attributes"`
|
||||
Indices interface{} `json:"indices"`
|
||||
OS interface{} `json:"os"`
|
||||
Process interface{} `json:"process"`
|
||||
JVM interface{} `json:"jvm"`
|
||||
ThreadPool interface{} `json:"thread_pool"`
|
||||
FS interface{} `json:"fs"`
|
||||
Transport interface{} `json:"transport"`
|
||||
HTTP interface{} `json:"http"`
|
||||
Breakers interface{} `json:"breakers"`
|
||||
}
|
||||
|
||||
type clusterHealth struct {
|
||||
ActivePrimaryShards int `json:"active_primary_shards"`
|
||||
ActiveShards int `json:"active_shards"`
|
||||
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
|
||||
ClusterName string `json:"cluster_name"`
|
||||
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
|
||||
InitializingShards int `json:"initializing_shards"`
|
||||
NumberOfDataNodes int `json:"number_of_data_nodes"`
|
||||
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
|
||||
NumberOfNodes int `json:"number_of_nodes"`
|
||||
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
|
||||
RelocatingShards int `json:"relocating_shards"`
|
||||
Status string `json:"status"`
|
||||
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
|
||||
TimedOut bool `json:"timed_out"`
|
||||
UnassignedShards int `json:"unassigned_shards"`
|
||||
Indices map[string]indexHealth `json:"indices"`
|
||||
}
|
||||
|
||||
type indexHealth struct {
|
||||
ActivePrimaryShards int `json:"active_primary_shards"`
|
||||
ActiveShards int `json:"active_shards"`
|
||||
InitializingShards int `json:"initializing_shards"`
|
||||
NumberOfReplicas int `json:"number_of_replicas"`
|
||||
NumberOfShards int `json:"number_of_shards"`
|
||||
RelocatingShards int `json:"relocating_shards"`
|
||||
Status string `json:"status"`
|
||||
UnassignedShards int `json:"unassigned_shards"`
|
||||
}
|
||||
|
||||
type clusterStats struct {
|
||||
NodeName string `json:"node_name"`
|
||||
ClusterName string `json:"cluster_name"`
|
||||
Status string `json:"status"`
|
||||
Indices interface{} `json:"indices"`
|
||||
Nodes interface{} `json:"nodes"`
|
||||
}
|
||||
|
||||
type indexStat struct {
|
||||
Primaries interface{} `json:"primaries"`
|
||||
Total interface{} `json:"total"`
|
||||
Shards map[string][]interface{} `json:"shards"`
|
||||
}
|
||||
|
||||
type Elasticsearch struct {
|
||||
config.Interval
|
||||
counter uint64
|
||||
waitgrp sync.WaitGroup
|
||||
Instances []*Instance `toml:"instances"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add(inputName, func() inputs.Input {
|
||||
return &Elasticsearch{}
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Elasticsearch) Prefix() string {
|
||||
return inputName
|
||||
}
|
||||
|
||||
func (r *Elasticsearch) Init() error {
|
||||
if len(r.Instances) == 0 {
|
||||
return types.ErrInstancesEmpty
|
||||
}
|
||||
|
||||
for i := 0; i < len(r.Instances); i++ {
|
||||
if err := r.Instances[i].Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Elasticsearch) Drop() {}
|
||||
|
||||
func (r *Elasticsearch) Gather(slist *list.SafeList) {
|
||||
atomic.AddUint64(&r.counter, 1)
|
||||
|
||||
for i := range r.Instances {
|
||||
ins := r.Instances[i]
|
||||
|
||||
r.waitgrp.Add(1)
|
||||
go func(slist *list.SafeList, ins *Instance) {
|
||||
defer r.waitgrp.Done()
|
||||
|
||||
if ins.IntervalTimes > 0 {
|
||||
counter := atomic.LoadUint64(&r.counter)
|
||||
if counter%uint64(ins.IntervalTimes) != 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
ins.gatherOnce(slist)
|
||||
}(slist, ins)
|
||||
}
|
||||
|
||||
r.waitgrp.Wait()
|
||||
}
|
||||
|
||||
type Instance struct {
|
||||
Labels map[string]string `toml:"labels"`
|
||||
IntervalTimes int64 `toml:"interval_times"`
|
||||
|
||||
Local bool `toml:"local"`
|
||||
Servers []string `toml:"servers"`
|
||||
HTTPTimeout config.Duration `toml:"http_timeout"`
|
||||
ClusterHealth bool `toml:"cluster_health"`
|
||||
ClusterHealthLevel string `toml:"cluster_health_level"`
|
||||
ClusterStats bool `toml:"cluster_stats"`
|
||||
IndicesInclude []string `toml:"indices_include"`
|
||||
IndicesLevel string `toml:"indices_level"`
|
||||
NodeStats []string `toml:"node_stats"`
|
||||
Username string `toml:"username"`
|
||||
Password string `toml:"password"`
|
||||
NumMostRecentIndices int `toml:"num_most_recent_indices"`
|
||||
|
||||
tls.ClientConfig
|
||||
client *http.Client
|
||||
indexMatchers map[string]filter.Filter
|
||||
serverInfo map[string]serverInfo
|
||||
serverInfoMutex sync.Mutex
|
||||
}
|
||||
|
||||
type serverInfo struct {
|
||||
nodeID string
|
||||
masterID string
|
||||
}
|
||||
|
||||
func (i serverInfo) isMaster() bool {
|
||||
return i.nodeID == i.masterID
|
||||
}
|
||||
|
||||
func (ins *Instance) Init() error {
|
||||
// Compile the configured indexes to match for sorting.
|
||||
indexMatchers, err := ins.compileIndexMatchers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ins.indexMatchers = indexMatchers
|
||||
ins.client, err = ins.createHTTPClient()
|
||||
return err
|
||||
}
|
||||
|
||||
func (ins *Instance) compileIndexMatchers() (map[string]filter.Filter, error) {
|
||||
indexMatchers := map[string]filter.Filter{}
|
||||
var err error
|
||||
|
||||
// Compile each configured index into a glob matcher.
|
||||
for _, configuredIndex := range ins.IndicesInclude {
|
||||
if _, exists := indexMatchers[configuredIndex]; !exists {
|
||||
indexMatchers[configuredIndex], err = filter.Compile([]string{configuredIndex})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return indexMatchers, nil
|
||||
}
|
||||
|
||||
func (ins *Instance) gatherOnce(slist *list.SafeList) {
|
||||
if ins.ClusterStats || len(ins.IndicesInclude) > 0 || len(ins.IndicesLevel) > 0 {
|
||||
var wgC sync.WaitGroup
|
||||
wgC.Add(len(ins.Servers))
|
||||
|
||||
ins.serverInfo = make(map[string]serverInfo)
|
||||
for _, serv := range ins.Servers {
|
||||
go func(s string, slist *list.SafeList) {
|
||||
defer wgC.Done()
|
||||
info := serverInfo{}
|
||||
|
||||
var err error
|
||||
|
||||
// Gather node ID
|
||||
if info.nodeID, err = ins.gatherNodeID(s + "/_nodes/_local/name"); err != nil {
|
||||
log.Println("E! failed to gather node id:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// get cat/master information here so NodeStats can determine
|
||||
// whether this node is the Master
|
||||
if info.masterID, err = ins.getCatMaster(s + "/_cat/master"); err != nil {
|
||||
log.Println("E! failed to get cat master:", err)
|
||||
return
|
||||
}
|
||||
|
||||
ins.serverInfoMutex.Lock()
|
||||
ins.serverInfo[s] = info
|
||||
ins.serverInfoMutex.Unlock()
|
||||
}(serv, slist)
|
||||
}
|
||||
wgC.Wait()
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(ins.Servers))
|
||||
|
||||
for _, serv := range ins.Servers {
|
||||
go func(s string, slist *list.SafeList) {
|
||||
defer wg.Done()
|
||||
url := ins.nodeStatsURL(s)
|
||||
|
||||
// Always gather node stats
|
||||
if err := ins.gatherNodeStats(url, slist); err != nil {
|
||||
log.Println("E! failed to gather node stats:", err)
|
||||
return
|
||||
}
|
||||
|
||||
if ins.ClusterHealth {
|
||||
url = s + "/_cluster/health"
|
||||
if ins.ClusterHealthLevel != "" {
|
||||
url = url + "?level=" + ins.ClusterHealthLevel
|
||||
}
|
||||
if err := ins.gatherClusterHealth(url, slist); err != nil {
|
||||
log.Println("E! failed to gather cluster health:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if ins.ClusterStats && (ins.serverInfo[s].isMaster() || !ins.Local) {
|
||||
if err := ins.gatherClusterStats(s+"/_cluster/stats", slist); err != nil {
|
||||
log.Println("E! failed to gather cluster stats:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(ins.IndicesInclude) > 0 && (ins.serverInfo[s].isMaster() || !ins.Local) {
|
||||
if ins.IndicesLevel != "shards" {
|
||||
if err := ins.gatherIndicesStats(s+"/"+strings.Join(ins.IndicesInclude, ",")+"/_stats", slist); err != nil {
|
||||
log.Println("E! failed to gather indices stats:", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := ins.gatherIndicesStats(s+"/"+strings.Join(ins.IndicesInclude, ",")+"/_stats?level=shards", slist); err != nil {
|
||||
log.Println("E! failed to gather indices stats:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}(serv, slist)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (ins *Instance) gatherIndicesStats(url string, slist *list.SafeList) error {
|
||||
indicesStats := &struct {
|
||||
Shards map[string]interface{} `json:"_shards"`
|
||||
All map[string]interface{} `json:"_all"`
|
||||
Indices map[string]indexStat `json:"indices"`
|
||||
}{}
|
||||
|
||||
if err := ins.gatherJSONData(url, indicesStats); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Total Shards Stats
|
||||
for k, v := range indicesStats.Shards {
|
||||
slist.PushFront(inputs.NewSample("indices_stats_shards_total_"+k, v, ins.Labels))
|
||||
}
|
||||
|
||||
// All Stats
|
||||
for m, s := range indicesStats.All {
|
||||
// parse Json, ignoring bools and excluding strings
|
||||
jsonParser := jsonx.JSONFlattener{}
|
||||
err := jsonParser.FullFlattenJSON("_", s, false, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for key, val := range jsonParser.Fields {
|
||||
slist.PushFront(inputs.NewSample("indices_stats_"+m+"_"+key, val, map[string]string{"index_name": "_all"}, ins.Labels))
|
||||
}
|
||||
}
|
||||
|
||||
// Gather stats for each index.
|
||||
return ins.gatherIndividualIndicesStats(indicesStats.Indices, slist)
|
||||
}
|
||||
|
||||
// gatherSortedIndicesStats gathers stats for all indices in no particular order.
|
||||
func (ins *Instance) gatherIndividualIndicesStats(indices map[string]indexStat, slist *list.SafeList) error {
|
||||
// Sort indices into buckets based on their configured prefix, if any matches.
|
||||
categorizedIndexNames := ins.categorizeIndices(indices)
|
||||
for _, matchingIndices := range categorizedIndexNames {
|
||||
// Establish the number of each category of indices to use. User can configure to use only the latest 'X' amount.
|
||||
indicesCount := len(matchingIndices)
|
||||
indicesToTrackCount := indicesCount
|
||||
|
||||
// Sort the indices if configured to do so.
|
||||
if ins.NumMostRecentIndices > 0 {
|
||||
if ins.NumMostRecentIndices < indicesToTrackCount {
|
||||
indicesToTrackCount = ins.NumMostRecentIndices
|
||||
}
|
||||
sort.Strings(matchingIndices)
|
||||
}
|
||||
|
||||
// Gather only the number of indexes that have been configured, in descending order (most recent, if date-stamped).
|
||||
for i := indicesCount - 1; i >= indicesCount-indicesToTrackCount; i-- {
|
||||
indexName := matchingIndices[i]
|
||||
|
||||
err := ins.gatherSingleIndexStats(indexName, indices[indexName], slist)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ins *Instance) gatherSingleIndexStats(name string, index indexStat, slist *list.SafeList) error {
|
||||
indexTag := map[string]string{"index_name": name}
|
||||
stats := map[string]interface{}{
|
||||
"primaries": index.Primaries,
|
||||
"total": index.Total,
|
||||
}
|
||||
for m, s := range stats {
|
||||
f := jsonx.JSONFlattener{}
|
||||
// parse Json, getting strings and bools
|
||||
err := f.FullFlattenJSON("", s, true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for key, val := range f.Fields {
|
||||
slist.PushFront(inputs.NewSample("indices_stats_"+m+"_"+key, val, indexTag, ins.Labels))
|
||||
}
|
||||
}
|
||||
|
||||
if ins.IndicesLevel == "shards" {
|
||||
for shardNumber, shards := range index.Shards {
|
||||
for _, shard := range shards {
|
||||
// Get Shard Stats
|
||||
flattened := jsonx.JSONFlattener{}
|
||||
err := flattened.FullFlattenJSON("", shard, true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// determine shard tag and primary/replica designation
|
||||
shardType := "replica"
|
||||
routingPrimary, _ := flattened.Fields["routing_primary"].(bool)
|
||||
if routingPrimary {
|
||||
shardType = "primary"
|
||||
}
|
||||
delete(flattened.Fields, "routing_primary")
|
||||
|
||||
routingState, ok := flattened.Fields["routing_state"].(string)
|
||||
if ok {
|
||||
flattened.Fields["routing_state"] = mapShardStatusToCode(routingState)
|
||||
}
|
||||
|
||||
routingNode, _ := flattened.Fields["routing_node"].(string)
|
||||
shardTags := map[string]string{
|
||||
"index_name": name,
|
||||
"node_id": routingNode,
|
||||
"shard_name": shardNumber,
|
||||
"type": shardType,
|
||||
}
|
||||
|
||||
for key, field := range flattened.Fields {
|
||||
switch field.(type) {
|
||||
case string, bool:
|
||||
delete(flattened.Fields, key)
|
||||
}
|
||||
}
|
||||
|
||||
for key, val := range flattened.Fields {
|
||||
slist.PushFront(inputs.NewSample("indices_stats_shards_"+key, val, shardTags, ins.Labels))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ins *Instance) categorizeIndices(indices map[string]indexStat) map[string][]string {
|
||||
categorizedIndexNames := map[string][]string{}
|
||||
|
||||
// If all indices are configured to be gathered, bucket them all together.
|
||||
if len(ins.IndicesInclude) == 0 || ins.IndicesInclude[0] == "_all" {
|
||||
for indexName := range indices {
|
||||
categorizedIndexNames["_all"] = append(categorizedIndexNames["_all"], indexName)
|
||||
}
|
||||
|
||||
return categorizedIndexNames
|
||||
}
|
||||
|
||||
// Bucket each returned index with its associated configured index (if any match).
|
||||
for indexName := range indices {
|
||||
match := indexName
|
||||
for name, matcher := range ins.indexMatchers {
|
||||
// If a configured index matches one of the returned indexes, mark it as a match.
|
||||
if matcher.Match(match) {
|
||||
match = name
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Bucket all matching indices together for sorting.
|
||||
categorizedIndexNames[match] = append(categorizedIndexNames[match], indexName)
|
||||
}
|
||||
|
||||
return categorizedIndexNames
|
||||
}
|
||||
|
||||
func (ins *Instance) gatherClusterStats(url string, slist *list.SafeList) error {
|
||||
clusterStats := &clusterStats{}
|
||||
if err := ins.gatherJSONData(url, clusterStats); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"node_name": clusterStats.NodeName,
|
||||
"cluster_name": clusterStats.ClusterName,
|
||||
// "status": clusterStats.Status,
|
||||
}
|
||||
|
||||
stats := map[string]interface{}{
|
||||
"nodes": clusterStats.Nodes,
|
||||
"indices": clusterStats.Indices,
|
||||
}
|
||||
|
||||
for p, s := range stats {
|
||||
f := jsonx.JSONFlattener{}
|
||||
// parse json, including bools and excluding strings
|
||||
err := f.FullFlattenJSON("", s, false, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for key, val := range f.Fields {
|
||||
slist.PushFront(inputs.NewSample("clusterstats_"+p+"_"+key, val, tags, ins.Labels))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ins *Instance) gatherClusterHealth(url string, slist *list.SafeList) error {
|
||||
healthStats := &clusterHealth{}
|
||||
if err := ins.gatherJSONData(url, healthStats); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clusterFields := map[string]interface{}{
|
||||
"cluster_health_active_primary_shards": healthStats.ActivePrimaryShards,
|
||||
"cluster_health_active_shards": healthStats.ActiveShards,
|
||||
"cluster_health_active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
|
||||
"cluster_health_delayed_unassigned_shards": healthStats.DelayedUnassignedShards,
|
||||
"cluster_health_initializing_shards": healthStats.InitializingShards,
|
||||
"cluster_health_number_of_data_nodes": healthStats.NumberOfDataNodes,
|
||||
"cluster_health_number_of_in_flight_fetch": healthStats.NumberOfInFlightFetch,
|
||||
"cluster_health_number_of_nodes": healthStats.NumberOfNodes,
|
||||
"cluster_health_number_of_pending_tasks": healthStats.NumberOfPendingTasks,
|
||||
"cluster_health_relocating_shards": healthStats.RelocatingShards,
|
||||
"cluster_health_status_code": mapHealthStatusToCode(healthStats.Status),
|
||||
"cluster_health_task_max_waiting_in_queue_millis": healthStats.TaskMaxWaitingInQueueMillis,
|
||||
"cluster_health_timed_out": healthStats.TimedOut,
|
||||
"cluster_health_unassigned_shards": healthStats.UnassignedShards,
|
||||
}
|
||||
|
||||
inputs.PushSamples(slist, clusterFields, map[string]string{"name": healthStats.ClusterName}, ins.Labels)
|
||||
|
||||
for name, health := range healthStats.Indices {
|
||||
indexFields := map[string]interface{}{
|
||||
"cluster_health_indices_active_primary_shards": health.ActivePrimaryShards,
|
||||
"cluster_health_indices_active_shards": health.ActiveShards,
|
||||
"cluster_health_indices_initializing_shards": health.InitializingShards,
|
||||
"cluster_health_indices_number_of_replicas": health.NumberOfReplicas,
|
||||
"cluster_health_indices_number_of_shards": health.NumberOfShards,
|
||||
"cluster_health_indices_relocating_shards": health.RelocatingShards,
|
||||
"cluster_health_indices_status_code": mapHealthStatusToCode(health.Status),
|
||||
"cluster_health_indices_unassigned_shards": health.UnassignedShards,
|
||||
}
|
||||
inputs.PushSamples(slist, indexFields, map[string]string{"index": name, "name": healthStats.ClusterName}, ins.Labels)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ins *Instance) gatherNodeStats(url string, slist *list.SafeList) error {
|
||||
nodeStats := &struct {
|
||||
ClusterName string `json:"cluster_name"`
|
||||
Nodes map[string]*nodeStat `json:"nodes"`
|
||||
}{}
|
||||
|
||||
if err := ins.gatherJSONData(url, nodeStats); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for id, n := range nodeStats.Nodes {
|
||||
sort.Strings(n.Roles)
|
||||
tags := map[string]string{
|
||||
"node_id": id,
|
||||
"node_host": n.Host,
|
||||
"node_name": n.Name,
|
||||
"cluster_name": nodeStats.ClusterName,
|
||||
"node_roles": strings.Join(n.Roles, ","),
|
||||
}
|
||||
|
||||
for k, v := range n.Attributes {
|
||||
tags["node_attribute_"+k] = v
|
||||
}
|
||||
|
||||
stats := map[string]interface{}{
|
||||
"indices": n.Indices,
|
||||
"os": n.OS,
|
||||
"process": n.Process,
|
||||
"jvm": n.JVM,
|
||||
"thread_pool": n.ThreadPool,
|
||||
"fs": n.FS,
|
||||
"transport": n.Transport,
|
||||
"http": n.HTTP,
|
||||
"breakers": n.Breakers,
|
||||
}
|
||||
|
||||
for p, s := range stats {
|
||||
// if one of the individual node stats is not even in the
|
||||
// original result
|
||||
if s == nil {
|
||||
continue
|
||||
}
|
||||
f := jsonx.JSONFlattener{}
|
||||
// parse Json, ignoring strings and bools
|
||||
err := f.FlattenJSON("", s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for key, val := range f.Fields {
|
||||
slist.PushFront(inputs.NewSample(p+"_"+key, val, tags, ins.Labels))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ins *Instance) nodeStatsURL(baseURL string) string {
|
||||
var url string
|
||||
|
||||
if ins.Local {
|
||||
url = baseURL + statsPathLocal
|
||||
} else {
|
||||
url = baseURL + statsPath
|
||||
}
|
||||
|
||||
if len(ins.NodeStats) == 0 {
|
||||
return url
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s/%s", url, strings.Join(ins.NodeStats, ","))
|
||||
}
|
||||
|
||||
func (ins *Instance) getCatMaster(url string) (string, error) {
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if ins.Username != "" || ins.Password != "" {
|
||||
req.SetBasicAuth(ins.Username, ins.Password)
|
||||
}
|
||||
|
||||
r, err := ins.client.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer r.Body.Close()
|
||||
if r.StatusCode != http.StatusOK {
|
||||
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
|
||||
// to let the underlying transport close the connection and re-establish a new one for
|
||||
// future calls.
|
||||
return "", fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
|
||||
}
|
||||
response, err := io.ReadAll(r.Body)
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
masterID := strings.Split(string(response), " ")[0]
|
||||
|
||||
return masterID, nil
|
||||
}
|
||||
|
||||
func (ins *Instance) gatherNodeID(url string) (string, error) {
|
||||
nodeStats := &struct {
|
||||
ClusterName string `json:"cluster_name"`
|
||||
Nodes map[string]*nodeStat `json:"nodes"`
|
||||
}{}
|
||||
if err := ins.gatherJSONData(url, nodeStats); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Only 1 should be returned
|
||||
for id := range nodeStats.Nodes {
|
||||
return id, nil
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (ins *Instance) gatherJSONData(url string, v interface{}) error {
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ins.Username != "" || ins.Password != "" {
|
||||
req.SetBasicAuth(ins.Username, ins.Password)
|
||||
}
|
||||
|
||||
r, err := ins.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Body.Close()
|
||||
if r.StatusCode != http.StatusOK {
|
||||
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
|
||||
// to let the underlying transport close the connection and re-establish a new one for
|
||||
// future calls.
|
||||
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
|
||||
r.StatusCode, http.StatusOK)
|
||||
}
|
||||
|
||||
return json.NewDecoder(r.Body).Decode(v)
|
||||
}
|
||||
|
||||
// perform status mapping
|
||||
func mapHealthStatusToCode(s string) int {
|
||||
switch strings.ToLower(s) {
|
||||
case "green":
|
||||
return 1
|
||||
case "yellow":
|
||||
return 2
|
||||
case "red":
|
||||
return 3
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// perform shard status mapping
|
||||
func mapShardStatusToCode(s string) int {
|
||||
switch strings.ToUpper(s) {
|
||||
case "UNASSIGNED":
|
||||
return 1
|
||||
case "INITIALIZING":
|
||||
return 2
|
||||
case "STARTED":
|
||||
return 3
|
||||
case "RELOCATING":
|
||||
return 4
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (ins *Instance) createHTTPClient() (*http.Client, error) {
|
||||
tlsCfg, err := ins.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tr := &http.Transport{
|
||||
ResponseHeaderTimeout: time.Duration(ins.HTTPTimeout),
|
||||
TLSClientConfig: tlsCfg,
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: time.Duration(ins.HTTPTimeout),
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
package jsonx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type JSONFlattener struct {
|
||||
Fields map[string]interface{}
|
||||
}
|
||||
|
||||
// FlattenJSON flattens nested maps/interfaces into a fields map (ignoring bools and string)
|
||||
func (f *JSONFlattener) FlattenJSON(
|
||||
fieldname string,
|
||||
v interface{}) error {
|
||||
if f.Fields == nil {
|
||||
f.Fields = make(map[string]interface{})
|
||||
}
|
||||
|
||||
return f.FullFlattenJSON(fieldname, v, false, false)
|
||||
}
|
||||
|
||||
// FullFlattenJSON flattens nested maps/interfaces into a fields map (including bools and string)
|
||||
func (f *JSONFlattener) FullFlattenJSON(
|
||||
fieldname string,
|
||||
v interface{},
|
||||
convertString bool,
|
||||
convertBool bool,
|
||||
) error {
|
||||
if f.Fields == nil {
|
||||
f.Fields = make(map[string]interface{})
|
||||
}
|
||||
|
||||
switch t := v.(type) {
|
||||
case map[string]interface{}:
|
||||
for k, v := range t {
|
||||
fieldkey := k
|
||||
if fieldname != "" {
|
||||
fieldkey = fieldname + "_" + fieldkey
|
||||
}
|
||||
|
||||
err := f.FullFlattenJSON(fieldkey, v, convertString, convertBool)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case []interface{}:
|
||||
for i, v := range t {
|
||||
fieldkey := strconv.Itoa(i)
|
||||
if fieldname != "" {
|
||||
fieldkey = fieldname + "_" + fieldkey
|
||||
}
|
||||
err := f.FullFlattenJSON(fieldkey, v, convertString, convertBool)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case float64:
|
||||
f.Fields[fieldname] = t
|
||||
case string:
|
||||
if !convertString {
|
||||
return nil
|
||||
}
|
||||
f.Fields[fieldname] = v.(string)
|
||||
case bool:
|
||||
if !convertBool {
|
||||
return nil
|
||||
}
|
||||
f.Fields[fieldname] = v.(bool)
|
||||
case nil:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
|
||||
t, t, fieldname)
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue