move types.PushSamples to inputs.PushSamples

This commit is contained in:
Ulric Qin 2022-07-12 14:44:27 +08:00
parent b9ed44788c
commit 5c8fdb026b
36 changed files with 1801 additions and 70 deletions

View File

@ -0,0 +1,57 @@
## Jolokia is bundled with ActiveMQ
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8161/api/jolokia"]
name_prefix = "activemq."
username = "admin"
password = "admin"
### JVM Generic
[[inputs.jolokia2_agent.metric]]
name = "OperatingSystem"
mbean = "java.lang:type=OperatingSystem"
paths = ["ProcessCpuLoad","SystemLoadAverage","SystemCpuLoad"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_garbage_collector"
mbean = "java.lang:name=*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]
tag_prefix = "pool_"
### ACTIVEMQ
[[inputs.jolokia2_agent.metric]]
name = "queue"
mbean = "org.apache.activemq:brokerName=*,destinationName=*,destinationType=Queue,type=Broker"
paths = ["QueueSize","EnqueueCount","ConsumerCount","DispatchCount","DequeueCount","ProducerCount","InFlightCount"]
tag_keys = ["brokerName","destinationName"]
[[inputs.jolokia2_agent.metric]]
name = "topic"
mbean = "org.apache.activemq:brokerName=*,destinationName=*,destinationType=Topic,type=Broker"
paths = ["ProducerCount","DequeueCount","ConsumerCount","QueueSize","EnqueueCount"]
tag_keys = ["brokerName","destinationName"]
[[inputs.jolokia2_agent.metric]]
name = "broker"
mbean = "org.apache.activemq:brokerName=*,type=Broker"
paths = ["TotalConsumerCount","TotalMessageCount","TotalEnqueueCount","TotalDequeueCount","MemoryLimit","MemoryPercentUsage","StoreLimit","StorePercentUsage","TempPercentUsage","TempLimit"]
tag_keys = ["brokerName"]

View File

@ -0,0 +1,39 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8778/jolokia"]
name_prefix = "bitbucket."
[[inputs.jolokia2_agent.metric]]
name = "jvm_operatingsystem"
mbean = "java.lang:type=OperatingSystem"
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
[[inputs.jolokia2_agent.metric]]
name = "jvm_thread"
mbean = "java.lang:type=Threading"
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
[[inputs.jolokia2_agent.metric]]
name = "jvm_class_loading"
mbean = "java.lang:type=ClassLoading"
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:type=MemoryPool,name=*"
[[inputs.jolokia2_agent.metric]]
name = "webhooks"
mbean = "com.atlassian.webhooks:name=*"
[[inputs.jolokia2_agent.metric]]
name = "atlassian"
mbean = "com.atlassian.bitbucket:name=*"
[[inputs.jolokia2_agent.metric]]
name = "thread_pools"
mbean = "com.atlassian.bitbucket.thread-pools:name=*"

View File

@ -0,0 +1,95 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8778/jolokia"]
name_prefix = "java_"
[[inputs.jolokia2_agent.metric]]
name = "Memory"
mbean = "java.lang:type=Memory"
[[inputs.jolokia2_agent.metric]]
name = "GarbageCollector"
mbean = "java.lang:name=*,type=GarbageCollector"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8778/jolokia"]
name_prefix = "cassandra_"
[[inputs.jolokia2_agent.metric]]
name = "Cache"
mbean = "org.apache.cassandra.metrics:name=*,scope=*,type=Cache"
tag_keys = ["name", "scope"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "Client"
mbean = "org.apache.cassandra.metrics:name=*,type=Client"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "ClientRequestMetrics"
mbean = "org.apache.cassandra.metrics:name=*,type=ClientRequestMetrics"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "ClientRequest"
mbean = "org.apache.cassandra.metrics:name=*,scope=*,type=ClientRequest"
tag_keys = ["name", "scope"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "ColumnFamily"
mbean = "org.apache.cassandra.metrics:keyspace=*,name=*,scope=*,type=ColumnFamily"
tag_keys = ["keyspace", "name", "scope"]
field_prefix = "$2_"
[[inputs.jolokia2_agent.metric]]
name = "CommitLog"
mbean = "org.apache.cassandra.metrics:name=*,type=CommitLog"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "Compaction"
mbean = "org.apache.cassandra.metrics:name=*,type=Compaction"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "CQL"
mbean = "org.apache.cassandra.metrics:name=*,type=CQL"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "DroppedMessage"
mbean = "org.apache.cassandra.metrics:name=*,scope=*,type=DroppedMessage"
tag_keys = ["name", "scope"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "FileCache"
mbean = "org.apache.cassandra.metrics:name=*,type=FileCache"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "ReadRepair"
mbean = "org.apache.cassandra.metrics:name=*,type=ReadRepair"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "Storage"
mbean = "org.apache.cassandra.metrics:name=*,type=Storage"
tag_keys = ["name"]
field_prefix = "$1_"
[[inputs.jolokia2_agent.metric]]
name = "ThreadPools"
mbean = "org.apache.cassandra.metrics:name=*,path=*,scope=*,type=ThreadPools"
tag_keys = ["name", "path", "scope"]
field_prefix = "$1_"

View File

@ -0,0 +1,85 @@
################
# NAMENODE #
################
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8778/jolokia"]
name_prefix = "hadoop.hdfs.namenode."
[[inputs.jolokia2_agent.metric]]
name = "FSNamesystem"
mbean = "Hadoop:name=FSNamesystem,service=NameNode"
paths = ["CapacityTotal", "CapacityRemaining", "CapacityUsedNonDFS", "NumLiveDataNodes", "NumDeadDataNodes", "NumInMaintenanceDeadDataNodes", "NumDecomDeadDataNodes"]
[[inputs.jolokia2_agent.metric]]
name = "FSNamesystemState"
mbean = "Hadoop:name=FSNamesystemState,service=NameNode"
paths = ["VolumeFailuresTotal", "UnderReplicatedBlocks", "BlocksTotal"]
[[inputs.jolokia2_agent.metric]]
name = "OperatingSystem"
mbean = "java.lang:type=OperatingSystem"
paths = ["ProcessCpuLoad", "SystemLoadAverage", "SystemCpuLoad"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_garbage_collector"
mbean = "java.lang:name=*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]
tag_prefix = "pool_"
################
# DATANODE #
################
[[inputs.jolokia2_agent]]
urls = ["http://localhost:7778/jolokia"]
name_prefix = "hadoop.hdfs.datanode."
[[inputs.jolokia2_agent.metric]]
name = "FSDatasetState"
mbean = "Hadoop:name=FSDatasetState,service=DataNode"
paths = ["Capacity", "DfsUsed", "Remaining", "NumBlocksFailedToUnCache", "NumBlocksFailedToCache", "NumBlocksCached"]
[[inputs.jolokia2_agent.metric]]
name = "OperatingSystem"
mbean = "java.lang:type=OperatingSystem"
paths = ["ProcessCpuLoad", "SystemLoadAverage", "SystemCpuLoad"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_garbage_collector"
mbean = "java.lang:name=*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]
tag_prefix = "pool_"

View File

@ -0,0 +1,40 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8080/jolokia"]
[[inputs.jolokia2_agent.metric]]
name = "java_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
[[inputs.jolokia2_agent.metric]]
name = "java_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
[[inputs.jolokia2_agent.metric]]
name = "java_garbage_collector"
mbean = "java.lang:name=*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "java_last_garbage_collection"
mbean = "java.lang:name=G1 Young Generation,type=GarbageCollector"
paths = ["LastGcInfo/duration", "LastGcInfo/GcThreadCount", "LastGcInfo/memoryUsageAfterGc"]
[[inputs.jolokia2_agent.metric]]
name = "java_threading"
mbean = "java.lang:type=Threading"
paths = ["TotalStartedThreadCount", "ThreadCount", "DaemonThreadCount", "PeakThreadCount"]
[[inputs.jolokia2_agent.metric]]
name = "java_class_loading"
mbean = "java.lang:type=ClassLoading"
paths = ["LoadedClassCount", "UnloadedClassCount", "TotalLoadedClassCount"]
[[inputs.jolokia2_agent.metric]]
name = "java_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]

View File

@ -0,0 +1,59 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8080/jolokia"]
name_prefix = "jboss."
### JVM Generic
[[inputs.jolokia2_agent.metric]]
name = "OperatingSystem"
mbean = "java.lang:type=OperatingSystem"
paths = ["ProcessCpuLoad","SystemLoadAverage","SystemCpuLoad"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_garbage_collector"
mbean = "java.lang:name=*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]
tag_prefix = "pool_"
### JBOSS
[[inputs.jolokia2_agent.metric]]
name = "connectors.http"
mbean = "jboss.as:https-listener=*,server=*,subsystem=undertow"
paths = ["bytesReceived","bytesSent","errorCount","requestCount"]
tag_keys = ["server","https-listener"]
[[inputs.jolokia2_agent.metric]]
name = "connectors.http"
mbean = "jboss.as:http-listener=*,server=*,subsystem=undertow"
paths = ["bytesReceived","bytesSent","errorCount","requestCount"]
tag_keys = ["server","http-listener"]
[[inputs.jolokia2_agent.metric]]
name = "datasource.jdbc"
mbean = "jboss.as:data-source=*,statistics=jdbc,subsystem=datasources"
paths = ["PreparedStatementCacheAccessCount","PreparedStatementCacheHitCount","PreparedStatementCacheMissCount"]
tag_keys = ["data-source"]
[[inputs.jolokia2_agent.metric]]
name = "datasource.pool"
mbean = "jboss.as:data-source=*,statistics=pool,subsystem=datasources"
paths = ["AvailableCount","ActiveCount","MaxUsedCount"]
tag_keys = ["data-source"]

View File

@ -0,0 +1,90 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8080/jolokia"]
name_prefix = "kafka.connect."
[[processors.enum]]
[[processors.enum.mapping]]
field = "status"
[processors.enum.mapping.value_mappings]
paused = 0
running = 1
unassigned = 2
failed = 3
destroyed = 4
[inputs.jolokia2_agent.tags]
input_type = "kafka-connect"
# https://kafka.apache.org/documentation/#connect_monitoring
[[inputs.jolokia2_agent.metric]]
name = "connectWorkerMetrics"
mbean = "kafka.connect:type=connect-worker-metrics"
paths = ["connector-count", "connector-startup-attempts-total", "connector-startup-failure-percentage", "connector-startup-failure-total", "connector-startup-success-percentage", "connector-startup-success-total", "task-count", "task-startup-attempts-total", "task-startup-failure-percentage", "task-startup-failure-total", "task-startup-success-percentage", "task-startup-success-total"]
[[inputs.jolokia2_agent.metric]]
name = "connectWorkerMetrics"
mbean = "kafka.connect:type=connect-worker-metrics,connector=*"
paths = ["connector-destroyed-task-count", "connector-failed-task-count", "connector-paused-task-count", "connector-running-task-count", "connector-total-task-count", "connector-unassigned-task-count"]
tag_keys = ["connector"]
[[inputs.jolokia2_agent.metric]]
name = "connectWorkerRebalanceMetrics"
mbean = "kafka.connect:type=connect-worker-rebalance-metrics"
paths = ["completed-rebalances-total", "connect-protocol", "epoch", "leader-name", "rebalance-avg-time-ms", "rebalance-max-time-ms", "rebalancing", "time-since-last-rebalance-ms"]
[[inputs.jolokia2_agent.metric]]
name = "connectorMetrics"
mbean = "kafka.connect:type=connector-metrics,connector=*"
paths = ["connector-class", "connector-version", "connector-type", "status"]
tag_keys = ["connector"]
[[inputs.jolokia2_agent.metric]]
name = "connectorTaskMetrics"
mbean = "kafka.connect:type=connector-task-metrics,connector=*,task=*"
paths = ["batch-size-avg", "batch-size-max", "offset-commit-avg-time-ms", "offset-commit-failure-percentage", "offset-commit-max-time-ms", "offset-commit-success-percentage", "pause-ratio", "running-ratio", "status"]
tag_keys = ["connector", "task"]
[[inputs.jolokia2_agent.metric]]
name = "sinkTaskMetrics"
mbean = "kafka.connect:type=sink-task-metrics,connector=*,task=*"
paths = ["offset-commit-completion-rate", "offset-commit-completion-total", "offset-commit-seq-no", "offset-commit-skip-rate", "offset-commit-skip-total", "partition-count", "put-batch-avg-time-ms", "put-batch-max-time-ms", "sink-record-active-count", "sink-record-active-count-avg", "sink-record-active-count-max", "sink-record-lag-max", "sink-record-read-rate", "sink-record-read-total", "sink-record-send-rate", "sink-record-send-total"]
tag_keys = ["connector", "task"]
[[inputs.jolokia2_agent.metric]]
name = "sourceTaskMetrics"
mbean = "kafka.connect:type=source-task-metrics,connector=*,task=*"
paths = ["poll-batch-avg-time-ms", "poll-batch-max-time-ms", "source-record-active-count", "source-record-active-count-avg", "source-record-active-count-max", "source-record-poll-rate", "source-record-poll-total", "source-record-write-rate", "source-record-write-total"]
tag_keys = ["connector", "task"]
[[inputs.jolokia2_agent.metric]]
name = "taskErrorMetrics"
mbean = "kafka.connect:type=task-error-metrics,connector=*,task=*"
paths = ["deadletterqueue-produce-failures", "deadletterqueue-produce-requests", "last-error-timestamp", "total-errors-logged", "total-record-errors", "total-record-failures", "total-records-skipped", "total-retries"]
tag_keys = ["connector", "task"]
# https://kafka.apache.org/documentation/#selector_monitoring
[[inputs.jolokia2_agent.metric]]
name = "connectMetrics"
mbean = "kafka.connect:type=connect-metrics,client-id=*"
paths = ["connection-close-rate", "connection-close-total", "connection-creation-rate", "connection-creation-total", "network-io-rate", "network-io-total", "outgoing-byte-rate", "outgoing-byte-total", "request-rate", "request-total", "request-size-avg", "request-size-max", "incoming-byte-rate", "incoming-byte-rate", "incoming-byte-total", "response-rate", "response-total", "select-rate", "select-total", "io-wait-time-ns-avg", "io-wait-ratio", "io-time-ns-avg", "io-ratio", "connection-count", "successful-authentication-rate", "successful-authentication-total", "failed-authentication-rate", "failed-authentication-total", "successful-reauthentication-rate", "successful-reauthentication-total", "reauthentication-latency-max", "reauthentication-latency-avg", "failed-reauthentication-rate", "failed-reauthentication-total", "successful-authentication-no-reauth-total"]
tag_keys = ["client-id"]
# https://kafka.apache.org/documentation/#common_node_monitoring
[[inputs.jolokia2_agent.metric]]
name = "connectNodeMetrics"
mbean = "kafka.connect:type=connect-node-metrics,client-id=*,node-id=*"
paths = ["outgoing-byte-rate", "outgoing-byte-total", "request-rate", "request-total", "request-size-avg", "request-size-max", "incoming-byte-rate", "incoming-byte-total", "request-latency-avg", "request-latency-max", "response-rate", "response-total"]
tag_keys = ["client-id", "node-id"]
[[inputs.jolokia2_agent.metric]]
name = "appInfo"
mbean = "kafka.connect:type=app-info,client-id=*"
paths = ["start-time-ms", "commit-id", "version"]
tag_keys = ["client-id"]
[[inputs.jolokia2_agent.metric]]
name = "connectCoordinatorMetrics"
mbean = "kafka.connect:type=connect-coordinator-metrics,client-id=*"
paths = ["join-time-max", "failed-rebalance-rate-per-hour", "rebalance-latency-total", "sync-time-avg", "join-rate", "sync-rate", "failed-rebalance-total", "rebalance-total", "last-heartbeat-seconds-ago", "heartbeat-rate", "join-time-avg", "sync-total", "rebalance-latency-max", "sync-time-max", "last-rebalance-seconds-ago", "rebalance-rate-per-hour", "assigned-connectors", "heartbeat-total", "assigned-tasks", "heartbeat-response-time-max", "rebalance-latency-avg", "join-total"]
tag_keys = ["client-id"]

View File

@ -0,0 +1,109 @@
[[inputs.jolokia2_agent]]
name_prefix = "kafka_"
## If you intend to use "non_negative_derivative(1s)" with "*.count" fields, you don't need precalculated fields.
# fielddrop = [
# "*.EventType",
# "*.FifteenMinuteRate",
# "*.FiveMinuteRate",
# "*.MeanRate",
# "*.OneMinuteRate",
# "*.RateUnit",
# "*.LatencyUnit",
# "*.50thPercentile",
# "*.75thPercentile",
# "*.95thPercentile",
# "*.98thPercentile",
# "*.99thPercentile",
# "*.999thPercentile",
# "*.Min",
# "*.Mean",
# "*.Max",
# "*.StdDev"
# ]
## jolokia_agent_url tag is not needed if you have only one instance of Kafka on the server.
# tagexclude = ["jolokia_agent_url"]
urls = ["http://localhost:8080/jolokia"]
[[inputs.jolokia2_agent.metric]]
name = "controller"
mbean = "kafka.controller:name=*,type=*"
field_prefix = "$1."
[[inputs.jolokia2_agent.metric]]
name = "replica_manager"
mbean = "kafka.server:name=*,type=ReplicaManager"
field_prefix = "$1."
[[inputs.jolokia2_agent.metric]]
name = "purgatory"
mbean = "kafka.server:delayedOperation=*,name=*,type=DelayedOperationPurgatory"
field_prefix = "$1."
field_name = "$2"
[[inputs.jolokia2_agent.metric]]
name = "zookeeper"
mbean = "kafka.server:name=*,type=SessionExpireListener"
field_prefix = "$1."
[[inputs.jolokia2_agent.metric]]
name = "user"
mbean = "kafka.server:user=*,type=Request"
field_prefix = ""
tag_keys = ["user"]
[[inputs.jolokia2_agent.metric]]
name = "request"
mbean = "kafka.network:name=*,request=*,type=RequestMetrics"
field_prefix = "$1."
tag_keys = ["request"]
[[inputs.jolokia2_agent.metric]]
name = "topics"
mbean = "kafka.server:name=*,type=BrokerTopicMetrics"
field_prefix = "$1."
[[inputs.jolokia2_agent.metric]]
name = "topic"
mbean = "kafka.server:name=*,topic=*,type=BrokerTopicMetrics"
field_prefix = "$1."
tag_keys = ["topic"]
[[inputs.jolokia2_agent.metric]]
name = "partition"
mbean = "kafka.log:name=*,partition=*,topic=*,type=Log"
field_name = "$1"
tag_keys = ["topic", "partition"]
[[inputs.jolokia2_agent.metric]]
name = "partition"
mbean = "kafka.cluster:name=UnderReplicated,partition=*,topic=*,type=Partition"
field_name = "UnderReplicatedPartitions"
tag_keys = ["topic", "partition"]
## If you have multiple instances of Kafka on the server, use 'jolokia_agent_url' as identity of each instance
# [[processors.rename]]
# namepass = ["kafka_*"]
# order = 1
# [[processors.rename.replace]]
# tag = "jolokia_agent_url"
# dest = "instance"
#
# [[processors.regex]]
# namepass = ["kafka_*"]
# order = 2
# [[processors.regex.tags]]
# key = "instance"
# pattern = "^.+:8080/.+$"
# replacement = "0"
# [[processors.regex.tags]]
# key = "instance"
# pattern = "^.+:8081/.+$"
# replacement = "1"
# [[processors.regex.tags]]
# key = "instance"
# pattern = "^.+:8082/.+$"
# replacement = "2"

View File

@ -0,0 +1,65 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8080/jolokia"]
name_prefix = "tomcat."
### JVM Generic
[[inputs.jolokia2_agent.metric]]
name = "OperatingSystem"
mbean = "java.lang:type=OperatingSystem"
paths = ["ProcessCpuLoad","SystemLoadAverage","SystemCpuLoad"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_garbage_collector"
mbean = "java.lang:name=*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]
tag_prefix = "pool_"
### TOMCAT
[[inputs.jolokia2_agent.metric]]
name = "GlobalRequestProcessor"
mbean = "Catalina:name=*,type=GlobalRequestProcessor"
paths = ["requestCount","bytesReceived","bytesSent","processingTime","errorCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "JspMonitor"
mbean = "Catalina:J2EEApplication=*,J2EEServer=*,WebModule=*,name=jsp,type=JspMonitor"
paths = ["jspReloadCount","jspCount","jspUnloadCount"]
tag_keys = ["J2EEApplication","J2EEServer","WebModule"]
[[inputs.jolokia2_agent.metric]]
name = "ThreadPool"
mbean = "Catalina:name=*,type=ThreadPool"
paths = ["maxThreads","currentThreadCount","currentThreadsBusy"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "Servlet"
mbean = "Catalina:J2EEApplication=*,J2EEServer=*,WebModule=*,j2eeType=Servlet,name=*"
paths = ["processingTime","errorCount","requestCount"]
tag_keys = ["name","J2EEApplication","J2EEServer","WebModule"]
[[inputs.jolokia2_agent.metric]]
name = "Cache"
mbean = "Catalina:context=*,host=*,name=Cache,type=WebResourceRoot"
paths = ["hitCount","lookupCount"]
tag_keys = ["context","host"]

View File

@ -0,0 +1,56 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8080/jolokia"]
name_prefix = "weblogic."
### JVM Generic
[[inputs.jolokia2_agent.metric]]
name = "OperatingSystem"
mbean = "java.lang:type=OperatingSystem"
paths = ["ProcessCpuLoad","SystemLoadAverage","SystemCpuLoad"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_garbage_collector"
mbean = "java.lang:name=*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]
tag_prefix = "pool_"
### WLS
[[inputs.jolokia2_agent.metric]]
name = "JTARuntime"
mbean = "com.bea:Name=JTARuntime,ServerRuntime=*,Type=JTARuntime"
paths = ["SecondsActiveTotalCount","TransactionRolledBackTotalCount","TransactionRolledBackSystemTotalCount","TransactionRolledBackAppTotalCount","TransactionRolledBackResourceTotalCount","TransactionHeuristicsTotalCount","TransactionAbandonedTotalCount","TransactionTotalCount","TransactionRolledBackTimeoutTotalCount","ActiveTransactionsTotalCount","TransactionCommittedTotalCount"]
tag_keys = ["ServerRuntime"]
tag_prefix = "wls_"
[[inputs.jolokia2_agent.metric]]
name = "ThreadPoolRuntime"
mbean = "com.bea:Name=ThreadPoolRuntime,ServerRuntime=*,Type=ThreadPoolRuntime"
paths = ["StuckThreadCount","CompletedRequestCount","ExecuteThreadTotalCount","ExecuteThreadIdleCount","StandbyThreadCount","Throughput","HoggingThreadCount","PendingUserRequestCount"]
tag_keys = ["ServerRuntime"]
tag_prefix = "wls_"
[[inputs.jolokia2_agent.metric]]
name = "JMSRuntime"
mbean = "com.bea:Name=*.jms,ServerRuntime=*,Type=JMSRuntime"
paths = ["ConnectionsCurrentCount","ConnectionsHighCount","ConnectionsTotalCount","JMSServersCurrentCount","JMSServersHighCount","JMSServersTotalCount"]
tag_keys = ["name","ServerRuntime"]
tag_prefix = "wls_"

View File

@ -0,0 +1,18 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8080/jolokia"]
name_prefix = "zk_"
[[inputs.jolokia2_agent.metric]]
name = "quorum"
mbean = "org.apache.ZooKeeperService:name0=*"
tag_keys = ["name0"]
[[inputs.jolokia2_agent.metric]]
name = "leader"
mbean = "org.apache.ZooKeeperService:name0=*,name1=*,name2=Leader"
tag_keys = ["name1"]
[[inputs.jolokia2_agent.metric]]
name = "follower"
mbean = "org.apache.ZooKeeperService:name0=*,name1=*,name2=Follower"
tag_keys = ["name1"]

View File

@ -12,7 +12,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -101,5 +100,5 @@ func (c *Conntrack) Gather(slist *list.SafeList) {
log.Println("E! Conntrack input failed to collect metrics. Is the conntrack kernel module loaded?")
}
types.PushSamples(slist, fields)
inputs.PushSamples(slist, fields)
}

View File

@ -9,7 +9,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/types"
)
const inputName = "cpu"
@ -96,7 +95,7 @@ func (c *CPUStats) Gather(slist *list.SafeList) {
"usage_active": 100 * (active - lastActive) / totalDelta,
}
types.PushSamples(slist, fields, tags)
inputs.PushSamples(slist, fields, tags)
}
c.lastStats = make(map[string]cpuUtil.TimesStat)

View File

@ -8,7 +8,6 @@ import (
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/pkg/choice"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -85,7 +84,7 @@ func (s *DiskStats) Gather(slist *list.SafeList) {
"inodes_used": du.InodesUsed,
}
types.PushSamples(slist, fields, tags)
inputs.PushSamples(slist, fields, tags)
}
}

View File

@ -8,7 +8,6 @@ import (
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/pkg/filter"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -82,6 +81,6 @@ func (d *DiskIO) Gather(slist *list.SafeList) {
"merged_writes": io.MergedWriteCount,
}
types.PushSamples(slist, fields, map[string]string{"name": io.Name})
inputs.PushSamples(slist, fields, map[string]string{"name": io.Name})
}
}

View File

@ -355,7 +355,7 @@ func (ins *Instance) gatherContainerInspect(container types.Container, slist *li
statefields["docker_container_status_uptime"] = uptime.Seconds()
}
itypes.PushSamples(slist, statefields, tags, ins.Labels)
inputs.PushSamples(slist, statefields, tags, ins.Labels)
if info.State.Health != nil {
slist.PushFront(itypes.NewSample("docker_container_health_failing_streak", info.ContainerJSONBase.State.Health.FailingStreak, tags, ins.Labels))
@ -438,7 +438,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
memfields["docker_container_mem_private_working_set"] = stat.MemoryStats.PrivateWorkingSet
}
itypes.PushSamples(slist, memfields, tags, ins.Labels)
inputs.PushSamples(slist, memfields, tags, ins.Labels)
// cpu
@ -463,7 +463,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
cpufields["docker_container_cpu_usage_percent"] = cpuPercent
}
itypes.PushSamples(slist, cpufields, map[string]string{"cpu": "cpu-total"}, tags, ins.Labels)
inputs.PushSamples(slist, cpufields, map[string]string{"cpu": "cpu-total"}, tags, ins.Labels)
}
if choice.Contains("cpu", ins.PerDeviceInclude) && len(stat.CPUStats.CPUUsage.PercpuUsage) > 0 {
@ -501,7 +501,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
}
if choice.Contains("network", ins.PerDeviceInclude) {
itypes.PushSamples(slist, netfields, map[string]string{"network": network}, tags, ins.Labels)
inputs.PushSamples(slist, netfields, map[string]string{"network": network}, tags, ins.Labels)
}
if choice.Contains("network", ins.TotalInclude) {
@ -528,7 +528,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
// totalNetworkStatMap could be empty if container is running with --net=host.
if choice.Contains("network", ins.TotalInclude) && len(totalNetworkStatMap) != 0 {
itypes.PushSamples(slist, totalNetworkStatMap, map[string]string{"network": "total"}, tags, ins.Labels)
inputs.PushSamples(slist, totalNetworkStatMap, map[string]string{"network": "total"}, tags, ins.Labels)
}
ins.gatherBlockIOMetrics(slist, stat, tags)
@ -544,7 +544,7 @@ func (ins *Instance) gatherBlockIOMetrics(slist *list.SafeList, stat *types.Stat
totalStatMap := make(map[string]interface{})
for device, fields := range deviceStatMap {
if perDeviceBlkio {
itypes.PushSamples(slist, fields, map[string]string{"device": device}, tags, ins.Labels)
inputs.PushSamples(slist, fields, map[string]string{"device": device}, tags, ins.Labels)
}
if totalBlkio {
for field, value := range fields {
@ -569,7 +569,7 @@ func (ins *Instance) gatherBlockIOMetrics(slist *list.SafeList, stat *types.Stat
}
if totalBlkio {
itypes.PushSamples(slist, totalStatMap, map[string]string{"device": "total"}, tags, ins.Labels)
inputs.PushSamples(slist, totalStatMap, map[string]string{"device": "total"}, tags, ins.Labels)
}
}
@ -700,7 +700,7 @@ func (ins *Instance) gatherSwarmInfo(slist *list.SafeList) {
log.Println("E! Unknown replica mode")
}
itypes.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}
}
@ -728,7 +728,7 @@ func (ins *Instance) gatherInfo(slist *list.SafeList) error {
"docker_memory_total": info.MemTotal,
}
itypes.PushSamples(slist, fields, ins.Labels)
inputs.PushSamples(slist, fields, ins.Labels)
return nil
}

View File

@ -531,7 +531,7 @@ func (ins *Instance) gatherClusterHealth(url string, address string, slist *list
"cluster_health_unassigned_shards": healthStats.UnassignedShards,
}
types.PushSamples(slist, clusterFields, map[string]string{"cluster_name": healthStats.ClusterName}, addrTag, ins.Labels)
inputs.PushSamples(slist, clusterFields, map[string]string{"cluster_name": healthStats.ClusterName}, addrTag, ins.Labels)
for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{
@ -544,7 +544,7 @@ func (ins *Instance) gatherClusterHealth(url string, address string, slist *list
"cluster_health_indices_status_code": mapHealthStatusToCode(health.Status),
"cluster_health_indices_unassigned_shards": health.UnassignedShards,
}
types.PushSamples(slist, indexFields, map[string]string{"index": name, "name": healthStats.ClusterName}, addrTag, ins.Labels)
inputs.PushSamples(slist, indexFields, map[string]string{"index": name, "name": healthStats.ClusterName}, addrTag, ins.Labels)
}
return nil

271
inputs/jolokia/client.go Normal file
View File

@ -0,0 +1,271 @@
package jolokia
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"time"
"flashcat.cloud/categraf/pkg/tls"
)
type Client struct {
URL string
client *http.Client
config *ClientConfig
}
type ClientConfig struct {
ResponseTimeout time.Duration
Username string
Password string
ProxyConfig *ProxyConfig
tls.ClientConfig
}
type ProxyConfig struct {
DefaultTargetUsername string
DefaultTargetPassword string
Targets []ProxyTargetConfig
}
type ProxyTargetConfig struct {
Username string
Password string
URL string
}
type ReadRequest struct {
Mbean string
Attributes []string
Path string
}
type ReadResponse struct {
Status int
Value interface{}
RequestMbean string
RequestAttributes []string
RequestPath string
RequestTarget string
}
// Jolokia JSON request object. Example: {
// "type": "read",
// "mbean: "java.lang:type="Runtime",
// "attribute": "Uptime",
// "target": {
// "url: "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi"
// }
// }
type jolokiaRequest struct {
Type string `json:"type"`
Mbean string `json:"mbean"`
Attribute interface{} `json:"attribute,omitempty"`
Path string `json:"path,omitempty"`
Target *jolokiaTarget `json:"target,omitempty"`
}
type jolokiaTarget struct {
URL string `json:"url"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
}
// Jolokia JSON response object. Example: {
// "request": {
// "type": "read"
// "mbean": "java.lang:type=Runtime",
// "attribute": "Uptime",
// "target": {
// "url": "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi"
// }
// },
// "value": 1214083,
// "timestamp": 1488059309,
// "status": 200
// }
type jolokiaResponse struct {
Request jolokiaRequest `json:"request"`
Value interface{} `json:"value"`
Status int `json:"status"`
}
func NewClient(address string, config *ClientConfig) (*Client, error) {
tlsConfig, err := config.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
transport := &http.Transport{
ResponseHeaderTimeout: config.ResponseTimeout,
TLSClientConfig: tlsConfig,
}
client := &http.Client{
Transport: transport,
Timeout: config.ResponseTimeout,
}
return &Client{
URL: address,
config: config,
client: client,
}, nil
}
func (c *Client) read(requests []ReadRequest) ([]ReadResponse, error) {
jRequests := makeJolokiaRequests(requests, c.config.ProxyConfig)
requestBody, err := json.Marshal(jRequests)
if err != nil {
return nil, err
}
requestURL, err := formatReadURL(c.URL, c.config.Username, c.config.Password)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", requestURL, bytes.NewBuffer(requestBody))
if err != nil {
//err is not contained in returned error - it may contain sensitive data (password) which should not be logged
return nil, fmt.Errorf("unable to create new request for: '%s'", c.URL)
}
req.Header.Add("Content-type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("response from url \"%s\" has status code %d (%s), expected %d (%s)",
c.URL, resp.StatusCode, http.StatusText(resp.StatusCode), http.StatusOK, http.StatusText(http.StatusOK))
}
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var jResponses []jolokiaResponse
if err = json.Unmarshal(responseBody, &jResponses); err != nil {
return nil, fmt.Errorf("decoding JSON response: %s: %s", err, responseBody)
}
return makeReadResponses(jResponses), nil
}
func makeJolokiaRequests(rrequests []ReadRequest, proxyConfig *ProxyConfig) []jolokiaRequest {
jrequests := make([]jolokiaRequest, 0)
if proxyConfig == nil {
for _, rr := range rrequests {
jrequests = append(jrequests, makeJolokiaRequest(rr, nil))
}
} else {
for _, t := range proxyConfig.Targets {
if t.Username == "" {
t.Username = proxyConfig.DefaultTargetUsername
}
if t.Password == "" {
t.Password = proxyConfig.DefaultTargetPassword
}
for _, rr := range rrequests {
jtarget := &jolokiaTarget{
URL: t.URL,
User: t.Username,
Password: t.Password,
}
jrequests = append(jrequests, makeJolokiaRequest(rr, jtarget))
}
}
}
return jrequests
}
func makeJolokiaRequest(rrequest ReadRequest, jtarget *jolokiaTarget) jolokiaRequest {
jrequest := jolokiaRequest{
Type: "read",
Mbean: rrequest.Mbean,
Path: rrequest.Path,
Target: jtarget,
}
if len(rrequest.Attributes) == 1 {
jrequest.Attribute = rrequest.Attributes[0]
}
if len(rrequest.Attributes) > 1 {
jrequest.Attribute = rrequest.Attributes
}
return jrequest
}
func makeReadResponses(jresponses []jolokiaResponse) []ReadResponse {
rresponses := make([]ReadResponse, 0)
for _, jr := range jresponses {
rrequest := ReadRequest{
Mbean: jr.Request.Mbean,
Path: jr.Request.Path,
Attributes: []string{},
}
attrValue := jr.Request.Attribute
if attrValue != nil {
attribute, ok := attrValue.(string)
if ok {
rrequest.Attributes = []string{attribute}
} else {
attributes, _ := attrValue.([]interface{})
rrequest.Attributes = make([]string, len(attributes))
for i, attr := range attributes {
rrequest.Attributes[i] = attr.(string)
}
}
}
rresponse := ReadResponse{
Value: jr.Value,
Status: jr.Status,
RequestMbean: rrequest.Mbean,
RequestAttributes: rrequest.Attributes,
RequestPath: rrequest.Path,
}
if jtarget := jr.Request.Target; jtarget != nil {
rresponse.RequestTarget = jtarget.URL
}
rresponses = append(rresponses, rresponse)
}
return rresponses
}
func formatReadURL(configURL, username, password string) (string, error) {
parsedURL, err := url.Parse(configURL)
if err != nil {
return "", err
}
readURL := url.URL{
Host: parsedURL.Host,
Scheme: parsedURL.Scheme,
}
if username != "" || password != "" {
readURL.User = url.UserPassword(username, password)
}
readURL.Path = path.Join(parsedURL.Path, "read")
readURL.Query().Add("ignoreErrors", "true")
return readURL.String(), nil
}

262
inputs/jolokia/gatherer.go Normal file
View File

@ -0,0 +1,262 @@
package jolokia
import (
"fmt"
"log"
"sort"
"strings"
"github.com/toolkits/pkg/container/list"
)
const defaultFieldName = "value"
type Gatherer struct {
metrics []Metric
requests []ReadRequest
}
func NewGatherer(metrics []Metric) *Gatherer {
return &Gatherer{
metrics: metrics,
requests: makeReadRequests(metrics),
}
}
// Gather adds points to an accumulator from responses returned
// by a Jolokia agent.
func (g *Gatherer) Gather(client *Client, slist *list.SafeList) error {
var tags map[string]string
if client.config.ProxyConfig != nil {
tags = map[string]string{"jolokia_proxy_url": client.URL}
} else {
tags = map[string]string{"jolokia_agent_url": client.URL}
}
requests := makeReadRequests(g.metrics)
responses, err := client.read(requests)
if err != nil {
return err
}
g.gatherResponses(responses, tags, slist)
return nil
}
// gatherResponses adds points to an accumulator from the ReadResponse objects
// returned by a Jolokia agent.
func (g *Gatherer) gatherResponses(responses []ReadResponse, tags map[string]string, slist *list.SafeList) {
series := make(map[string][]point)
for _, metric := range g.metrics {
points, ok := series[metric.Name]
if !ok {
points = make([]point, 0)
}
responsePoints, responseErrors := g.generatePoints(metric, responses)
points = append(points, responsePoints...)
for _, err := range responseErrors {
log.Println("E!", err)
}
series[metric.Name] = points
}
// for measurement, points := range series {
// for _, point := range compactPoints(points) {
// // acc.AddFields(measurement,
// // point.Fields, mergeTags(point.Tags, tags))
// }
// }
}
// generatePoints creates points for the supplied metric from the ReadResponse
// objects returned by the Jolokia client.
func (g *Gatherer) generatePoints(metric Metric, responses []ReadResponse) ([]point, []error) {
points := make([]point, 0)
errors := make([]error, 0)
for _, response := range responses {
switch response.Status {
case 200:
// Correct response status - do nothing.
case 404:
continue
default:
errors = append(errors, fmt.Errorf("unexpected status in response from target %s (%q): %d",
response.RequestTarget, response.RequestMbean, response.Status))
continue
}
if !metricMatchesResponse(metric, response) {
continue
}
pb := newPointBuilder(metric, response.RequestAttributes, response.RequestPath)
for _, point := range pb.Build(metric.Mbean, response.Value) {
if response.RequestTarget != "" {
point.Tags["jolokia_agent_url"] = response.RequestTarget
}
points = append(points, point)
}
}
return points, errors
}
// mergeTags combines two tag sets into a single tag set.
func mergeTags(metricTags, outerTags map[string]string) map[string]string {
tags := make(map[string]string)
for k, v := range outerTags {
tags[k] = v
}
for k, v := range metricTags {
tags[k] = v
}
return tags
}
// metricMatchesResponse returns true when the name, attributes, and path
// of a Metric match the corresponding elements in a ReadResponse object
// returned by a Jolokia agent.
func metricMatchesResponse(metric Metric, response ReadResponse) bool {
if !metric.MatchObjectName(response.RequestMbean) {
return false
}
if len(metric.Paths) == 0 {
return len(response.RequestAttributes) == 0
}
for _, attribute := range response.RequestAttributes {
if metric.MatchAttributeAndPath(attribute, response.RequestPath) {
return true
}
}
return false
}
// compactPoints attempts to remove points by compacting points
// with matching tag sets. When a match is found, the fields from
// one point are moved to another, and the empty point is removed.
func compactPoints(points []point) []point {
compactedPoints := make([]point, 0)
for _, sourcePoint := range points {
keepPoint := true
for _, compactPoint := range compactedPoints {
if !tagSetsMatch(sourcePoint.Tags, compactPoint.Tags) {
continue
}
keepPoint = false
for key, val := range sourcePoint.Fields {
compactPoint.Fields[key] = val
}
}
if keepPoint {
compactedPoints = append(compactedPoints, sourcePoint)
}
}
return compactedPoints
}
// tagSetsMatch returns true if two maps are equivalent.
func tagSetsMatch(a, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for ak, av := range a {
bv, ok := b[ak]
if !ok {
return false
}
if av != bv {
return false
}
}
return true
}
// makeReadRequests creates ReadRequest objects from metrics definitions.
func makeReadRequests(metrics []Metric) []ReadRequest {
var requests []ReadRequest
for _, metric := range metrics {
if len(metric.Paths) == 0 {
requests = append(requests, ReadRequest{
Mbean: metric.Mbean,
Attributes: []string{},
})
} else {
attributes := make(map[string][]string)
for _, path := range metric.Paths {
segments := strings.Split(path, "/")
attribute := segments[0]
if _, ok := attributes[attribute]; !ok {
attributes[attribute] = make([]string, 0)
}
if len(segments) > 1 {
paths := attributes[attribute]
attributes[attribute] = append(paths, strings.Join(segments[1:], "/"))
}
}
rootAttributes := findRequestAttributesWithoutPaths(attributes)
if len(rootAttributes) > 0 {
requests = append(requests, ReadRequest{
Mbean: metric.Mbean,
Attributes: rootAttributes,
})
}
for _, deepAttribute := range findRequestAttributesWithPaths(attributes) {
for _, path := range attributes[deepAttribute] {
requests = append(requests, ReadRequest{
Mbean: metric.Mbean,
Attributes: []string{deepAttribute},
Path: path,
})
}
}
}
}
return requests
}
func findRequestAttributesWithoutPaths(attributes map[string][]string) []string {
results := make([]string, 0)
for attr, paths := range attributes {
if len(paths) == 0 {
results = append(results, attr)
}
}
sort.Strings(results)
return results
}
func findRequestAttributesWithPaths(attributes map[string][]string) []string {
results := make([]string, 0)
for attr, paths := range attributes {
if len(paths) != 0 {
results = append(results, attr)
}
}
sort.Strings(results)
return results
}

128
inputs/jolokia/metric.go Normal file
View File

@ -0,0 +1,128 @@
package jolokia
import "strings"
// A MetricConfig represents a TOML form of
// a Metric with some optional fields.
type MetricConfig struct {
Name string
Mbean string
Paths []string
FieldName *string
FieldPrefix *string
FieldSeparator *string
TagPrefix *string
TagKeys []string
}
// A Metric represents a specification for a
// Jolokia read request, and the transformations
// to apply to points generated from the responses.
type Metric struct {
Name string
Mbean string
Paths []string
FieldName string
FieldPrefix string
FieldSeparator string
TagPrefix string
TagKeys []string
mbeanDomain string
mbeanProperties []string
}
func NewMetric(config MetricConfig, defaultFieldPrefix, defaultFieldSeparator, defaultTagPrefix string) Metric {
metric := Metric{
Name: config.Name,
Mbean: config.Mbean,
Paths: config.Paths,
TagKeys: config.TagKeys,
}
if config.FieldName != nil {
metric.FieldName = *config.FieldName
}
if config.FieldPrefix == nil {
metric.FieldPrefix = defaultFieldPrefix
} else {
metric.FieldPrefix = *config.FieldPrefix
}
if config.FieldSeparator == nil {
metric.FieldSeparator = defaultFieldSeparator
} else {
metric.FieldSeparator = *config.FieldSeparator
}
if config.TagPrefix == nil {
metric.TagPrefix = defaultTagPrefix
} else {
metric.TagPrefix = *config.TagPrefix
}
mbeanDomain, mbeanProperties := parseMbeanObjectName(config.Mbean)
metric.mbeanDomain = mbeanDomain
metric.mbeanProperties = mbeanProperties
return metric
}
func (m Metric) MatchObjectName(name string) bool {
if name == m.Mbean {
return true
}
mbeanDomain, mbeanProperties := parseMbeanObjectName(name)
if mbeanDomain != m.mbeanDomain {
return false
}
if len(mbeanProperties) != len(m.mbeanProperties) {
return false
}
NEXT_PROPERTY:
for _, mbeanProperty := range m.mbeanProperties {
for i := range mbeanProperties {
if mbeanProperties[i] == mbeanProperty {
continue NEXT_PROPERTY
}
}
return false
}
return true
}
func (m Metric) MatchAttributeAndPath(attribute, innerPath string) bool {
path := attribute
if innerPath != "" {
path = path + "/" + innerPath
}
for i := range m.Paths {
if path == m.Paths[i] {
return true
}
}
return false
}
func parseMbeanObjectName(name string) (string, []string) {
index := strings.Index(name, ":")
if index == -1 {
return name, []string{}
}
domain := name[:index]
if index+1 > len(name) {
return domain, []string{}
}
return domain, strings.Split(name[index+1:], ",")
}

View File

@ -0,0 +1,274 @@
package jolokia
import (
"fmt"
"strings"
)
type point struct {
Tags map[string]string
Fields map[string]interface{}
}
type pointBuilder struct {
metric Metric
objectAttributes []string
objectPath string
substitutions []string
}
func newPointBuilder(metric Metric, attributes []string, path string) *pointBuilder {
return &pointBuilder{
metric: metric,
objectAttributes: attributes,
objectPath: path,
substitutions: makeSubstitutionList(metric.Mbean),
}
}
// Build generates a point for a given mbean name/pattern and value object.
func (pb *pointBuilder) Build(mbean string, value interface{}) []point {
hasPattern := strings.Contains(mbean, "*")
if !hasPattern {
value = map[string]interface{}{mbean: value}
}
valueMap, ok := value.(map[string]interface{})
if !ok { // FIXME: log it and move on.
panic(fmt.Sprintf("There should be a map here for %s!\n", mbean))
}
points := make([]point, 0)
for mbean, value := range valueMap {
points = append(points, point{
Tags: pb.extractTags(mbean),
Fields: pb.extractFields(mbean, value),
})
}
return compactPoints(points)
}
// extractTags generates the map of tags for a given mbean name/pattern.
func (pb *pointBuilder) extractTags(mbean string) map[string]string {
propertyMap := makePropertyMap(mbean)
tagMap := make(map[string]string)
for key, value := range propertyMap {
if pb.includeTag(key) {
tagName := pb.formatTagName(key)
tagMap[tagName] = value
}
}
return tagMap
}
func (pb *pointBuilder) includeTag(tagName string) bool {
for _, t := range pb.metric.TagKeys {
if tagName == t {
return true
}
}
return false
}
func (pb *pointBuilder) formatTagName(tagName string) string {
if tagName == "" {
return ""
}
if tagPrefix := pb.metric.TagPrefix; tagPrefix != "" {
return tagPrefix + tagName
}
return tagName
}
// extractFields generates the map of fields for a given mbean name
// and value object.
func (pb *pointBuilder) extractFields(mbean string, value interface{}) map[string]interface{} {
fieldMap := make(map[string]interface{})
valueMap, ok := value.(map[string]interface{})
if ok {
// complex value
if len(pb.objectAttributes) == 0 {
// if there were no attributes requested,
// then the keys are attributes
pb.fillFields("", valueMap, fieldMap)
} else if len(pb.objectAttributes) == 1 {
// if there was a single attribute requested,
// then the keys are the attribute's properties
fieldName := pb.formatFieldName(pb.objectAttributes[0], pb.objectPath)
pb.fillFields(fieldName, valueMap, fieldMap)
} else {
// if there were multiple attributes requested,
// then the keys are the attribute names
for _, attribute := range pb.objectAttributes {
fieldName := pb.formatFieldName(attribute, pb.objectPath)
pb.fillFields(fieldName, valueMap[attribute], fieldMap)
}
}
} else {
// scalar value
var fieldName string
if len(pb.objectAttributes) == 0 {
fieldName = pb.formatFieldName(defaultFieldName, pb.objectPath)
} else {
fieldName = pb.formatFieldName(pb.objectAttributes[0], pb.objectPath)
}
pb.fillFields(fieldName, value, fieldMap)
}
if len(pb.substitutions) > 1 {
pb.applySubstitutions(mbean, fieldMap)
}
return fieldMap
}
// formatFieldName generates a field name from the supplied attribute and
// path. The return value has the configured FieldPrefix and FieldSuffix
// instructions applied.
func (pb *pointBuilder) formatFieldName(attribute, path string) string {
fieldName := attribute
fieldPrefix := pb.metric.FieldPrefix
fieldSeparator := pb.metric.FieldSeparator
if fieldPrefix != "" {
fieldName = fieldPrefix + fieldName
}
if path != "" {
fieldName = fieldName + fieldSeparator + strings.Replace(path, "/", fieldSeparator, -1)
}
return fieldName
}
// fillFields recurses into the supplied value object, generating a named field
// for every value it discovers.
func (pb *pointBuilder) fillFields(name string, value interface{}, fieldMap map[string]interface{}) {
if valueMap, ok := value.(map[string]interface{}); ok {
// keep going until we get to something that is not a map
for key, innerValue := range valueMap {
if _, ok := innerValue.([]interface{}); ok {
continue
}
var innerName string
if name == "" {
innerName = pb.metric.FieldPrefix + key
} else {
innerName = name + pb.metric.FieldSeparator + key
}
pb.fillFields(innerName, innerValue, fieldMap)
}
return
}
if _, ok := value.([]interface{}); ok {
return
}
if pb.metric.FieldName != "" {
name = pb.metric.FieldName
if prefix := pb.metric.FieldPrefix; prefix != "" {
name = prefix + name
}
}
if name == "" {
name = defaultFieldName
}
fieldMap[name] = value
}
// applySubstitutions updates all the keys in the supplied map
// of fields to account for $1-style substitution instructions.
func (pb *pointBuilder) applySubstitutions(mbean string, fieldMap map[string]interface{}) {
properties := makePropertyMap(mbean)
for i, subKey := range pb.substitutions[1:] {
symbol := fmt.Sprintf("$%d", i+1)
substitution := properties[subKey]
for fieldName, fieldValue := range fieldMap {
newFieldName := strings.Replace(fieldName, symbol, substitution, -1)
if fieldName != newFieldName {
fieldMap[newFieldName] = fieldValue
delete(fieldMap, fieldName)
}
}
}
}
// makePropertyMap returns a the mbean property-key list as
// a dictionary. foo:x=y becomes map[string]string { "x": "y" }
func makePropertyMap(mbean string) map[string]string {
props := make(map[string]string)
object := strings.SplitN(mbean, ":", 2)
domain := object[0]
if domain != "" && len(object) == 2 {
list := object[1]
for _, keyProperty := range strings.Split(list, ",") {
pair := strings.SplitN(keyProperty, "=", 2)
if len(pair) != 2 {
continue
}
if key := pair[0]; key != "" {
props[key] = pair[1]
}
}
}
return props
}
// makeSubstitutionList returns an array of values to
// use as substitutions when renaming fields
// with the $1..$N syntax. The first item in the list
// is always the mbean domain.
func makeSubstitutionList(mbean string) []string {
subs := make([]string, 0)
object := strings.SplitN(mbean, ":", 2)
domain := object[0]
if domain != "" && len(object) == 2 {
subs = append(subs, domain)
list := object[1]
for _, keyProperty := range strings.Split(list, ",") {
pair := strings.SplitN(keyProperty, "=", 2)
if len(pair) != 2 {
continue
}
key := pair[0]
if key == "" {
continue
}
property := pair[1]
if !strings.Contains(property, "*") {
continue
}
subs = append(subs, key)
}
}
return subs
}

View File

@ -0,0 +1,105 @@
package jolokia_agent
import (
"errors"
"sync"
"sync/atomic"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/jolokia"
"flashcat.cloud/categraf/pkg/tls"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "jolokia_agent"
type JolokiaAgent struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
func init() {
inputs.Add(inputName, func() inputs.Input {
return &JolokiaAgent{}
})
}
func (r *JolokiaAgent) Prefix() string {
return ""
}
func (r *JolokiaAgent) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
return err
}
}
}
return nil
}
func (r *JolokiaAgent) Drop() {}
func (r *JolokiaAgent) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.URLs) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
URLs []string `toml:"urls"`
Username string `toml:"username"`
Password string `toml:"password"`
ResponseTimeout config.Duration `toml:"response_timeout"`
Metrics []jolokia.MetricConfig `toml:"metric"`
tls.ClientConfig
clients []*jolokia.Client
}
func (ins *Instance) Init() error {
if len(ins.URLs) == 0 {
return nil
}
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
}

View File

@ -13,7 +13,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -118,7 +117,7 @@ func (s *KernelStats) Gather(slist *list.SafeList) {
}
}
types.PushSamples(slist, fields)
inputs.PushSamples(slist, fields)
}
func (s *KernelStats) getProcStat() ([]byte, error) {

View File

@ -12,7 +12,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -74,7 +73,7 @@ func (s *KernelVmstat) Gather(slist *list.SafeList) {
}
}
types.PushSamples(slist, fields)
inputs.PushSamples(slist, fields)
}
func (s *KernelVmstat) getProcVmstat() ([]byte, error) {

View File

@ -210,7 +210,7 @@ func (ins *Instance) buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []M
fields["pod_container_logsfs_available_bytes"] = container.LogsFS.AvailableBytes
fields["pod_container_logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes
fields["pod_container_logsfs_used_bytes"] = container.LogsFS.UsedBytes
types.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}
}
@ -229,7 +229,7 @@ func (ins *Instance) buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []M
fields["pod_volume_available_bytes"] = volume.AvailableBytes
fields["pod_volume_capacity_bytes"] = volume.CapacityBytes
fields["pod_volume_used_bytes"] = volume.UsedBytes
types.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}
}
@ -247,7 +247,7 @@ func (ins *Instance) buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []M
fields["pod_network_rx_errors"] = pod.Network.RXErrors
fields["pod_network_tx_bytes"] = pod.Network.TXBytes
fields["pod_network_tx_errors"] = pod.Network.TXErrors
types.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}
}
}
@ -272,7 +272,7 @@ func (ins *Instance) buildSystemContainerMetrics(summaryMetrics *SummaryMetrics,
fields["system_container_logsfs_available_bytes"] = container.LogsFS.AvailableBytes
fields["system_container_logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes
types.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}
}
@ -300,7 +300,7 @@ func (ins *Instance) buildNodeMetrics(summaryMetrics *SummaryMetrics, slist *lis
fields["node_runtime_image_fs_capacity_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.CapacityBytes
fields["node_runtime_image_fs_used_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.UsedBytes
types.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}
func (ins *Instance) gatherPodInfo(baseURL string) ([]Metadata, error) {

View File

@ -14,7 +14,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/osx"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -68,7 +67,7 @@ func (s *SysctlFS) Gather(slist *list.SafeList) {
log.Println("E! failed to gather file-nr:", err)
}
types.PushSamples(slist, fields)
inputs.PushSamples(slist, fields)
}
func (s *SysctlFS) gatherOne(name string, fields map[string]interface{}) error {

View File

@ -7,7 +7,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -110,5 +109,5 @@ func (s *MemStats) Gather(slist *list.SafeList) {
}
}
types.PushSamples(slist, fields)
inputs.PushSamples(slist, fields)
}

View File

@ -9,7 +9,6 @@ import (
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/pkg/filter"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -114,6 +113,6 @@ func (s *NetIOStats) Gather(slist *list.SafeList) {
"drop_out": io.Dropout,
}
types.PushSamples(slist, fields, tags)
inputs.PushSamples(slist, fields, tags)
}
}

View File

@ -7,7 +7,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -77,5 +76,5 @@ func (s *NetStats) Gather(slist *list.SafeList) {
"udp_socket": counts["UDP"],
}
types.PushSamples(slist, fields, tags)
inputs.PushSamples(slist, fields, tags)
}

View File

@ -251,7 +251,7 @@ func (ins *Instance) gather(slist *list.SafeList, target string) {
"fall": server.Fall,
}
types.PushSamples(slist, fields, tags, labels)
inputs.PushSamples(slist, fields, tags, labels)
}
}

View File

@ -17,7 +17,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/osx"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -71,7 +70,7 @@ func (p *Processes) Gather(slist *list.SafeList) {
}
}
types.PushSamples(slist, fields)
inputs.PushSamples(slist, fields)
}
// Gets empty fields of metrics based on the OS

12
inputs/pusher.go Normal file
View File

@ -0,0 +1,12 @@
package inputs
import (
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
func PushSamples(slist *list.SafeList, fields map[string]interface{}, labels ...map[string]string) {
for metric, value := range fields {
slist.PushFront(types.NewSample(metric, value, labels...))
}
}

View File

@ -512,7 +512,7 @@ func gatherOverview(ins *Instance, slist *list.SafeList) {
"overview_return_unroutable_rate": overview.MessageStats.ReturnUnroutableDetails.Rate,
}
types.PushSamples(slist, fields, tags)
inputs.PushSamples(slist, fields, tags)
}
func gatherExchanges(ins *Instance, slist *list.SafeList) {
@ -549,7 +549,7 @@ func gatherExchanges(ins *Instance, slist *list.SafeList) {
"exchange_messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate,
}
types.PushSamples(slist, fields, tags)
inputs.PushSamples(slist, fields, tags)
}
}
@ -607,7 +607,7 @@ func gatherFederationLinks(ins *Instance, slist *list.SafeList) {
"federation_messages_return_unroutable": link.LocalChannel.MessageStats.ReturnUnroutable,
}
types.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}
}
@ -737,7 +737,7 @@ func gatherNodes(ins *Instance, slist *list.SafeList) {
}
}
types.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}(node)
}
@ -821,6 +821,6 @@ func gatherQueues(ins *Instance, slist *list.SafeList) {
"queue_messages_redeliver_rate": queue.MessageStats.RedeliverDetails.Rate,
}
types.PushSamples(slist, fields, tags, ins.Labels)
inputs.PushSamples(slist, fields, tags, ins.Labels)
}
}

View File

@ -7,7 +7,6 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/host"
"github.com/shirou/gopsutil/v3/load"
@ -78,5 +77,5 @@ func (s *SystemStats) Gather(slist *list.SafeList) {
}
}
types.PushSamples(slist, fields)
inputs.PushSamples(slist, fields)
}

View File

@ -244,7 +244,7 @@ func (t *Tomcat) gatherOnce(slist *list.SafeList, ins *Instance) {
"jvm_memorypool_used": mp.UsageUsed,
}
types.PushSamples(slist, tcmpFields, tags, tcmpTags)
inputs.PushSamples(slist, tcmpFields, tags, tcmpTags)
}
// add tomcat_connector measurements
@ -270,6 +270,6 @@ func (t *Tomcat) gatherOnce(slist *list.SafeList, ins *Instance) {
"connector_bytes_sent": c.RequestInfo.BytesSent,
}
types.PushSamples(slist, tccFields, tags, tccTags)
inputs.PushSamples(slist, tccFields, tags, tccTags)
}
}

View File

@ -4,7 +4,6 @@ import (
"time"
"flashcat.cloud/categraf/pkg/conv"
"github.com/toolkits/pkg/container/list"
)
type Sample struct {
@ -37,24 +36,3 @@ func NewSample(metric string, value interface{}, labels ...map[string]string) *S
return s
}
func NewSamples(fields map[string]interface{}, labels ...map[string]string) []*Sample {
count := len(fields)
samples := make([]*Sample, 0, count)
for metric, value := range fields {
floatValue, err := conv.ToFloat64(value)
if err != nil {
continue
}
samples = append(samples, NewSample(metric, floatValue, labels...))
}
return samples
}
func PushSamples(slist *list.SafeList, fields map[string]interface{}, labels ...map[string]string) {
for metric, value := range fields {
slist.PushFront(NewSample(metric, value, labels...))
}
}