新增postgresql监控 (#671)

* add postgresql & remove http_response status_code tag

* add postgresql & remove http_response status_code tag

Co-authored-by: leiyupeng <susu898287771@>
This commit is contained in:
peng19940915 2021-04-27 23:16:07 +08:00 committed by GitHub
parent f40332f197
commit 1112186d1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1536 additions and 32 deletions

20
go.mod
View File

@ -3,37 +3,39 @@ module github.com/didi/nightingale/v4
go 1.12
require (
github.com/Shopify/sarama v1.27.2
github.com/Shopify/sarama v1.27.2 // indirect
github.com/alouca/gologger v0.0.0-20120904114645-7d4b7291de9c // indirect
github.com/blang/semver v3.5.1+incompatible
github.com/cespare/xxhash v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/freedomkk-qfeng/go-fastping v0.0.0-20160109021039-d7bb493dee3e // indirect
github.com/gaochao1/gosnmp v0.0.0-20150630013918-783a67a067fd // indirect
github.com/gaochao1/sw v4.0.0+incompatible
github.com/garyburd/redigo v1.6.2
github.com/gin-contrib/pprof v1.3.0
github.com/gin-contrib/static v0.0.1
github.com/gin-contrib/static v0.0.1 // indirect
github.com/gin-gonic/gin v1.6.3
github.com/go-ping/ping v0.0.0-20201115131931-3300c582a663
github.com/go-ping/ping v0.0.0-20201115131931-3300c582a663 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/google/uuid v1.1.2
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hpcloud/tail v1.0.0
github.com/influxdata/influxdb v1.8.0
github.com/influxdata/telegraf v1.17.2
github.com/m3db/m3 v0.15.17
github.com/jackc/pgx v3.6.0+incompatible
github.com/m3db/m3 v0.15.17 // indirect
github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-sqlite3 v1.14.0 // indirect
github.com/mojocn/base64Captcha v1.3.1
github.com/mojocn/base64Captcha v1.3.1 // indirect
github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect
github.com/shirou/gopsutil v3.20.11+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0
github.com/sparrc/go-ping v0.0.0-20190613174326-4e5b6552494c
github.com/spf13/viper v1.7.1
github.com/streadway/amqp v1.0.0
github.com/streadway/amqp v1.0.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/toolkits/file v0.0.0-20160325033739-a5b3c5147e07 // indirect
github.com/toolkits/file v0.0.0-20160325033739-a5b3c5147e07
github.com/toolkits/pkg v1.1.3
github.com/toolkits/sys v0.0.0-20170615103026-1f33b217ffaf // indirect
github.com/ugorji/go/codec v1.1.7

2
go.sum
View File

@ -151,6 +151,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/bmatcuk/doublestar v1.3.1/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9MEoZQC/PmE=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
@ -564,6 +565,7 @@ github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 h1:W2IgzRCb0L9VzMujq/QuTaZUKcH8096jWwP519mHN6Q=
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8/go.mod h1:/2NMgWB1DHM1ti/gqhOlg+LJeBVk6FqR5aVGYY0hlwI=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgx v3.6.0+incompatible h1:bJeo4JdVbDAW8KB2m8XkFeo8CPipREoG37BwEoKGz+Q=
github.com/jackc/pgx v3.6.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=

View File

@ -21,9 +21,9 @@ import (
"github.com/didi/nightingale/v4/src/modules/server/collector"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/all"
_ "github.com/go-sql-driver/mysql"
"github.com/gin-gonic/gin"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
"github.com/toolkits/pkg/file"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/runner"

View File

@ -14,12 +14,12 @@ import (
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/net_response"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/nginx"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/ping"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/postgresql"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/prometheus"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/rabbitmq"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/redis"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/tengine"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/zookeeper"
// local
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/log"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/plugin"

View File

@ -9,8 +9,8 @@ import (
"github.com/didi/nightingale/v4/src/common/i18n"
"github.com/didi/nightingale/v4/src/modules/server/collector"
"github.com/didi/nightingale/v4/src/modules/server/plugins"
"github.com/didi/nightingale/v4/src/modules/server/plugins/http_response/http_response"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/http_response"
)
func init() {
@ -33,22 +33,22 @@ func NewCollector() *Collector {
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"URLS": "地址",
"Method": "方法",
"ResponseTimeout": "响应超时",
"Headers": "Headers",
"Username": "用户名",
"Password": "密码",
"Body": "Body",
"ResponseBodyMaxSize": "ResponseBodyMaxSize",
"ResponseStringMatch": "ResponseStringMatch",
"ResponseStatusCode": "ResponseStatusCode",
"Interface": "Interface",
"HTTPProxy": "HTTPProxy",
"FollowRedirects": "FollowRedirects",
"List of urls to query": "要监测的URL地址",
"HTTP Request Method, default GET": "HTTP 的请求方法,默认是 GET",
"HTTP Request Headers": "HTTP 请求的的 Headers",
"URLS": "地址",
"Method": "方法",
"ResponseTimeout": "响应超时",
"Headers": "Headers",
"Username": "用户名",
"Password": "密码",
"Body": "Body",
"ResponseBodyMaxSize": "ResponseBodyMaxSize",
"ResponseStringMatch": "ResponseStringMatch",
"ResponseStatusCode": "ResponseStatusCode",
"Interface": "Interface",
"HTTPProxy": "HTTPProxy",
"FollowRedirects": "FollowRedirects",
"List of urls to query": "要监测的URL地址",
"HTTP Request Method, default GET": "HTTP 的请求方法,默认是 GET",
"HTTP Request Headers": "HTTP 请求的的 Headers",
"Optional HTTP Basic Auth Credentials, Username": "HTTP Basic 认证的用户名",
"Optional HTTP Basic Auth Credentials, Password": "HTTP Basic 认证的密码",
"Optional HTTP Request Body": "HTTP 请求的 Body",

View File

@ -0,0 +1,458 @@
package http_response
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"time"
"unicode/utf8"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
const (
// defaultResponseBodyMaxSize is the default maximum response body size, in bytes.
// if the response body is over this size, we will raise a body_read_error.
defaultResponseBodyMaxSize = 32 * 1024 * 1024
)
// HTTPResponse struct
type HTTPResponse struct {
Address string // deprecated in 1.12
URLs []string `toml:"urls"`
HTTPProxy string `toml:"http_proxy"`
Body string
Method string
ResponseTimeout time.Duration
HTTPHeaderTags map[string]string `toml:"http_header_tags"`
Headers map[string]string
FollowRedirects bool
// Absolute path to file with Bearer token
BearerToken string `toml:"bearer_token"`
ResponseBodyField string `toml:"response_body_field"`
ResponseBodyMaxSize int64 `toml:"response_body_max_size"`
ResponseStringMatch string
ResponseStatusCode int
Interface string
// HTTP Basic Auth Credentials
Username string `toml:"username"`
Password string `toml:"password"`
tls.ClientConfig
Log telegraf.Logger
compiledStringMatch *regexp.Regexp
client httpClient
}
type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}
// Description returns the plugin Description
func (h *HTTPResponse) Description() string {
return "HTTP/HTTPS request given an address a method and a timeout"
}
var sampleConfig = `
## Deprecated in 1.12, use 'urls'
## Server address (default http://localhost)
# address = "http://localhost"
## List of urls to query.
# urls = ["http://localhost"]
## Set http_proxy (telegraf uses the system wide proxy settings if it's is not set)
# http_proxy = "http://localhost:8888"
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
## HTTP Request Method
# method = "GET"
## Whether to follow redirects from the server (defaults to false)
# follow_redirects = false
## Optional file with Bearer token
## file content is added as an Authorization header
# bearer_token = "/path/to/file"
## Optional HTTP Basic Auth Credentials
# username = "username"
# password = "pa$$word"
## Optional HTTP Request Body
# body = '''
# {'fake':'data'}
# '''
## Optional name of the field that will contain the body of the response.
## By default it is set to an empty String indicating that the body's content won't be added
# response_body_field = ''
## Maximum allowed HTTP response body size in bytes.
## 0 means to use the default of 32MiB.
## If the response body size exceeds this limit a "body_read_error" will be raised
# response_body_max_size = "32MiB"
## Optional substring or regex match in body of the response (case sensitive)
# response_string_match = "\"service_status\": \"up\""
# response_string_match = "ok"
# response_string_match = "\".*_status\".?:.?\"up\""
## Expected response status code.
## The status code of the response is compared to this value. If they match, the field
## "response_status_code_match" will be 1, otherwise it will be 0. If the
## expected status code is 0, the check is disabled and the field won't be added.
# response_status_code = 0
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## HTTP Request Headers (all values must be strings)
# [inputs.http_response.headers]
# Host = "github.com"
## Optional setting to map response http headers into tags
## If the http header is not present on the request, no corresponding tag will be added
## If multiple instances of the http header are present, only the first value will be used
# http_header_tags = {"HTTP_HEADER" = "TAG_NAME"}
## Interface to use when dialing an address
# interface = "eth0"
`
// SampleConfig returns the plugin SampleConfig
func (h *HTTPResponse) SampleConfig() string {
return sampleConfig
}
// ErrRedirectAttempted indicates that a redirect occurred
var ErrRedirectAttempted = errors.New("redirect")
// Set the proxy. A configured proxy overwrites the system wide proxy.
func getProxyFunc(httpProxy string) func(*http.Request) (*url.URL, error) {
if httpProxy == "" {
return http.ProxyFromEnvironment
}
proxyURL, err := url.Parse(httpProxy)
if err != nil {
return func(_ *http.Request) (*url.URL, error) {
return nil, errors.New("bad proxy: " + err.Error())
}
}
return func(r *http.Request) (*url.URL, error) {
return proxyURL, nil
}
}
// createHTTPClient creates an http client which will timeout at the specified
// timeout period and can follow redirects if specified
func (h *HTTPResponse) createHTTPClient() (*http.Client, error) {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
dialer := &net.Dialer{}
if h.Interface != "" {
dialer.LocalAddr, err = localAddress(h.Interface)
if err != nil {
return nil, err
}
}
client := &http.Client{
Transport: &http.Transport{
Proxy: getProxyFunc(h.HTTPProxy),
DialContext: dialer.DialContext,
DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
Timeout: time.Duration(h.ResponseTimeout),
}
if !h.FollowRedirects {
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}
}
return client, nil
}
func localAddress(interfaceName string) (net.Addr, error) {
i, err := net.InterfaceByName(interfaceName)
if err != nil {
return nil, err
}
addrs, err := i.Addrs()
if err != nil {
return nil, err
}
for _, addr := range addrs {
if naddr, ok := addr.(*net.IPNet); ok {
// leaving port set to zero to let kernel pick
return &net.TCPAddr{IP: naddr.IP}, nil
}
}
return nil, fmt.Errorf("cannot create local address for interface %q", interfaceName)
}
func setResult(resultString string, fields map[string]interface{}, tags map[string]string) {
resultCodes := map[string]int{
"success": 0,
"response_string_mismatch": 1,
"body_read_error": 2,
"connection_failed": 3,
"timeout": 4,
"dns_error": 5,
"response_status_code_mismatch": 6,
}
tags["result"] = resultString
fields["result_type"] = resultString
fields["result_code"] = resultCodes[resultString]
}
func setError(err error, fields map[string]interface{}, tags map[string]string) error {
if timeoutError, ok := err.(net.Error); ok && timeoutError.Timeout() {
setResult("timeout", fields, tags)
return timeoutError
}
urlErr, isURLErr := err.(*url.Error)
if !isURLErr {
return nil
}
opErr, isNetErr := (urlErr.Err).(*net.OpError)
if isNetErr {
switch e := (opErr.Err).(type) {
case *net.DNSError:
setResult("dns_error", fields, tags)
return e
case *net.ParseError:
// Parse error has to do with parsing of IP addresses, so we
// group it with address errors
setResult("address_error", fields, tags)
return e
}
}
return nil
}
// HTTPGather gathers all fields and returns any errors it encounters
func (h *HTTPResponse) httpGather(u string) (map[string]interface{}, map[string]string, error) {
// Prepare fields and tags
fields := make(map[string]interface{})
tags := map[string]string{"server": u, "method": h.Method}
var body io.Reader
if h.Body != "" {
body = strings.NewReader(h.Body)
}
request, err := http.NewRequest(h.Method, u, body)
if err != nil {
return nil, nil, err
}
if h.BearerToken != "" {
token, err := ioutil.ReadFile(h.BearerToken)
if err != nil {
return nil, nil, err
}
bearer := "Bearer " + strings.Trim(string(token), "\n")
request.Header.Add("Authorization", bearer)
}
for key, val := range h.Headers {
request.Header.Add(key, val)
if key == "Host" {
request.Host = val
}
}
if h.Username != "" || h.Password != "" {
request.SetBasicAuth(h.Username, h.Password)
}
// Start Timer
start := time.Now()
resp, err := h.client.Do(request)
responseTime := time.Since(start).Seconds()
// If an error in returned, it means we are dealing with a network error, as
// HTTP error codes do not generate errors in the net/http library
if err != nil {
// Log error
h.Log.Debugf("Network error while polling %s: %s", u, err.Error())
// Get error details
if setError(err, fields, tags) == nil {
// Any error not recognized by `set_error` is considered a "connection_failed"
setResult("connection_failed", fields, tags)
}
return fields, tags, nil
}
if _, ok := fields["response_time"]; !ok {
fields["response_time"] = responseTime
}
// This function closes the response body, as
// required by the net/http library
defer resp.Body.Close()
// Add the response headers
for headerName, tag := range h.HTTPHeaderTags {
headerValues, foundHeader := resp.Header[headerName]
if foundHeader && len(headerValues) > 0 {
tags[tag] = headerValues[0]
}
}
// Set log the HTTP response code
//tags["status_code"] = strconv.Itoa(resp.StatusCode)
fields["http_response_code"] = resp.StatusCode
if h.ResponseBodyMaxSize == 0 {
h.ResponseBodyMaxSize = int64(defaultResponseBodyMaxSize)
}
bodyBytes, err := ioutil.ReadAll(io.LimitReader(resp.Body, int64(h.ResponseBodyMaxSize)+1))
// Check first if the response body size exceeds the limit.
if err == nil && int64(len(bodyBytes)) > int64(h.ResponseBodyMaxSize) {
h.setBodyReadError("The body of the HTTP Response is too large", bodyBytes, fields, tags)
return fields, tags, nil
} else if err != nil {
h.setBodyReadError(fmt.Sprintf("Failed to read body of HTTP Response : %s", err.Error()), bodyBytes, fields, tags)
return fields, tags, nil
}
// Add the body of the response if expected
if len(h.ResponseBodyField) > 0 {
// Check that the content of response contains only valid utf-8 characters.
if !utf8.Valid(bodyBytes) {
h.setBodyReadError("The body of the HTTP Response is not a valid utf-8 string", bodyBytes, fields, tags)
return fields, tags, nil
}
fields[h.ResponseBodyField] = string(bodyBytes)
}
fields["content_length"] = len(bodyBytes)
var success = true
// Check the response for a regex
if h.ResponseStringMatch != "" {
if h.compiledStringMatch.Match(bodyBytes) {
fields["response_string_match"] = 1
} else {
success = false
setResult("response_string_mismatch", fields, tags)
fields["response_string_match"] = 0
}
}
// Check the response status code
if h.ResponseStatusCode > 0 {
if resp.StatusCode == h.ResponseStatusCode {
fields["response_status_code_match"] = 1
} else {
success = false
setResult("response_status_code_mismatch", fields, tags)
fields["response_status_code_match"] = 0
}
}
if success {
setResult("success", fields, tags)
}
return fields, tags, nil
}
// Set result in case of a body read error
func (h *HTTPResponse) setBodyReadError(errorMsg string, bodyBytes []byte, fields map[string]interface{}, tags map[string]string) {
h.Log.Debugf(errorMsg)
setResult("body_read_error", fields, tags)
fields["content_length"] = len(bodyBytes)
if h.ResponseStringMatch != "" {
fields["response_string_match"] = 0
}
}
// Gather gets all metric fields and tags and returns any errors it encounters
func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
// Compile the body regex if it exist
if h.compiledStringMatch == nil {
var err error
h.compiledStringMatch, err = regexp.Compile(h.ResponseStringMatch)
if err != nil {
return fmt.Errorf("failed to compile regular expression %s : %s", h.ResponseStringMatch, err)
}
}
// Set default values
if h.ResponseTimeout < time.Duration(time.Second) {
h.ResponseTimeout = time.Duration(time.Second * 5)
}
// Check send and expected string
if h.Method == "" {
h.Method = "GET"
}
if len(h.URLs) == 0 {
if h.Address == "" {
h.URLs = []string{"http://localhost"}
} else {
h.Log.Warn("'address' deprecated in telegraf 1.12, please use 'urls'")
h.URLs = []string{h.Address}
}
}
if h.client == nil {
client, err := h.createHTTPClient()
if err != nil {
return err
}
h.client = client
}
for _, u := range h.URLs {
addr, err := url.Parse(u)
if err != nil {
acc.AddError(err)
continue
}
if addr.Scheme != "http" && addr.Scheme != "https" {
acc.AddError(errors.New("only http and https are supported"))
continue
}
// Prepare data
var fields map[string]interface{}
var tags map[string]string
// Gather data
fields, tags, err = h.httpGather(u)
if err != nil {
acc.AddError(err)
continue
}
// Add metrics
acc.AddFields("http_response", fields, tags)
}
return nil
}
func init() {
inputs.Add("http_response", func() telegraf.Input {
return &HTTPResponse{}
})
}

View File

@ -0,0 +1,134 @@
package postgresql
import (
"fmt"
"github.com/didi/nightingale/v4/src/common/i18n"
"github.com/didi/nightingale/v4/src/modules/server/collector"
"github.com/didi/nightingale/v4/src/modules/server/plugins"
"github.com/didi/nightingale/v4/src/modules/server/plugins/postgresql/postgresql"
"github.com/influxdata/telegraf"
"net"
"net/url"
"sort"
"strings"
)
func init() {
collector.CollectorRegister(NewPostgresqlCollector())
i18n.DictRegister(langDict)
}
type PostgresqlCollector struct {
*collector.BaseCollector
}
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"Dsn": "数据库地址",
"ExcludeDatabases": "不需要监控的数据库",
"if the list is empty, then metrics are gathered from all database": "如果列表为空,则收集所有数据库表",
"PgSetting": "数据库全局配置",
"gather pg setting":"是否采集 pg setting全局配置",
"StatArchiver": "采集pg_stat_archiver视图",
"gather pg_stat_archiver":"主要记录WAL归档信息",
"ReplicationSlot": "采集pg_replication_slot视图",
"gather pg_replication_slots":"用于确保WAL迁移是否正常",
"StatReplication":"采集pg_stat_replication视图",
"gather pg_stat_replication": "pg复制异步同步监控",
"StatDatabaseConfilicts":"采集pg_stat_database_confilicts视图",
"specify servers via a url matching<br />postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]<br />": "通过URL设置指定服务器<br />postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]<br />",
},
}
)
func NewPostgresqlCollector() *PostgresqlCollector {
return &PostgresqlCollector{BaseCollector: collector.NewBaseCollector(
"postgresql",
collector.RemoteCategory,
func() collector.TelegrafPlugin { return &PostgresqlRule{} },
)}
}
type PostgresqlRule struct {
ExcludeDatabases []string `label:"ExcludeDatabases" json:"exclude_databases" description:"if the list is empty, then metrics are gathered from all database"`
GatherPgStatReplication bool `label:"StatReplication" json:"pg_stat_replication" description:"gather pg_stat_replication" default:"false"`
GatherPgReplicationSlots bool `label:"ReplicationSlot" json:"pg_replication_slots" description:"gather pg_replication_slots" default:"false"`
GatherPgStatArchiver bool `label:"StatArchiver" json:"pg_stat_archiver" description:"gather pg_stat_archiver" default:"false"`
GatherPgSetting bool `label:"PgSetting" json:"pg_setting" description:"gather pg setting" default:"false"`
Dsn string `label:"Dsn" json:"dsn, required" description:"specify servers via a url matching<br />postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]<br />" example:"postgresql://postgres:xxx@127.0.0.1:5432/postgres?sslmode=disable"`
plugins.ClientConfig
}
func parseURL(uri string) (string, error) {
u, err := url.Parse(uri)
if err != nil {
return "", err
}
if u.Scheme != "postgres" && u.Scheme != "postgresql" {
return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme)
}
var kvs []string
escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`)
accrue := func(k, v string) {
if v != "" {
kvs = append(kvs, k+"="+escaper.Replace(v))
}
}
if u.User != nil {
v := u.User.Username()
accrue("user", v)
v, _ = u.User.Password()
accrue("password", v)
}
if host, port, err := net.SplitHostPort(u.Host); err != nil {
accrue("host", u.Host)
} else {
accrue("host", host)
accrue("port", port)
}
if u.Path != "" {
accrue("dbname", u.Path[1:])
}
q := u.Query()
for k := range q {
accrue(k, q.Get(k))
}
sort.Strings(kvs) // Makes testing easier (not a performance concern)
return strings.Join(kvs, " "), nil
}
func (p *PostgresqlRule) Validate() error {
if p.Dsn == "" {
return fmt.Errorf("postgresql.rule.address must be set")
}
_, err := parseURL(p.Dsn)
if err != nil {
return fmt.Errorf("address parse failed, detail: %v", err)
}
return nil
}
func (p *PostgresqlRule) TelegrafInput() (telegraf.Input, error) {
if err := p.Validate(); err != nil {
return nil, err
}
return &postgresql.Postgresql{
Dsn: p.Dsn,
ExcludeDatabases: p.ExcludeDatabases,
GatherPgSetting: p.GatherPgSetting,
GatherPgStatReplication: p.GatherPgStatReplication,
GatherPgReplicationSlots: p.GatherPgReplicationSlots,
GatherPgStatArchiver: p.GatherPgStatArchiver,
}, nil
}

View File

@ -0,0 +1,131 @@
package postgresql
import (
"fmt"
"github.com/influxdata/telegraf"
"math"
"strconv"
"strings"
)
// Query the pg_settings view containing runtime variables
func querySettings(host string, server *Server, acc telegraf.Accumulator) error {
// pg_settings docs: https://www.postgresql.org/docs/current/static/view-pg-settings.html
//
// NOTE: If you add more vartypes here, you must update the supported
// types in normaliseUnit() below
query := "SELECT name, setting, COALESCE(unit, ''), short_desc, vartype FROM pg_settings WHERE vartype IN ('bool', 'integer', 'real');"
rows, err := server.db.Query(query)
if err != nil {
return fmt.Errorf("Error running query on database %q: %s %v", server, "pg", err)
}
defer rows.Close() // nolint: errcheck
var fields =make(map[string]interface{})
for rows.Next() {
s := &pgSetting{}
err = rows.Scan(&s.name, &s.setting, &s.unit, &s.shortDesc, &s.vartype)
if err != nil {
return fmt.Errorf("Error retrieving rows on %q: %s %v", server, "pg", err)
}
// 处理结果
k, v := s.metric()
fields[k] = v
}
acc.AddGauge("postgresql_pg_settings",fields, map[string]string{"server": host})
return nil
}
// pgSetting is represents a PostgreSQL runtime variable as returned by the
// pg_settings view.
type pgSetting struct {
name, setting, unit, shortDesc, vartype string
}
func (s *pgSetting) metric() (name string, val float64) {
var (
err error
unit = s.unit // nolint: ineffassign
)
name = strings.Replace(s.name, ".", "_", -1)
switch s.vartype {
case "bool":
if s.setting == "on" {
val = 1
}
case "integer", "real":
if val, unit, err = s.normaliseUnit(); err != nil {
// Panic, since we should recognise all units
// and don't want to silently exlude metrics
panic(err)
}
if len(unit) > 0 {
name = fmt.Sprintf("%s_%s", name, unit)
}
default:
// Panic because we got a type we didn't ask for
panic(fmt.Sprintf("Unsupported vartype %q", s.vartype))
}
return name, val
//acc.AddGauge("postgresql_pg_settings", map[string]interface{}{name: val}, map[string]string{"server": host})
}
// TODO: fix linter override
// nolint: nakedret
func (s *pgSetting) normaliseUnit() (val float64, unit string, err error) {
val, err = strconv.ParseFloat(s.setting, 64)
if err != nil {
return val, unit, fmt.Errorf("Error converting setting %q value %q to float: %s", s.name, s.setting, err)
}
// Units defined in: https://www.postgresql.org/docs/current/static/config-setting.html
switch s.unit {
case "":
return
case "ms", "s", "min", "h", "d":
unit = "seconds"
case "B", "kB", "MB", "GB", "TB", "8kB", "16kB", "32kB", "16MB", "32MB", "64MB":
unit = "bytes"
default:
err = fmt.Errorf("Unknown unit for runtime variable: %q", s.unit)
return
}
// -1 is special, don't modify the value
if val == -1 {
return
}
switch s.unit {
case "ms":
val /= 1000
case "min":
val *= 60
case "h":
val *= 60 * 60
case "d":
val *= 60 * 60 * 24
case "kB":
val *= math.Pow(2, 10)
case "MB":
val *= math.Pow(2, 20)
case "GB":
val *= math.Pow(2, 30)
case "TB":
val *= math.Pow(2, 40)
case "8kB":
val *= math.Pow(2, 13)
case "16kB":
val *= math.Pow(2, 14)
case "32kB":
val *= math.Pow(2, 15)
case "16MB":
val *= math.Pow(2, 24)
case "32MB":
val *= math.Pow(2, 25)
case "64MB":
val *= math.Pow(2, 26)
}
return
}

View File

@ -0,0 +1,603 @@
package postgresql
import (
"database/sql"
"errors"
"fmt"
"net/url"
"regexp"
// register in driver.
_ "github.com/jackc/pgx/stdlib"
"github.com/blang/semver"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type OverrideQuery struct {
versionRange semver.Range
query string
}
// ColumnUsage should be one of several enum values which describe how a
// queried row is to be converted to a Prometheus metric.
type ColumnUsage int
// nolint: golint
const (
DISCARD ColumnUsage = iota // Ignore this column
LABEL ColumnUsage = iota // Use this column as a label
COUNTER ColumnUsage = iota // Use this column as a counter
GAUGE ColumnUsage = iota // Use this column as a gauge
MAPPEDMETRIC ColumnUsage = iota // Use this column with the supplied mapping of text values
DURATION ColumnUsage = iota // This column should be interpreted as a text duration (and converted to milliseconds)
HISTOGRAM ColumnUsage = iota // Use this column as a histogram
)
// ColumnMapping is the user-friendly representation of a prometheus descriptor map
type ColumnMapping struct {
usage ColumnUsage `yaml:"usage"`
description string `yaml:"description"`
supportedVersions semver.Range `yaml:"pg_version"` // Semantic version ranges which are supported. Unsupported columns are not queried (internally converted to DISCARD).
}
// intermediateMetricMap holds the partially loaded metric map parsing.
// This is mainly so we can parse cacheSeconds around.
type intermediateMetricMap struct {
columnMappings map[string]ColumnMapping
}
var builtinMetricMaps = map[string]intermediateMetricMap{
"pg_stat_bgwriter": {
map[string]ColumnMapping{
"checkpoints_timed": {COUNTER, "Number of scheduled checkpoints that have been performed", nil},
"checkpoints_req": {COUNTER, "Number of requested checkpoints that have been performed", nil},
"checkpoint_write_time": {COUNTER, "Total amount of time that has been spent in the portion of checkpoint processing where files are written to disk, in milliseconds", nil},
"checkpoint_sync_time": {COUNTER, "Total amount of time that has been spent in the portion of checkpoint processing where files are synchronized to disk, in milliseconds", nil},
"buffers_checkpoint": {COUNTER, "Number of buffers written during checkpoints", nil},
"buffers_clean": {COUNTER, "Number of buffers written by the background writer", nil},
"maxwritten_clean": {COUNTER, "Number of times the background writer stopped a cleaning scan because it had written too many buffers", nil},
"buffers_backend": {COUNTER, "Number of buffers written directly by a backend", nil},
"buffers_backend_fsync": {COUNTER, "Number of times a backend had to execute its own fsync call (normally the background writer handles those even when the backend does its own write)", nil},
"buffers_alloc": {COUNTER, "Number of buffers allocated", nil},
"stats_reset": {COUNTER, "Time at which these statistics were last reset", nil},
},
},
"pg_stat_database": {
map[string]ColumnMapping{
"datid": {LABEL, "OID of a database", nil},
"datname": {LABEL, "Name of this database", nil},
"numbackends": {GAUGE, "Number of backends currently connected to this database. This is the only column in this view that returns a value reflecting current state; all other columns return the accumulated values since the last reset.", nil},
"xact_commit": {COUNTER, "Number of transactions in this database that have been committed", nil},
"xact_rollback": {COUNTER, "Number of transactions in this database that have been rolled back", nil},
"blks_read": {COUNTER, "Number of disk blocks read in this database", nil},
"blks_hit": {COUNTER, "Number of times disk blocks were found already in the buffer cache, so that a read was not necessary (this only includes hits in the PostgreSQL buffer cache, not the operating system's file system cache)", nil},
"tup_returned": {COUNTER, "Number of rows returned by queries in this database", nil},
"tup_fetched": {COUNTER, "Number of rows fetched by queries in this database", nil},
"tup_inserted": {COUNTER, "Number of rows inserted by queries in this database", nil},
"tup_updated": {COUNTER, "Number of rows updated by queries in this database", nil},
"tup_deleted": {COUNTER, "Number of rows deleted by queries in this database", nil},
"conflicts": {COUNTER, "Number of queries canceled due to conflicts with recovery in this database. (Conflicts occur only on standby servers; see pg_stat_database_conflicts for details.)", nil},
"temp_files": {COUNTER, "Number of temporary files created by queries in this database. All temporary files are counted, regardless of why the temporary file was created (e.g., sorting or hashing), and regardless of the log_temp_files setting.", nil},
"temp_bytes": {COUNTER, "Total amount of data written to temporary files by queries in this database. All temporary files are counted, regardless of why the temporary file was created, and regardless of the log_temp_files setting.", nil},
"deadlocks": {COUNTER, "Number of deadlocks detected in this database", nil},
"blk_read_time": {COUNTER, "Time spent reading data file blocks by backends in this database, in milliseconds", nil},
"blk_write_time": {COUNTER, "Time spent writing data file blocks by backends in this database, in milliseconds", nil},
"stats_reset": {COUNTER, "Time at which these statistics were last reset", nil},
},
},
"pg_stat_database_count": {
map[string]ColumnMapping{
"datid": {LABEL, "OID of a database", nil},
"datname": {LABEL, "Name of this database", nil},
"dml_data_count": {COUNTER, "", nil},
"tps": {COUNTER, "", nil},
},
},
"pg_stat_database_conflicts": {
map[string]ColumnMapping{
"datid": {LABEL, "OID of a database", nil},
"datname": {LABEL, "Name of this database", nil},
"confl_tablespace": {COUNTER, "Number of queries in this database that have been canceled due to dropped tablespaces", nil},
"confl_lock": {COUNTER, "Number of queries in this database that have been canceled due to lock timeouts", nil},
"confl_snapshot": {COUNTER, "Number of queries in this database that have been canceled due to old snapshots", nil},
"confl_bufferpin": {COUNTER, "Number of queries in this database that have been canceled due to pinned buffers", nil},
"confl_deadlock": {COUNTER, "Number of queries in this database that have been canceled due to deadlocks", nil},
},
},
"pg_locks": {
map[string]ColumnMapping{
"datname": {LABEL, "Name of this database", nil},
"mode": {LABEL, "Type of Lock", nil},
"count": {GAUGE, "Number of locks", nil},
},
},
"pg_stat_replication": {
map[string]ColumnMapping{
"procpid": {DISCARD, "Process ID of a WAL sender process", semver.MustParseRange("<9.2.0")},
"pid": {DISCARD, "Process ID of a WAL sender process", semver.MustParseRange(">=9.2.0")},
"usesysid": {DISCARD, "OID of the user logged into this WAL sender process", nil},
"usename": {DISCARD, "Name of the user logged into this WAL sender process", nil},
"application_name": {LABEL, "Name of the application that is connected to this WAL sender", nil},
"client_addr": {LABEL, "IP address of the client connected to this WAL sender. If this field is null, it indicates that the client is connected via a Unix socket on the server machine.", nil},
"client_hostname": {DISCARD, "Host name of the connected client, as reported by a reverse DNS lookup of client_addr. This field will only be non-null for IP connections, and only when log_hostname is enabled.", nil},
"client_port": {DISCARD, "TCP port number that the client is using for communication with this WAL sender, or -1 if a Unix socket is used", nil},
"backend_start": {DISCARD, "with time zone Time when this process was started, i.e., when the client connected to this WAL sender", nil},
"backend_xmin": {DISCARD, "The current backend's xmin horizon.", nil},
"state": {LABEL, "Current WAL sender state", nil},
"sent_location": {DISCARD, "Last transaction log position sent on this connection", semver.MustParseRange("<10.0.0")},
"write_location": {DISCARD, "Last transaction log position written to disk by this standby server", semver.MustParseRange("<10.0.0")},
"flush_location": {DISCARD, "Last transaction log position flushed to disk by this standby server", semver.MustParseRange("<10.0.0")},
"replay_location": {DISCARD, "Last transaction log position replayed into the database on this standby server", semver.MustParseRange("<10.0.0")},
"sent_lsn": {DISCARD, "Last transaction log position sent on this connection", semver.MustParseRange(">=10.0.0")},
"write_lsn": {DISCARD, "Last transaction log position written to disk by this standby server", semver.MustParseRange(">=10.0.0")},
"flush_lsn": {DISCARD, "Last transaction log position flushed to disk by this standby server", semver.MustParseRange(">=10.0.0")},
"replay_lsn": {DISCARD, "Last transaction log position replayed into the database on this standby server", semver.MustParseRange(">=10.0.0")},
"sync_priority": {DISCARD, "Priority of this standby server for being chosen as the synchronous standby", nil},
"sync_state": {DISCARD, "Synchronous state of this standby server", nil},
"slot_name": {LABEL, "A unique, cluster-wide identifier for the replication slot", semver.MustParseRange(">=9.2.0")},
"plugin": {DISCARD, "The base name of the shared object containing the output plugin this logical slot is using, or null for physical slots", nil},
"slot_type": {DISCARD, "The slot type - physical or logical", nil},
"datoid": {DISCARD, "The OID of the database this slot is associated with, or null. Only logical slots have an associated database", nil},
"database": {DISCARD, "The name of the database this slot is associated with, or null. Only logical slots have an associated database", nil},
"active": {DISCARD, "True if this slot is currently actively being used", nil},
"active_pid": {DISCARD, "Process ID of a WAL sender process", nil},
"xmin": {DISCARD, "The oldest transaction that this slot needs the database to retain. VACUUM cannot remove tuples deleted by any later transaction", nil},
"catalog_xmin": {DISCARD, "The oldest transaction affecting the system catalogs that this slot needs the database to retain. VACUUM cannot remove catalog tuples deleted by any later transaction", nil},
"restart_lsn": {DISCARD, "The address (LSN) of oldest WAL which still might be required by the consumer of this slot and thus won't be automatically removed during checkpoints", nil},
"pg_current_xlog_location": {DISCARD, "pg_current_xlog_location", nil},
"pg_current_wal_lsn": {DISCARD, "pg_current_xlog_location", semver.MustParseRange(">=10.0.0")},
"pg_current_wal_lsn_bytes": {GAUGE, "WAL position in bytes", semver.MustParseRange(">=10.0.0")},
"pg_xlog_location_diff": {GAUGE, "Lag in bytes between master and slave", semver.MustParseRange(">=9.2.0 <10.0.0")},
"pg_wal_lsn_diff": {GAUGE, "Lag in bytes between master and slave", semver.MustParseRange(">=10.0.0")},
"confirmed_flush_lsn": {DISCARD, "LSN position a consumer of a slot has confirmed flushing the data received", nil},
"write_lag": {DISCARD, "Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written it (but not yet flushed it or applied it). This can be used to gauge the delay that synchronous_commit level remote_write incurred while committing if this server was configured as a synchronous standby.", semver.MustParseRange(">=10.0.0")},
"flush_lag": {DISCARD, "Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written and flushed it (but not yet applied it). This can be used to gauge the delay that synchronous_commit level remote_flush incurred while committing if this server was configured as a synchronous standby.", semver.MustParseRange(">=10.0.0")},
"replay_lag": {DISCARD, "Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written, flushed and applied it. This can be used to gauge the delay that synchronous_commit level remote_apply incurred while committing if this server was configured as a synchronous standby.", semver.MustParseRange(">=10.0.0")},
},
},
"pg_replication_slots": {
map[string]ColumnMapping{
"slot_name": {LABEL, "Name of the replication slot", nil},
"database": {LABEL, "Name of the database", nil},
"active": {GAUGE, "Flag indicating if the slot is active", nil},
"pg_wal_lsn_diff": {GAUGE, "Replication lag in bytes", nil},
},
},
"pg_stat_archiver": {
map[string]ColumnMapping{
"archived_count": {COUNTER, "Number of WAL files that have been successfully archived", nil},
"last_archived_wal": {DISCARD, "Name of the last WAL file successfully archived", nil},
"last_archived_time": {DISCARD, "Time of the last successful archive operation", nil},
"failed_count": {COUNTER, "Number of failed attempts for archiving WAL files", nil},
"last_failed_wal": {DISCARD, "Name of the WAL file of the last failed archival operation", nil},
"last_failed_time": {DISCARD, "Time of the last failed archival operation", nil},
"stats_reset": {DISCARD, "Time at which these statistics were last reset", nil},
"last_archive_age": {GAUGE, "Time in seconds since last WAL segment was successfully archived", nil},
},
},
"pg_stat_activity": {
map[string]ColumnMapping{
"datname": {LABEL, "Name of this database", nil},
"state": {LABEL, "connection state", semver.MustParseRange(">=9.2.0")},
"count": {GAUGE, "number of connections in this state", nil},
"max_tx_duration": {GAUGE, "max duration in seconds any active transaction has been running", nil},
},
},
}
// Overriding queries for namespaces above.
// TODO: validate this is a closed set in tests, and there are no overlaps
var queryOverrides = map[string][]OverrideQuery{
"pg_locks": {
{
semver.MustParseRange(">0.0.0"),
`SELECT pg_database.datname,tmp.mode,COALESCE(count,0) as count
FROM
(
VALUES ('accesssharelock'),
('rowsharelock'),
('rowexclusivelock'),
('shareupdateexclusivelock'),
('sharelock'),
('sharerowexclusivelock'),
('exclusivelock'),
('accessexclusivelock'),
('sireadlock')
) AS tmp(mode) CROSS JOIN pg_database
LEFT JOIN
(SELECT database, lower(mode) AS mode,count(*) AS count
FROM pg_locks WHERE database IS NOT NULL
GROUP BY database, lower(mode)
) AS tmp2
ON tmp.mode=tmp2.mode and pg_database.oid = tmp2.database ORDER BY 1`,
},
},
"pg_stat_replication": {
{
semver.MustParseRange(">=10.0.0"),
`
SELECT *,
(case pg_is_in_recovery() when 't' then null else pg_current_wal_lsn() end) AS pg_current_wal_lsn,
(case pg_is_in_recovery() when 't' then null else pg_wal_lsn_diff(pg_current_wal_lsn(), pg_lsn('0/0'))::float end) AS pg_current_wal_lsn_bytes,
(case pg_is_in_recovery() when 't' then null else pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)::float end) AS pg_wal_lsn_diff
FROM pg_stat_replication
`,
},
{
semver.MustParseRange(">=9.2.0 <10.0.0"),
`
SELECT *,
(case pg_is_in_recovery() when 't' then null else pg_current_xlog_location() end) AS pg_current_xlog_location,
(case pg_is_in_recovery() when 't' then null else pg_xlog_location_diff(pg_current_xlog_location(), replay_location)::float end) AS pg_xlog_location_diff
FROM pg_stat_replication
`,
},
{
semver.MustParseRange("<9.2.0"),
`
SELECT *,
(case pg_is_in_recovery() when 't' then null else pg_current_xlog_location() end) AS pg_current_xlog_location
FROM pg_stat_replication
`,
},
},
"pg_replication_slots": {
{
semver.MustParseRange(">=9.4.0"),
`
SELECT slot_name, database, active, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
FROM pg_replication_slots
`,
},
},
"pg_stat_database": {
{
semver.MustParseRange(">=0.0.0"),
`
SELECT * FROM pg_stat_database
`,
},
},
"pg_stat_database_count": {
{
semver.MustParseRange(">=0.0.0"),
`
SELECT
datid,
datname,
tup_inserted + tup_updated + tup_deleted AS dml_data_count,
xact_rollback + xact_commit AS tps
FROM
pg_stat_database
`,
},
},
"pg_stat_archiver": {
{
semver.MustParseRange(">=0.0.0"),
`
SELECT *,
extract(epoch from now() - last_archived_time) AS last_archive_age
FROM pg_stat_archiver
`,
},
},
"pg_stat_bgwriter": {
{
semver.MustParseRange(">=0.0.0"),
`
SELECT * FROM pg_stat_bgwriter
`,
},
},
"pg_stat_activity": {
// This query only works
{
semver.MustParseRange(">=9.2.0"),
`
SELECT
pg_database.datname,
tmp.state,
COALESCE(count,0) as count,
COALESCE(max_tx_duration,0) as max_tx_duration
FROM
(
VALUES ('active'),
('idle'),
('idle in transaction'),
('idle in transaction (aborted)'),
('fastpath function call'),
('disabled')
) AS tmp(state) CROSS JOIN pg_database
LEFT JOIN
(
SELECT
datname,
state,
count(*) AS count,
MAX(EXTRACT(EPOCH FROM now() - xact_start))::float AS max_tx_duration
FROM pg_stat_activity GROUP BY datname,state) AS tmp2
ON tmp.state = tmp2.state AND pg_database.datname = tmp2.datname
`,
},
{
semver.MustParseRange("<9.2.0"),
`
SELECT
datname,
'unknown' AS state,
COALESCE(count(*),0) AS count,
COALESCE(MAX(EXTRACT(EPOCH FROM now() - xact_start))::float,0) AS max_tx_duration
FROM pg_stat_activity GROUP BY datname
`,
},
},
}
type Postgresql struct {
Dsn string
server *Server
Version semver.Version
Namespace []string
queryOverrides map[string]string
ExcludeDatabases []string
GatherPgSetting bool
GatherPgStatReplication bool
GatherPgReplicationSlots bool
GatherPgStatArchiver bool
}
var ignoredColumns = map[string]bool{"stats_reset": true}
var sampleConfig = `
## specify address via a url matching:
## postgres://[pqgotest[:password]]@localhost[/dbname]\
## ?sslmode=[disable|verify-ca|verify-full]
## or a simple string:
## host=localhost user=pqgotest password=... sslmode=... dbname=app_production
##
## All connection parameters are optional.
##
## Without the dbname parameter, the driver will default to a database
## with the same name as the user. This dbname is just for instantiating a
## connection with the server and doesn't restrict the databases we are trying
## to grab metrics for.
##
address = "host=localhost user=postgres sslmode=disable"
## A custom name for the database that will be used as the "server" tag in the
## measurement output. If not specified, a default one generated from
## the connection address is used.
# outputaddress = "db01"
## connection configuration.
## maxlifetime - specify the maximum lifetime of a connection.
## default is forever (0s)
max_lifetime = "0s"
## A list of databases to explicitly ignore. If not specified, metrics for all
## databases are gathered. Do NOT use with the 'databases' option.
# ignored_databases = ["postgres", "template0", "template1"]
## A list of databases to pull metrics about. If not specified, metrics for all
## databases are gathered. Do NOT use with the 'ignored_databases' option.
# databases = ["app_production", "testing"]
`
// Regex used to get the "short-version" from the postgres version field.
var versionRegex = regexp.MustCompile(`^\w+ ((\d+)(\.\d+)?(\.\d+)?)`)
var lowestSupportedVersion = semver.MustParse("9.1.0")
// Parses the version of postgres into the short version string we can use to
// match behaviors.
func parseVersion(versionString string) (semver.Version, error) {
submatches := versionRegex.FindStringSubmatch(versionString)
if len(submatches) > 1 {
return semver.ParseTolerant(submatches[1])
}
return semver.Version{},
errors.New(fmt.Sprintln("Could not find a postgres version in string:", versionString))
}
func makeQueryOverrideMap(pgVersion semver.Version, queryOverrides map[string][]OverrideQuery) map[string]string {
resultMap := make(map[string]string)
for name, overrideDef := range queryOverrides {
// Find a matching semver. We make it an error to have overlapping
// ranges at test-time, so only 1 should ever match.
matched := false
for _, queryDef := range overrideDef {
if queryDef.versionRange(pgVersion) {
resultMap[name] = queryDef.query
matched = true
break
}
}
if !matched {
resultMap[name] = ""
}
}
return resultMap
}
// 检查版本号以及使用的sql
func (p *Postgresql) checkMapVersions(server *Server) error {
var versionString string
versionRow := server.db.QueryRow("SELECT version();")
err := versionRow.Scan(&versionString)
if err != nil {
return err
}
p.Version, err = parseVersion(versionString)
if err != nil {
return err
}
if p.Version.LT(lowestSupportedVersion) {
return errors.New(fmt.Sprintf("database version too low, version:%s, lowest version:%s", p.Version.String(), lowestSupportedVersion.String()))
}
// 确定版本号与SQL
p.queryOverrides = makeQueryOverrideMap(p.Version, queryOverrides)
return nil
}
func (p *Postgresql) SampleConfig() string {
return sampleConfig
}
func (p *Postgresql) Description() string {
return "Read metrics from one or many postgresql servers"
}
func (p *Postgresql) IgnoredColumns() map[string]bool {
return ignoredColumns
}
func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
var err error
p.server, err = NewServer(p.Dsn)
if err != nil {
acc.AddGauge("postgresql", map[string]interface{}{"up": false}, nil)
return fmt.Errorf("Error opening connection to database (%s): %s", loggableDSN(p.Dsn), err.Error())
} else {
defer p.server.Close()
}
// 测试连通性
err = p.server.Ping()
if err != nil {
acc.AddGauge("postgresql", map[string]interface{}{"up": false}, nil)
return fmt.Errorf("error opening connection to database (%s): %s", loggableDSN(p.Dsn), err.Error())
}
acc.AddGauge("postgresql", map[string]interface{}{"up": true}, nil)
dsnURI, err := url.Parse(p.Dsn)
if err != nil {
errObj := fmt.Errorf("parse dsn:%s failed,plz recheck", p.Dsn)
acc.AddError(errObj)
return errObj
}
host := dsnURI.Host
// 检查当前版本并提取出对应的sql
err = p.checkMapVersions(p.server)
if err != nil {
acc.AddGauge("postgresql", map[string]interface{}{"collect_success": false}, nil)
return err
}
// 获取该server的数据
err = queryNamespaceMapping(p, p.server, acc, host)
if err != nil {
acc.AddGauge("postgresql", map[string]interface{}{"collect_success": false}, nil)
return err
}
// 查询pg_settings
if p.GatherPgSetting {
err = querySettings(host, p.server, acc)
if err != nil {
acc.AddGauge("postgresql", map[string]interface{}{"collect_success": false}, nil)
return err
}
}
acc.AddGauge("postgresql", map[string]interface{}{"collect_success": true}, nil)
return nil
}
// Query within a namespace mapping and emit metrics. Returns fatal errors if
// the scrape fails, and a slice of errors if they were non-fatal.
func queryNamespaceMapping(p *Postgresql, server *Server, acc telegraf.Accumulator, host string) error {
var namespaces = []string{"pg_stat_bgwriter", "pg_stat_database", "pg_stat_database_count", "pg_locks", "pg_stat_activity"}
if p.GatherPgReplicationSlots {
namespaces = append(namespaces, "pg_replication_slots")
}
if p.GatherPgStatArchiver {
namespaces = append(namespaces, "pg_stat_archiver")
}
if p.GatherPgStatReplication {
namespaces = append(namespaces, "pg_stat_replication")
}
var rows *sql.Rows
var err error
for _, namespace := range namespaces {
mapping := builtinMetricMaps[namespace]
query := p.queryOverrides[namespace]
rows, err = server.db.Query(query)
if err != nil {
return err
}
var columnNames []string
columnNames, err = rows.Columns()
if err != nil {
return fmt.Errorf("get namespace:%s column failed, detail:%s", namespace, err.Error())
}
// Make a lookup map for the column indices
var columnIdx = make(map[string]int, len(columnNames))
for i, n := range columnNames {
columnIdx[n] = i
}
var columnData = make([]interface{}, len(columnNames))
var scanArgs = make([]interface{}, len(columnNames))
for i := range columnData {
scanArgs[i] = &columnData[i]
}
for rows.Next() {
var tags = make(map[string]string, 0)
tags["server"] = host
var counterFields []map[string]interface{}
var gaugeFields []map[string]interface{}
err = rows.Scan(scanArgs...)
if err != nil {
return errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err))
}
// Loop over column names, and match to scan data. Unknown columns
// will be filled with an untyped metric number *if* they can be
// converted to float64s. NULLs are allowed and treated as NaN.
for idx, columnName := range columnNames {
if metricMapping, ok := mapping.columnMappings[columnName]; ok {
switch metricMapping.usage {
case DISCARD:
continue
case LABEL:
v, ok := dbToString(columnData[idx])
if !ok {
acc.AddError(fmt.Errorf("columnName: %s value:%v, %s", columnName, columnData[idx], "类型错误"))
}
tags[columnName] = v
case COUNTER:
counterFields = append(counterFields, map[string]interface{}{columnName: columnData[idx]})
case GAUGE:
gaugeFields = append(gaugeFields, map[string]interface{}{columnName: columnData[idx]})
}
}
}
// 处理数据
if len(counterFields) > 0 {
for _, counterField := range counterFields {
acc.AddCounter("postgresql_"+namespace, counterField, tags)
}
}
if len(gaugeFields) > 0 {
for _, gaugeField := range gaugeFields {
acc.AddGauge("postgresql_"+namespace, gaugeField, tags)
}
}
}
if rows != nil {
_ = rows.Close()
}
}
return nil
}
func init() {
inputs.Add("postgresql", func() telegraf.Input {
return &Postgresql{
GatherPgSetting: false,
GatherPgStatReplication: false,
GatherPgReplicationSlots: false,
GatherPgStatArchiver: false,
ExcludeDatabases: []string{},
}
})
}

View File

@ -0,0 +1,38 @@
package postgresql
import (
"database/sql"
)
type Server struct {
db *sql.DB
}
// NewServer establishes a new connection using DSN.
func NewServer(dsn string) (*Server, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
s := &Server{
db: db,
}
return s, nil
}
// Close disconnects from Postgres.
func (s *Server) Close() error {
return s.db.Close()
}
// Ping checks connection availability and possibly invalidates the connection if it fails.
func (s *Server) Ping() error {
if err := s.db.Ping(); err != nil {
if cerr := s.Close(); cerr != nil {
return cerr
}
return err
}
return nil
}

View File

@ -0,0 +1,115 @@
package postgresql
import (
"errors"
"fmt"
"math"
"net/url"
"strconv"
"time"
)
func contains(a []string, x string) bool {
for _, n := range a {
if x == n {
return true
}
}
return false
}
func loggableDSN(dsn string) string {
pDSN, err := url.Parse(dsn)
if err != nil {
return "could not parse DATA_SOURCE_NAME"
}
// Blank user info if not nil
if pDSN.User != nil {
pDSN.User = url.UserPassword(pDSN.User.Username(), "PASSWORD_REMOVED")
}
return pDSN.String()
}
// 获取当前所有数据库
func queryDatabases(p *Server) ([]string, error) {
rows, err := p.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
if err != nil {
return []string{},fmt.Errorf("Error retrieving databases: %v", err)
}
defer rows.Close() // nolint: errcheck
var databaseName string
result := make([]string, 0)
for rows.Next() {
err = rows.Scan(&databaseName)
if err != nil {
return []string{},errors.New(fmt.Sprintln("Error retrieving rows:", err))
}
result = append(result, databaseName)
}
return result, nil
}
// Convert database.sql types to float64s for Prometheus consumption. Null types are mapped to NaN. string and []byte
// types are mapped as NaN and !ok
func dbToFloat64(t interface{}) (float64, bool) {
switch v := t.(type) {
case int64:
return float64(v), true
case float64:
return v, true
case time.Time:
return float64(v.Unix()), true
case []byte:
// Try and convert to string and then parse to a float64
strV := string(v)
result, err := strconv.ParseFloat(strV, 64)
if err != nil {
return math.NaN(), false
}
return result, true
case string:
result, err := strconv.ParseFloat(v, 64)
if err != nil {
return math.NaN(), false
}
return result, true
case bool:
if v {
return 1, true
}
return 0, true
case nil:
return math.NaN(), true
default:
return math.NaN(), false
}
}
// Convert database.sql to string for Prometheus labels. Null types are mapped to empty strings.
func dbToString(t interface{}) (string, bool) {
switch v := t.(type) {
case int64:
return fmt.Sprintf("%v", v), true
case float64:
return fmt.Sprintf("%v", v), true
case time.Time:
return fmt.Sprintf("%v", v.Unix()), true
case nil:
return "", true
case []byte:
// Try and convert to string
return string(v), true
case string:
return v, true
case bool:
if v {
return "true", true
}
return "false", true
default:
return "", false
}
}

View File

@ -0,0 +1,21 @@
package postgresql
import (
"github.com/didi/nightingale/v4/src/modules/server/plugins"
_ "github.com/lib/pq"
"testing"
"time"
)
func TestCollect(t *testing.T) {
input := plugins.PluginTest(
t, &PostgresqlRule{
Dsn: "postgres://postgres:xxxx@127.0.0.1:5432/postgres?sslmode=disable",
ExcludeDatabases: []string{},
GatherPgReplicationSlots: true,
ClientConfig: plugins.ClientConfig{},
})
time.Sleep(2 * time.Second)
plugins.PluginInputTest(t, input)
}

View File

@ -26,15 +26,15 @@ import (
"github.com/didi/nightingale/v4/src/modules/server/http/session"
"github.com/didi/nightingale/v4/src/modules/server/judge"
"github.com/didi/nightingale/v4/src/modules/server/judge/query"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/all"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/api"
"github.com/didi/nightingale/v4/src/modules/server/rabbitmq"
"github.com/didi/nightingale/v4/src/modules/server/redisc"
"github.com/didi/nightingale/v4/src/modules/server/rpc"
"github.com/didi/nightingale/v4/src/modules/server/ssoc"
"github.com/didi/nightingale/v4/src/modules/server/timer"
"github.com/didi/nightingale/v4/src/modules/server/wechat"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/all"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/api"
_ "github.com/lib/pq"
_ "github.com/go-sql-driver/mysql"
"github.com/toolkits/file"