From f40332f1976e673b88f01b40f99a45b16ca2628b Mon Sep 17 00:00:00 2001 From: yubo Date: Mon, 26 Apr 2021 19:15:33 +0800 Subject: [PATCH 1/2] bugfix: add user.Type (#667) --- src/modules/server/http/router_user.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/modules/server/http/router_user.go b/src/modules/server/http/router_user.go index 1bbf5580..7da3dcb6 100644 --- a/src/modules/server/http/router_user.go +++ b/src/modules/server/http/router_user.go @@ -59,7 +59,7 @@ type userProfileForm struct { Im string `json:"im"` IsRoot int `json:"is_root"` LeaderId int64 `json:"leader_id"` - Typ int `json:"typ"` + Type int `json:"type"` Status int `json:"status"` Organization string `json:"organization"` } @@ -86,6 +86,7 @@ func userAddPost(c *gin.Context) { Im: f.Im, IsRoot: f.IsRoot, LeaderId: f.LeaderId, + Type: f.Type, Organization: f.Organization, UpdatedAt: now, UUID: models.GenUUIDForUser(f.Username), @@ -157,13 +158,13 @@ func userProfilePut(c *gin.Context) { target.IsRoot = f.IsRoot } - if f.Typ != target.Type { - arr = append(arr, fmt.Sprintf("typ: %d -> %d", target.Type, f.Typ)) - target.Type = f.Typ + if f.Type != target.Type { + arr = append(arr, fmt.Sprintf("type: %d -> %d", target.Type, f.Type)) + target.Type = f.Type } if f.Status != target.Status { - arr = append(arr, fmt.Sprintf("typ: %d -> %d", target.Status, f.Status)) + arr = append(arr, fmt.Sprintf("status: %d -> %d", target.Status, f.Status)) target.Status = f.Status if target.Status == models.USER_S_ACTIVE { target.LoginErrNum = 0 From 1112186d1c391db578e65571869a471da49e3e5f Mon Sep 17 00:00:00 2001 From: peng19940915 <25658446+peng19940915@users.noreply.github.com> Date: Tue, 27 Apr 2021 23:16:07 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=96=B0=E5=A2=9Epostgresql=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=20(#671)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add postgresql & remove http_response status_code tag * add postgresql & remove http_response status_code tag Co-authored-by: leiyupeng --- go.mod | 20 +- go.sum | 2 + src/modules/prober/prober.go | 4 +- src/modules/server/plugins/all/all.go | 2 +- .../plugins/http_response/http_response.go | 34 +- .../http_response/http_response.go | 458 +++++++++++++ .../server/plugins/postgresql/postgresql.go | 134 ++++ .../postgresql/postgresql/pg_setting.go | 131 ++++ .../postgresql/postgresql/postgresql.go | 603 ++++++++++++++++++ .../plugins/postgresql/postgresql/service.go | 38 ++ .../plugins/postgresql/postgresql/utils.go | 115 ++++ .../plugins/postgresql/postgresql_test.go | 21 + src/modules/server/server.go | 6 +- 13 files changed, 1536 insertions(+), 32 deletions(-) create mode 100644 src/modules/server/plugins/http_response/http_response/http_response.go create mode 100644 src/modules/server/plugins/postgresql/postgresql.go create mode 100644 src/modules/server/plugins/postgresql/postgresql/pg_setting.go create mode 100644 src/modules/server/plugins/postgresql/postgresql/postgresql.go create mode 100644 src/modules/server/plugins/postgresql/postgresql/service.go create mode 100644 src/modules/server/plugins/postgresql/postgresql/utils.go create mode 100644 src/modules/server/plugins/postgresql/postgresql_test.go diff --git a/go.mod b/go.mod index fc162021..e551bdb0 100644 --- a/go.mod +++ b/go.mod @@ -3,37 +3,39 @@ module github.com/didi/nightingale/v4 go 1.12 require ( - github.com/Shopify/sarama v1.27.2 + github.com/Shopify/sarama v1.27.2 // indirect github.com/alouca/gologger v0.0.0-20120904114645-7d4b7291de9c // indirect + github.com/blang/semver v3.5.1+incompatible github.com/cespare/xxhash v1.1.0 - github.com/coreos/go-oidc v2.2.1+incompatible + github.com/coreos/go-oidc v2.2.1+incompatible // indirect github.com/freedomkk-qfeng/go-fastping v0.0.0-20160109021039-d7bb493dee3e // indirect github.com/gaochao1/gosnmp v0.0.0-20150630013918-783a67a067fd // indirect github.com/gaochao1/sw v4.0.0+incompatible github.com/garyburd/redigo v1.6.2 github.com/gin-contrib/pprof v1.3.0 - github.com/gin-contrib/static v0.0.1 + github.com/gin-contrib/static v0.0.1 // indirect github.com/gin-gonic/gin v1.6.3 - github.com/go-ping/ping v0.0.0-20201115131931-3300c582a663 + github.com/go-ping/ping v0.0.0-20201115131931-3300c582a663 // indirect github.com/go-sql-driver/mysql v1.5.0 github.com/google/uuid v1.1.2 - github.com/hashicorp/golang-lru v0.5.4 + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hpcloud/tail v1.0.0 github.com/influxdata/influxdb v1.8.0 github.com/influxdata/telegraf v1.17.2 - github.com/m3db/m3 v0.15.17 + github.com/jackc/pgx v3.6.0+incompatible + github.com/m3db/m3 v0.15.17 // indirect github.com/mattn/go-isatty v0.0.12 github.com/mattn/go-sqlite3 v1.14.0 // indirect - github.com/mojocn/base64Captcha v1.3.1 + github.com/mojocn/base64Captcha v1.3.1 // indirect github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect github.com/shirou/gopsutil v3.20.11+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 github.com/sparrc/go-ping v0.0.0-20190613174326-4e5b6552494c github.com/spf13/viper v1.7.1 - github.com/streadway/amqp v1.0.0 + github.com/streadway/amqp v1.0.0 // indirect github.com/stretchr/testify v1.6.1 - github.com/toolkits/file v0.0.0-20160325033739-a5b3c5147e07 // indirect + github.com/toolkits/file v0.0.0-20160325033739-a5b3c5147e07 github.com/toolkits/pkg v1.1.3 github.com/toolkits/sys v0.0.0-20170615103026-1f33b217ffaf // indirect github.com/ugorji/go/codec v1.1.7 diff --git a/go.sum b/go.sum index 5e4e11b9..9b6aaf2f 100644 --- a/go.sum +++ b/go.sum @@ -151,6 +151,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ= github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/bmatcuk/doublestar v1.3.1/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9MEoZQC/PmE= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= @@ -564,6 +565,7 @@ github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1: github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 h1:W2IgzRCb0L9VzMujq/QuTaZUKcH8096jWwP519mHN6Q= github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8/go.mod h1:/2NMgWB1DHM1ti/gqhOlg+LJeBVk6FqR5aVGYY0hlwI= github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= +github.com/jackc/pgx v3.6.0+incompatible h1:bJeo4JdVbDAW8KB2m8XkFeo8CPipREoG37BwEoKGz+Q= github.com/jackc/pgx v3.6.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= diff --git a/src/modules/prober/prober.go b/src/modules/prober/prober.go index 364da73f..712d6ca4 100644 --- a/src/modules/prober/prober.go +++ b/src/modules/prober/prober.go @@ -21,9 +21,9 @@ import ( "github.com/didi/nightingale/v4/src/modules/server/collector" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/all" - _ "github.com/go-sql-driver/mysql" - "github.com/gin-gonic/gin" + _ "github.com/go-sql-driver/mysql" + _ "github.com/lib/pq" "github.com/toolkits/pkg/file" "github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/runner" diff --git a/src/modules/server/plugins/all/all.go b/src/modules/server/plugins/all/all.go index 9149b618..ddfd93e9 100644 --- a/src/modules/server/plugins/all/all.go +++ b/src/modules/server/plugins/all/all.go @@ -14,12 +14,12 @@ import ( _ "github.com/didi/nightingale/v4/src/modules/server/plugins/net_response" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/nginx" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/ping" + _ "github.com/didi/nightingale/v4/src/modules/server/plugins/postgresql" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/prometheus" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/rabbitmq" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/redis" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/tengine" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/zookeeper" - // local _ "github.com/didi/nightingale/v4/src/modules/server/plugins/log" _ "github.com/didi/nightingale/v4/src/modules/server/plugins/plugin" diff --git a/src/modules/server/plugins/http_response/http_response.go b/src/modules/server/plugins/http_response/http_response.go index 8339976e..c0770f74 100644 --- a/src/modules/server/plugins/http_response/http_response.go +++ b/src/modules/server/plugins/http_response/http_response.go @@ -9,8 +9,8 @@ import ( "github.com/didi/nightingale/v4/src/common/i18n" "github.com/didi/nightingale/v4/src/modules/server/collector" "github.com/didi/nightingale/v4/src/modules/server/plugins" + "github.com/didi/nightingale/v4/src/modules/server/plugins/http_response/http_response" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs/http_response" ) func init() { @@ -33,22 +33,22 @@ func NewCollector() *Collector { var ( langDict = map[string]map[string]string{ "zh": map[string]string{ - "URLS": "地址", - "Method": "方法", - "ResponseTimeout": "响应超时", - "Headers": "Headers", - "Username": "用户名", - "Password": "密码", - "Body": "Body", - "ResponseBodyMaxSize": "ResponseBodyMaxSize", - "ResponseStringMatch": "ResponseStringMatch", - "ResponseStatusCode": "ResponseStatusCode", - "Interface": "Interface", - "HTTPProxy": "HTTPProxy", - "FollowRedirects": "FollowRedirects", - "List of urls to query": "要监测的URL地址", - "HTTP Request Method, default GET": "HTTP 的请求方法,默认是 GET", - "HTTP Request Headers": "HTTP 请求的的 Headers", + "URLS": "地址", + "Method": "方法", + "ResponseTimeout": "响应超时", + "Headers": "Headers", + "Username": "用户名", + "Password": "密码", + "Body": "Body", + "ResponseBodyMaxSize": "ResponseBodyMaxSize", + "ResponseStringMatch": "ResponseStringMatch", + "ResponseStatusCode": "ResponseStatusCode", + "Interface": "Interface", + "HTTPProxy": "HTTPProxy", + "FollowRedirects": "FollowRedirects", + "List of urls to query": "要监测的URL地址", + "HTTP Request Method, default GET": "HTTP 的请求方法,默认是 GET", + "HTTP Request Headers": "HTTP 请求的的 Headers", "Optional HTTP Basic Auth Credentials, Username": "HTTP Basic 认证的用户名", "Optional HTTP Basic Auth Credentials, Password": "HTTP Basic 认证的密码", "Optional HTTP Request Body": "HTTP 请求的 Body", diff --git a/src/modules/server/plugins/http_response/http_response/http_response.go b/src/modules/server/plugins/http_response/http_response/http_response.go new file mode 100644 index 00000000..949a8f24 --- /dev/null +++ b/src/modules/server/plugins/http_response/http_response/http_response.go @@ -0,0 +1,458 @@ +package http_response + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "regexp" + "strings" + "time" + "unicode/utf8" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + // defaultResponseBodyMaxSize is the default maximum response body size, in bytes. + // if the response body is over this size, we will raise a body_read_error. + defaultResponseBodyMaxSize = 32 * 1024 * 1024 +) + +// HTTPResponse struct +type HTTPResponse struct { + Address string // deprecated in 1.12 + URLs []string `toml:"urls"` + HTTPProxy string `toml:"http_proxy"` + Body string + Method string + ResponseTimeout time.Duration + HTTPHeaderTags map[string]string `toml:"http_header_tags"` + Headers map[string]string + FollowRedirects bool + // Absolute path to file with Bearer token + BearerToken string `toml:"bearer_token"` + ResponseBodyField string `toml:"response_body_field"` + ResponseBodyMaxSize int64 `toml:"response_body_max_size"` + ResponseStringMatch string + ResponseStatusCode int + Interface string + // HTTP Basic Auth Credentials + Username string `toml:"username"` + Password string `toml:"password"` + tls.ClientConfig + + Log telegraf.Logger + + compiledStringMatch *regexp.Regexp + client httpClient +} + +type httpClient interface { + Do(req *http.Request) (*http.Response, error) +} + +// Description returns the plugin Description +func (h *HTTPResponse) Description() string { + return "HTTP/HTTPS request given an address a method and a timeout" +} + +var sampleConfig = ` + ## Deprecated in 1.12, use 'urls' + ## Server address (default http://localhost) + # address = "http://localhost" + ## List of urls to query. + # urls = ["http://localhost"] + ## Set http_proxy (telegraf uses the system wide proxy settings if it's is not set) + # http_proxy = "http://localhost:8888" + ## Set response_timeout (default 5 seconds) + # response_timeout = "5s" + ## HTTP Request Method + # method = "GET" + ## Whether to follow redirects from the server (defaults to false) + # follow_redirects = false + ## Optional file with Bearer token + ## file content is added as an Authorization header + # bearer_token = "/path/to/file" + ## Optional HTTP Basic Auth Credentials + # username = "username" + # password = "pa$$word" + ## Optional HTTP Request Body + # body = ''' + # {'fake':'data'} + # ''' + ## Optional name of the field that will contain the body of the response. + ## By default it is set to an empty String indicating that the body's content won't be added + # response_body_field = '' + ## Maximum allowed HTTP response body size in bytes. + ## 0 means to use the default of 32MiB. + ## If the response body size exceeds this limit a "body_read_error" will be raised + # response_body_max_size = "32MiB" + ## Optional substring or regex match in body of the response (case sensitive) + # response_string_match = "\"service_status\": \"up\"" + # response_string_match = "ok" + # response_string_match = "\".*_status\".?:.?\"up\"" + ## Expected response status code. + ## The status code of the response is compared to this value. If they match, the field + ## "response_status_code_match" will be 1, otherwise it will be 0. If the + ## expected status code is 0, the check is disabled and the field won't be added. + # response_status_code = 0 + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + ## HTTP Request Headers (all values must be strings) + # [inputs.http_response.headers] + # Host = "github.com" + ## Optional setting to map response http headers into tags + ## If the http header is not present on the request, no corresponding tag will be added + ## If multiple instances of the http header are present, only the first value will be used + # http_header_tags = {"HTTP_HEADER" = "TAG_NAME"} + ## Interface to use when dialing an address + # interface = "eth0" +` + +// SampleConfig returns the plugin SampleConfig +func (h *HTTPResponse) SampleConfig() string { + return sampleConfig +} + +// ErrRedirectAttempted indicates that a redirect occurred +var ErrRedirectAttempted = errors.New("redirect") + +// Set the proxy. A configured proxy overwrites the system wide proxy. +func getProxyFunc(httpProxy string) func(*http.Request) (*url.URL, error) { + if httpProxy == "" { + return http.ProxyFromEnvironment + } + proxyURL, err := url.Parse(httpProxy) + if err != nil { + return func(_ *http.Request) (*url.URL, error) { + return nil, errors.New("bad proxy: " + err.Error()) + } + } + return func(r *http.Request) (*url.URL, error) { + return proxyURL, nil + } +} + +// createHTTPClient creates an http client which will timeout at the specified +// timeout period and can follow redirects if specified +func (h *HTTPResponse) createHTTPClient() (*http.Client, error) { + tlsCfg, err := h.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + dialer := &net.Dialer{} + + if h.Interface != "" { + dialer.LocalAddr, err = localAddress(h.Interface) + if err != nil { + return nil, err + } + } + + client := &http.Client{ + Transport: &http.Transport{ + Proxy: getProxyFunc(h.HTTPProxy), + DialContext: dialer.DialContext, + DisableKeepAlives: true, + TLSClientConfig: tlsCfg, + }, + Timeout: time.Duration(h.ResponseTimeout), + } + + if !h.FollowRedirects { + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + } + } + return client, nil +} + +func localAddress(interfaceName string) (net.Addr, error) { + i, err := net.InterfaceByName(interfaceName) + if err != nil { + return nil, err + } + + addrs, err := i.Addrs() + if err != nil { + return nil, err + } + + for _, addr := range addrs { + if naddr, ok := addr.(*net.IPNet); ok { + // leaving port set to zero to let kernel pick + return &net.TCPAddr{IP: naddr.IP}, nil + } + } + + return nil, fmt.Errorf("cannot create local address for interface %q", interfaceName) +} + +func setResult(resultString string, fields map[string]interface{}, tags map[string]string) { + resultCodes := map[string]int{ + "success": 0, + "response_string_mismatch": 1, + "body_read_error": 2, + "connection_failed": 3, + "timeout": 4, + "dns_error": 5, + "response_status_code_mismatch": 6, + } + + tags["result"] = resultString + fields["result_type"] = resultString + fields["result_code"] = resultCodes[resultString] +} + +func setError(err error, fields map[string]interface{}, tags map[string]string) error { + if timeoutError, ok := err.(net.Error); ok && timeoutError.Timeout() { + setResult("timeout", fields, tags) + return timeoutError + } + + urlErr, isURLErr := err.(*url.Error) + if !isURLErr { + return nil + } + + opErr, isNetErr := (urlErr.Err).(*net.OpError) + if isNetErr { + switch e := (opErr.Err).(type) { + case *net.DNSError: + setResult("dns_error", fields, tags) + return e + case *net.ParseError: + // Parse error has to do with parsing of IP addresses, so we + // group it with address errors + setResult("address_error", fields, tags) + return e + } + } + + return nil +} + +// HTTPGather gathers all fields and returns any errors it encounters +func (h *HTTPResponse) httpGather(u string) (map[string]interface{}, map[string]string, error) { + // Prepare fields and tags + fields := make(map[string]interface{}) + tags := map[string]string{"server": u, "method": h.Method} + + var body io.Reader + if h.Body != "" { + body = strings.NewReader(h.Body) + } + request, err := http.NewRequest(h.Method, u, body) + if err != nil { + return nil, nil, err + } + + if h.BearerToken != "" { + token, err := ioutil.ReadFile(h.BearerToken) + if err != nil { + return nil, nil, err + } + bearer := "Bearer " + strings.Trim(string(token), "\n") + request.Header.Add("Authorization", bearer) + } + + for key, val := range h.Headers { + request.Header.Add(key, val) + if key == "Host" { + request.Host = val + } + } + + if h.Username != "" || h.Password != "" { + request.SetBasicAuth(h.Username, h.Password) + } + + // Start Timer + start := time.Now() + resp, err := h.client.Do(request) + responseTime := time.Since(start).Seconds() + + // If an error in returned, it means we are dealing with a network error, as + // HTTP error codes do not generate errors in the net/http library + if err != nil { + // Log error + h.Log.Debugf("Network error while polling %s: %s", u, err.Error()) + + // Get error details + if setError(err, fields, tags) == nil { + // Any error not recognized by `set_error` is considered a "connection_failed" + setResult("connection_failed", fields, tags) + } + + return fields, tags, nil + } + + if _, ok := fields["response_time"]; !ok { + fields["response_time"] = responseTime + } + + // This function closes the response body, as + // required by the net/http library + defer resp.Body.Close() + + // Add the response headers + for headerName, tag := range h.HTTPHeaderTags { + headerValues, foundHeader := resp.Header[headerName] + if foundHeader && len(headerValues) > 0 { + tags[tag] = headerValues[0] + } + } + + // Set log the HTTP response code + //tags["status_code"] = strconv.Itoa(resp.StatusCode) + fields["http_response_code"] = resp.StatusCode + + if h.ResponseBodyMaxSize == 0 { + h.ResponseBodyMaxSize = int64(defaultResponseBodyMaxSize) + } + bodyBytes, err := ioutil.ReadAll(io.LimitReader(resp.Body, int64(h.ResponseBodyMaxSize)+1)) + // Check first if the response body size exceeds the limit. + if err == nil && int64(len(bodyBytes)) > int64(h.ResponseBodyMaxSize) { + h.setBodyReadError("The body of the HTTP Response is too large", bodyBytes, fields, tags) + return fields, tags, nil + } else if err != nil { + h.setBodyReadError(fmt.Sprintf("Failed to read body of HTTP Response : %s", err.Error()), bodyBytes, fields, tags) + return fields, tags, nil + } + + // Add the body of the response if expected + if len(h.ResponseBodyField) > 0 { + // Check that the content of response contains only valid utf-8 characters. + if !utf8.Valid(bodyBytes) { + h.setBodyReadError("The body of the HTTP Response is not a valid utf-8 string", bodyBytes, fields, tags) + return fields, tags, nil + } + fields[h.ResponseBodyField] = string(bodyBytes) + } + fields["content_length"] = len(bodyBytes) + + var success = true + + // Check the response for a regex + if h.ResponseStringMatch != "" { + if h.compiledStringMatch.Match(bodyBytes) { + fields["response_string_match"] = 1 + } else { + success = false + setResult("response_string_mismatch", fields, tags) + fields["response_string_match"] = 0 + } + } + + // Check the response status code + if h.ResponseStatusCode > 0 { + if resp.StatusCode == h.ResponseStatusCode { + fields["response_status_code_match"] = 1 + } else { + success = false + setResult("response_status_code_mismatch", fields, tags) + fields["response_status_code_match"] = 0 + } + } + + if success { + setResult("success", fields, tags) + } + + return fields, tags, nil +} + +// Set result in case of a body read error +func (h *HTTPResponse) setBodyReadError(errorMsg string, bodyBytes []byte, fields map[string]interface{}, tags map[string]string) { + h.Log.Debugf(errorMsg) + setResult("body_read_error", fields, tags) + fields["content_length"] = len(bodyBytes) + if h.ResponseStringMatch != "" { + fields["response_string_match"] = 0 + } +} + +// Gather gets all metric fields and tags and returns any errors it encounters +func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error { + // Compile the body regex if it exist + if h.compiledStringMatch == nil { + var err error + h.compiledStringMatch, err = regexp.Compile(h.ResponseStringMatch) + if err != nil { + return fmt.Errorf("failed to compile regular expression %s : %s", h.ResponseStringMatch, err) + } + } + + // Set default values + if h.ResponseTimeout < time.Duration(time.Second) { + h.ResponseTimeout = time.Duration(time.Second * 5) + } + // Check send and expected string + if h.Method == "" { + h.Method = "GET" + } + + if len(h.URLs) == 0 { + if h.Address == "" { + h.URLs = []string{"http://localhost"} + } else { + h.Log.Warn("'address' deprecated in telegraf 1.12, please use 'urls'") + h.URLs = []string{h.Address} + } + } + + if h.client == nil { + client, err := h.createHTTPClient() + if err != nil { + return err + } + h.client = client + } + + for _, u := range h.URLs { + addr, err := url.Parse(u) + if err != nil { + acc.AddError(err) + continue + } + + if addr.Scheme != "http" && addr.Scheme != "https" { + acc.AddError(errors.New("only http and https are supported")) + continue + } + + // Prepare data + var fields map[string]interface{} + var tags map[string]string + + // Gather data + fields, tags, err = h.httpGather(u) + if err != nil { + acc.AddError(err) + continue + } + + // Add metrics + acc.AddFields("http_response", fields, tags) + } + + return nil +} + +func init() { + inputs.Add("http_response", func() telegraf.Input { + return &HTTPResponse{} + }) +} + diff --git a/src/modules/server/plugins/postgresql/postgresql.go b/src/modules/server/plugins/postgresql/postgresql.go new file mode 100644 index 00000000..6bdfca1b --- /dev/null +++ b/src/modules/server/plugins/postgresql/postgresql.go @@ -0,0 +1,134 @@ +package postgresql + +import ( + "fmt" + "github.com/didi/nightingale/v4/src/common/i18n" + "github.com/didi/nightingale/v4/src/modules/server/collector" + "github.com/didi/nightingale/v4/src/modules/server/plugins" + "github.com/didi/nightingale/v4/src/modules/server/plugins/postgresql/postgresql" + "github.com/influxdata/telegraf" + "net" + "net/url" + "sort" + "strings" +) + +func init() { + collector.CollectorRegister(NewPostgresqlCollector()) + i18n.DictRegister(langDict) +} + +type PostgresqlCollector struct { + *collector.BaseCollector +} + +var ( + langDict = map[string]map[string]string{ + "zh": map[string]string{ + "Dsn": "数据库地址", + "ExcludeDatabases": "不需要监控的数据库", + "if the list is empty, then metrics are gathered from all database": "如果列表为空,则收集所有数据库表", + "PgSetting": "数据库全局配置", + "gather pg setting":"是否采集 pg setting全局配置", + "StatArchiver": "采集pg_stat_archiver视图", + "gather pg_stat_archiver":"主要记录WAL归档信息", + "ReplicationSlot": "采集pg_replication_slot视图", + "gather pg_replication_slots":"用于确保WAL迁移是否正常", + "StatReplication":"采集pg_stat_replication视图", + "gather pg_stat_replication": "pg复制(异步同步)监控", + "StatDatabaseConfilicts":"采集pg_stat_database_confilicts视图", + "specify servers via a url matching
postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]
": "通过URL设置指定服务器
postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]
", + }, + } +) + +func NewPostgresqlCollector() *PostgresqlCollector { + return &PostgresqlCollector{BaseCollector: collector.NewBaseCollector( + "postgresql", + collector.RemoteCategory, + func() collector.TelegrafPlugin { return &PostgresqlRule{} }, + )} +} + +type PostgresqlRule struct { + ExcludeDatabases []string `label:"ExcludeDatabases" json:"exclude_databases" description:"if the list is empty, then metrics are gathered from all database"` + GatherPgStatReplication bool `label:"StatReplication" json:"pg_stat_replication" description:"gather pg_stat_replication" default:"false"` + GatherPgReplicationSlots bool `label:"ReplicationSlot" json:"pg_replication_slots" description:"gather pg_replication_slots" default:"false"` + GatherPgStatArchiver bool `label:"StatArchiver" json:"pg_stat_archiver" description:"gather pg_stat_archiver" default:"false"` + GatherPgSetting bool `label:"PgSetting" json:"pg_setting" description:"gather pg setting" default:"false"` + Dsn string `label:"Dsn" json:"dsn, required" description:"specify servers via a url matching
postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]
" example:"postgresql://postgres:xxx@127.0.0.1:5432/postgres?sslmode=disable"` + plugins.ClientConfig +} + +func parseURL(uri string) (string, error) { + u, err := url.Parse(uri) + if err != nil { + return "", err + } + + if u.Scheme != "postgres" && u.Scheme != "postgresql" { + return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme) + } + + var kvs []string + escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`) + accrue := func(k, v string) { + if v != "" { + kvs = append(kvs, k+"="+escaper.Replace(v)) + } + } + + if u.User != nil { + v := u.User.Username() + accrue("user", v) + + v, _ = u.User.Password() + accrue("password", v) + } + + if host, port, err := net.SplitHostPort(u.Host); err != nil { + accrue("host", u.Host) + } else { + accrue("host", host) + accrue("port", port) + } + + if u.Path != "" { + accrue("dbname", u.Path[1:]) + } + + q := u.Query() + for k := range q { + accrue(k, q.Get(k)) + } + + sort.Strings(kvs) // Makes testing easier (not a performance concern) + return strings.Join(kvs, " "), nil +} + +func (p *PostgresqlRule) Validate() error { + if p.Dsn == "" { + return fmt.Errorf("postgresql.rule.address must be set") + } + _, err := parseURL(p.Dsn) + if err != nil { + return fmt.Errorf("address parse failed, detail: %v", err) + } + return nil +} +func (p *PostgresqlRule) TelegrafInput() (telegraf.Input, error) { + + if err := p.Validate(); err != nil { + return nil, err + } + + return &postgresql.Postgresql{ + Dsn: p.Dsn, + ExcludeDatabases: p.ExcludeDatabases, + GatherPgSetting: p.GatherPgSetting, + GatherPgStatReplication: p.GatherPgStatReplication, + GatherPgReplicationSlots: p.GatherPgReplicationSlots, + GatherPgStatArchiver: p.GatherPgStatArchiver, + }, nil + +} diff --git a/src/modules/server/plugins/postgresql/postgresql/pg_setting.go b/src/modules/server/plugins/postgresql/postgresql/pg_setting.go new file mode 100644 index 00000000..339d476d --- /dev/null +++ b/src/modules/server/plugins/postgresql/postgresql/pg_setting.go @@ -0,0 +1,131 @@ +package postgresql + +import ( + "fmt" + "github.com/influxdata/telegraf" + "math" + "strconv" + "strings" +) + +// Query the pg_settings view containing runtime variables +func querySettings(host string, server *Server, acc telegraf.Accumulator) error { + // pg_settings docs: https://www.postgresql.org/docs/current/static/view-pg-settings.html + // + // NOTE: If you add more vartypes here, you must update the supported + // types in normaliseUnit() below + query := "SELECT name, setting, COALESCE(unit, ''), short_desc, vartype FROM pg_settings WHERE vartype IN ('bool', 'integer', 'real');" + rows, err := server.db.Query(query) + if err != nil { + return fmt.Errorf("Error running query on database %q: %s %v", server, "pg", err) + } + defer rows.Close() // nolint: errcheck + var fields =make(map[string]interface{}) + for rows.Next() { + s := &pgSetting{} + err = rows.Scan(&s.name, &s.setting, &s.unit, &s.shortDesc, &s.vartype) + if err != nil { + return fmt.Errorf("Error retrieving rows on %q: %s %v", server, "pg", err) + } + // 处理结果 + k, v := s.metric() + fields[k] = v + } + acc.AddGauge("postgresql_pg_settings",fields, map[string]string{"server": host}) + return nil +} + +// pgSetting is represents a PostgreSQL runtime variable as returned by the +// pg_settings view. +type pgSetting struct { + name, setting, unit, shortDesc, vartype string +} + +func (s *pgSetting) metric() (name string, val float64) { + var ( + err error + unit = s.unit // nolint: ineffassign + ) + name = strings.Replace(s.name, ".", "_", -1) + switch s.vartype { + case "bool": + if s.setting == "on" { + val = 1 + } + case "integer", "real": + if val, unit, err = s.normaliseUnit(); err != nil { + // Panic, since we should recognise all units + // and don't want to silently exlude metrics + panic(err) + } + + if len(unit) > 0 { + name = fmt.Sprintf("%s_%s", name, unit) + } + default: + // Panic because we got a type we didn't ask for + panic(fmt.Sprintf("Unsupported vartype %q", s.vartype)) + } + return name, val + //acc.AddGauge("postgresql_pg_settings", map[string]interface{}{name: val}, map[string]string{"server": host}) +} + +// TODO: fix linter override +// nolint: nakedret +func (s *pgSetting) normaliseUnit() (val float64, unit string, err error) { + val, err = strconv.ParseFloat(s.setting, 64) + if err != nil { + return val, unit, fmt.Errorf("Error converting setting %q value %q to float: %s", s.name, s.setting, err) + } + + // Units defined in: https://www.postgresql.org/docs/current/static/config-setting.html + switch s.unit { + case "": + return + case "ms", "s", "min", "h", "d": + unit = "seconds" + case "B", "kB", "MB", "GB", "TB", "8kB", "16kB", "32kB", "16MB", "32MB", "64MB": + unit = "bytes" + default: + err = fmt.Errorf("Unknown unit for runtime variable: %q", s.unit) + return + } + + // -1 is special, don't modify the value + if val == -1 { + return + } + + switch s.unit { + case "ms": + val /= 1000 + case "min": + val *= 60 + case "h": + val *= 60 * 60 + case "d": + val *= 60 * 60 * 24 + case "kB": + val *= math.Pow(2, 10) + case "MB": + val *= math.Pow(2, 20) + case "GB": + val *= math.Pow(2, 30) + case "TB": + val *= math.Pow(2, 40) + case "8kB": + val *= math.Pow(2, 13) + case "16kB": + val *= math.Pow(2, 14) + case "32kB": + val *= math.Pow(2, 15) + case "16MB": + val *= math.Pow(2, 24) + case "32MB": + val *= math.Pow(2, 25) + case "64MB": + val *= math.Pow(2, 26) + } + + return +} diff --git a/src/modules/server/plugins/postgresql/postgresql/postgresql.go b/src/modules/server/plugins/postgresql/postgresql/postgresql.go new file mode 100644 index 00000000..b3cad370 --- /dev/null +++ b/src/modules/server/plugins/postgresql/postgresql/postgresql.go @@ -0,0 +1,603 @@ +package postgresql + +import ( + "database/sql" + "errors" + "fmt" + "net/url" + "regexp" + // register in driver. + _ "github.com/jackc/pgx/stdlib" + + "github.com/blang/semver" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type OverrideQuery struct { + versionRange semver.Range + query string +} + +// ColumnUsage should be one of several enum values which describe how a +// queried row is to be converted to a Prometheus metric. +type ColumnUsage int + +// nolint: golint +const ( + DISCARD ColumnUsage = iota // Ignore this column + LABEL ColumnUsage = iota // Use this column as a label + COUNTER ColumnUsage = iota // Use this column as a counter + GAUGE ColumnUsage = iota // Use this column as a gauge + MAPPEDMETRIC ColumnUsage = iota // Use this column with the supplied mapping of text values + DURATION ColumnUsage = iota // This column should be interpreted as a text duration (and converted to milliseconds) + HISTOGRAM ColumnUsage = iota // Use this column as a histogram +) + +// ColumnMapping is the user-friendly representation of a prometheus descriptor map +type ColumnMapping struct { + usage ColumnUsage `yaml:"usage"` + description string `yaml:"description"` + supportedVersions semver.Range `yaml:"pg_version"` // Semantic version ranges which are supported. Unsupported columns are not queried (internally converted to DISCARD). +} + +// intermediateMetricMap holds the partially loaded metric map parsing. +// This is mainly so we can parse cacheSeconds around. +type intermediateMetricMap struct { + columnMappings map[string]ColumnMapping +} + +var builtinMetricMaps = map[string]intermediateMetricMap{ + "pg_stat_bgwriter": { + map[string]ColumnMapping{ + "checkpoints_timed": {COUNTER, "Number of scheduled checkpoints that have been performed", nil}, + "checkpoints_req": {COUNTER, "Number of requested checkpoints that have been performed", nil}, + "checkpoint_write_time": {COUNTER, "Total amount of time that has been spent in the portion of checkpoint processing where files are written to disk, in milliseconds", nil}, + "checkpoint_sync_time": {COUNTER, "Total amount of time that has been spent in the portion of checkpoint processing where files are synchronized to disk, in milliseconds", nil}, + "buffers_checkpoint": {COUNTER, "Number of buffers written during checkpoints", nil}, + "buffers_clean": {COUNTER, "Number of buffers written by the background writer", nil}, + "maxwritten_clean": {COUNTER, "Number of times the background writer stopped a cleaning scan because it had written too many buffers", nil}, + "buffers_backend": {COUNTER, "Number of buffers written directly by a backend", nil}, + "buffers_backend_fsync": {COUNTER, "Number of times a backend had to execute its own fsync call (normally the background writer handles those even when the backend does its own write)", nil}, + "buffers_alloc": {COUNTER, "Number of buffers allocated", nil}, + "stats_reset": {COUNTER, "Time at which these statistics were last reset", nil}, + }, + }, + "pg_stat_database": { + map[string]ColumnMapping{ + "datid": {LABEL, "OID of a database", nil}, + "datname": {LABEL, "Name of this database", nil}, + "numbackends": {GAUGE, "Number of backends currently connected to this database. This is the only column in this view that returns a value reflecting current state; all other columns return the accumulated values since the last reset.", nil}, + "xact_commit": {COUNTER, "Number of transactions in this database that have been committed", nil}, + "xact_rollback": {COUNTER, "Number of transactions in this database that have been rolled back", nil}, + "blks_read": {COUNTER, "Number of disk blocks read in this database", nil}, + "blks_hit": {COUNTER, "Number of times disk blocks were found already in the buffer cache, so that a read was not necessary (this only includes hits in the PostgreSQL buffer cache, not the operating system's file system cache)", nil}, + "tup_returned": {COUNTER, "Number of rows returned by queries in this database", nil}, + "tup_fetched": {COUNTER, "Number of rows fetched by queries in this database", nil}, + "tup_inserted": {COUNTER, "Number of rows inserted by queries in this database", nil}, + "tup_updated": {COUNTER, "Number of rows updated by queries in this database", nil}, + "tup_deleted": {COUNTER, "Number of rows deleted by queries in this database", nil}, + "conflicts": {COUNTER, "Number of queries canceled due to conflicts with recovery in this database. (Conflicts occur only on standby servers; see pg_stat_database_conflicts for details.)", nil}, + "temp_files": {COUNTER, "Number of temporary files created by queries in this database. All temporary files are counted, regardless of why the temporary file was created (e.g., sorting or hashing), and regardless of the log_temp_files setting.", nil}, + "temp_bytes": {COUNTER, "Total amount of data written to temporary files by queries in this database. All temporary files are counted, regardless of why the temporary file was created, and regardless of the log_temp_files setting.", nil}, + "deadlocks": {COUNTER, "Number of deadlocks detected in this database", nil}, + "blk_read_time": {COUNTER, "Time spent reading data file blocks by backends in this database, in milliseconds", nil}, + "blk_write_time": {COUNTER, "Time spent writing data file blocks by backends in this database, in milliseconds", nil}, + "stats_reset": {COUNTER, "Time at which these statistics were last reset", nil}, + }, + }, + "pg_stat_database_count": { + map[string]ColumnMapping{ + "datid": {LABEL, "OID of a database", nil}, + "datname": {LABEL, "Name of this database", nil}, + "dml_data_count": {COUNTER, "", nil}, + "tps": {COUNTER, "", nil}, + }, + }, + "pg_stat_database_conflicts": { + map[string]ColumnMapping{ + "datid": {LABEL, "OID of a database", nil}, + "datname": {LABEL, "Name of this database", nil}, + "confl_tablespace": {COUNTER, "Number of queries in this database that have been canceled due to dropped tablespaces", nil}, + "confl_lock": {COUNTER, "Number of queries in this database that have been canceled due to lock timeouts", nil}, + "confl_snapshot": {COUNTER, "Number of queries in this database that have been canceled due to old snapshots", nil}, + "confl_bufferpin": {COUNTER, "Number of queries in this database that have been canceled due to pinned buffers", nil}, + "confl_deadlock": {COUNTER, "Number of queries in this database that have been canceled due to deadlocks", nil}, + }, + }, + "pg_locks": { + map[string]ColumnMapping{ + "datname": {LABEL, "Name of this database", nil}, + "mode": {LABEL, "Type of Lock", nil}, + "count": {GAUGE, "Number of locks", nil}, + }, + }, + "pg_stat_replication": { + map[string]ColumnMapping{ + "procpid": {DISCARD, "Process ID of a WAL sender process", semver.MustParseRange("<9.2.0")}, + "pid": {DISCARD, "Process ID of a WAL sender process", semver.MustParseRange(">=9.2.0")}, + "usesysid": {DISCARD, "OID of the user logged into this WAL sender process", nil}, + "usename": {DISCARD, "Name of the user logged into this WAL sender process", nil}, + "application_name": {LABEL, "Name of the application that is connected to this WAL sender", nil}, + "client_addr": {LABEL, "IP address of the client connected to this WAL sender. If this field is null, it indicates that the client is connected via a Unix socket on the server machine.", nil}, + "client_hostname": {DISCARD, "Host name of the connected client, as reported by a reverse DNS lookup of client_addr. This field will only be non-null for IP connections, and only when log_hostname is enabled.", nil}, + "client_port": {DISCARD, "TCP port number that the client is using for communication with this WAL sender, or -1 if a Unix socket is used", nil}, + "backend_start": {DISCARD, "with time zone Time when this process was started, i.e., when the client connected to this WAL sender", nil}, + "backend_xmin": {DISCARD, "The current backend's xmin horizon.", nil}, + "state": {LABEL, "Current WAL sender state", nil}, + "sent_location": {DISCARD, "Last transaction log position sent on this connection", semver.MustParseRange("<10.0.0")}, + "write_location": {DISCARD, "Last transaction log position written to disk by this standby server", semver.MustParseRange("<10.0.0")}, + "flush_location": {DISCARD, "Last transaction log position flushed to disk by this standby server", semver.MustParseRange("<10.0.0")}, + "replay_location": {DISCARD, "Last transaction log position replayed into the database on this standby server", semver.MustParseRange("<10.0.0")}, + "sent_lsn": {DISCARD, "Last transaction log position sent on this connection", semver.MustParseRange(">=10.0.0")}, + "write_lsn": {DISCARD, "Last transaction log position written to disk by this standby server", semver.MustParseRange(">=10.0.0")}, + "flush_lsn": {DISCARD, "Last transaction log position flushed to disk by this standby server", semver.MustParseRange(">=10.0.0")}, + "replay_lsn": {DISCARD, "Last transaction log position replayed into the database on this standby server", semver.MustParseRange(">=10.0.0")}, + "sync_priority": {DISCARD, "Priority of this standby server for being chosen as the synchronous standby", nil}, + "sync_state": {DISCARD, "Synchronous state of this standby server", nil}, + "slot_name": {LABEL, "A unique, cluster-wide identifier for the replication slot", semver.MustParseRange(">=9.2.0")}, + "plugin": {DISCARD, "The base name of the shared object containing the output plugin this logical slot is using, or null for physical slots", nil}, + "slot_type": {DISCARD, "The slot type - physical or logical", nil}, + "datoid": {DISCARD, "The OID of the database this slot is associated with, or null. Only logical slots have an associated database", nil}, + "database": {DISCARD, "The name of the database this slot is associated with, or null. Only logical slots have an associated database", nil}, + "active": {DISCARD, "True if this slot is currently actively being used", nil}, + "active_pid": {DISCARD, "Process ID of a WAL sender process", nil}, + "xmin": {DISCARD, "The oldest transaction that this slot needs the database to retain. VACUUM cannot remove tuples deleted by any later transaction", nil}, + "catalog_xmin": {DISCARD, "The oldest transaction affecting the system catalogs that this slot needs the database to retain. VACUUM cannot remove catalog tuples deleted by any later transaction", nil}, + "restart_lsn": {DISCARD, "The address (LSN) of oldest WAL which still might be required by the consumer of this slot and thus won't be automatically removed during checkpoints", nil}, + "pg_current_xlog_location": {DISCARD, "pg_current_xlog_location", nil}, + "pg_current_wal_lsn": {DISCARD, "pg_current_xlog_location", semver.MustParseRange(">=10.0.0")}, + "pg_current_wal_lsn_bytes": {GAUGE, "WAL position in bytes", semver.MustParseRange(">=10.0.0")}, + "pg_xlog_location_diff": {GAUGE, "Lag in bytes between master and slave", semver.MustParseRange(">=9.2.0 <10.0.0")}, + "pg_wal_lsn_diff": {GAUGE, "Lag in bytes between master and slave", semver.MustParseRange(">=10.0.0")}, + "confirmed_flush_lsn": {DISCARD, "LSN position a consumer of a slot has confirmed flushing the data received", nil}, + "write_lag": {DISCARD, "Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written it (but not yet flushed it or applied it). This can be used to gauge the delay that synchronous_commit level remote_write incurred while committing if this server was configured as a synchronous standby.", semver.MustParseRange(">=10.0.0")}, + "flush_lag": {DISCARD, "Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written and flushed it (but not yet applied it). This can be used to gauge the delay that synchronous_commit level remote_flush incurred while committing if this server was configured as a synchronous standby.", semver.MustParseRange(">=10.0.0")}, + "replay_lag": {DISCARD, "Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written, flushed and applied it. This can be used to gauge the delay that synchronous_commit level remote_apply incurred while committing if this server was configured as a synchronous standby.", semver.MustParseRange(">=10.0.0")}, + }, + }, + "pg_replication_slots": { + map[string]ColumnMapping{ + "slot_name": {LABEL, "Name of the replication slot", nil}, + "database": {LABEL, "Name of the database", nil}, + "active": {GAUGE, "Flag indicating if the slot is active", nil}, + "pg_wal_lsn_diff": {GAUGE, "Replication lag in bytes", nil}, + }, + }, + "pg_stat_archiver": { + map[string]ColumnMapping{ + "archived_count": {COUNTER, "Number of WAL files that have been successfully archived", nil}, + "last_archived_wal": {DISCARD, "Name of the last WAL file successfully archived", nil}, + "last_archived_time": {DISCARD, "Time of the last successful archive operation", nil}, + "failed_count": {COUNTER, "Number of failed attempts for archiving WAL files", nil}, + "last_failed_wal": {DISCARD, "Name of the WAL file of the last failed archival operation", nil}, + "last_failed_time": {DISCARD, "Time of the last failed archival operation", nil}, + "stats_reset": {DISCARD, "Time at which these statistics were last reset", nil}, + "last_archive_age": {GAUGE, "Time in seconds since last WAL segment was successfully archived", nil}, + }, + }, + "pg_stat_activity": { + map[string]ColumnMapping{ + "datname": {LABEL, "Name of this database", nil}, + "state": {LABEL, "connection state", semver.MustParseRange(">=9.2.0")}, + "count": {GAUGE, "number of connections in this state", nil}, + "max_tx_duration": {GAUGE, "max duration in seconds any active transaction has been running", nil}, + }, + }, +} + +// Overriding queries for namespaces above. +// TODO: validate this is a closed set in tests, and there are no overlaps +var queryOverrides = map[string][]OverrideQuery{ + "pg_locks": { + { + semver.MustParseRange(">0.0.0"), + `SELECT pg_database.datname,tmp.mode,COALESCE(count,0) as count + FROM + ( + VALUES ('accesssharelock'), + ('rowsharelock'), + ('rowexclusivelock'), + ('shareupdateexclusivelock'), + ('sharelock'), + ('sharerowexclusivelock'), + ('exclusivelock'), + ('accessexclusivelock'), + ('sireadlock') + ) AS tmp(mode) CROSS JOIN pg_database + LEFT JOIN + (SELECT database, lower(mode) AS mode,count(*) AS count + FROM pg_locks WHERE database IS NOT NULL + GROUP BY database, lower(mode) + ) AS tmp2 + ON tmp.mode=tmp2.mode and pg_database.oid = tmp2.database ORDER BY 1`, + }, + }, + + "pg_stat_replication": { + { + semver.MustParseRange(">=10.0.0"), + ` + SELECT *, + (case pg_is_in_recovery() when 't' then null else pg_current_wal_lsn() end) AS pg_current_wal_lsn, + (case pg_is_in_recovery() when 't' then null else pg_wal_lsn_diff(pg_current_wal_lsn(), pg_lsn('0/0'))::float end) AS pg_current_wal_lsn_bytes, + (case pg_is_in_recovery() when 't' then null else pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)::float end) AS pg_wal_lsn_diff + FROM pg_stat_replication + `, + }, + { + semver.MustParseRange(">=9.2.0 <10.0.0"), + ` + SELECT *, + (case pg_is_in_recovery() when 't' then null else pg_current_xlog_location() end) AS pg_current_xlog_location, + (case pg_is_in_recovery() when 't' then null else pg_xlog_location_diff(pg_current_xlog_location(), replay_location)::float end) AS pg_xlog_location_diff + FROM pg_stat_replication + `, + }, + { + semver.MustParseRange("<9.2.0"), + ` + SELECT *, + (case pg_is_in_recovery() when 't' then null else pg_current_xlog_location() end) AS pg_current_xlog_location + FROM pg_stat_replication + `, + }, + }, + + "pg_replication_slots": { + { + semver.MustParseRange(">=9.4.0"), + ` + SELECT slot_name, database, active, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) + FROM pg_replication_slots + `, + }, + }, + "pg_stat_database": { + { + semver.MustParseRange(">=0.0.0"), + ` + SELECT * FROM pg_stat_database + `, + }, + }, + "pg_stat_database_count": { + { + semver.MustParseRange(">=0.0.0"), + ` + SELECT + datid, + datname, + tup_inserted + tup_updated + tup_deleted AS dml_data_count, + xact_rollback + xact_commit AS tps + FROM + pg_stat_database + `, + }, + }, + "pg_stat_archiver": { + { + semver.MustParseRange(">=0.0.0"), + ` + SELECT *, + extract(epoch from now() - last_archived_time) AS last_archive_age + FROM pg_stat_archiver + `, + }, + }, + "pg_stat_bgwriter": { + { + semver.MustParseRange(">=0.0.0"), + ` + SELECT * FROM pg_stat_bgwriter + `, + }, + }, + "pg_stat_activity": { + // This query only works + { + semver.MustParseRange(">=9.2.0"), + ` + SELECT + pg_database.datname, + tmp.state, + COALESCE(count,0) as count, + COALESCE(max_tx_duration,0) as max_tx_duration + FROM + ( + VALUES ('active'), + ('idle'), + ('idle in transaction'), + ('idle in transaction (aborted)'), + ('fastpath function call'), + ('disabled') + ) AS tmp(state) CROSS JOIN pg_database + LEFT JOIN + ( + SELECT + datname, + state, + count(*) AS count, + MAX(EXTRACT(EPOCH FROM now() - xact_start))::float AS max_tx_duration + FROM pg_stat_activity GROUP BY datname,state) AS tmp2 + ON tmp.state = tmp2.state AND pg_database.datname = tmp2.datname + `, + }, + { + semver.MustParseRange("<9.2.0"), + ` + SELECT + datname, + 'unknown' AS state, + COALESCE(count(*),0) AS count, + COALESCE(MAX(EXTRACT(EPOCH FROM now() - xact_start))::float,0) AS max_tx_duration + FROM pg_stat_activity GROUP BY datname + `, + }, + }, +} + +type Postgresql struct { + Dsn string + server *Server + Version semver.Version + Namespace []string + queryOverrides map[string]string + ExcludeDatabases []string + GatherPgSetting bool + GatherPgStatReplication bool + GatherPgReplicationSlots bool + GatherPgStatArchiver bool +} + +var ignoredColumns = map[string]bool{"stats_reset": true} + +var sampleConfig = ` + ## specify address via a url matching: + ## postgres://[pqgotest[:password]]@localhost[/dbname]\ + ## ?sslmode=[disable|verify-ca|verify-full] + ## or a simple string: + ## host=localhost user=pqgotest password=... sslmode=... dbname=app_production + ## + ## All connection parameters are optional. + ## + ## Without the dbname parameter, the driver will default to a database + ## with the same name as the user. This dbname is just for instantiating a + ## connection with the server and doesn't restrict the databases we are trying + ## to grab metrics for. + ## + address = "host=localhost user=postgres sslmode=disable" + ## A custom name for the database that will be used as the "server" tag in the + ## measurement output. If not specified, a default one generated from + ## the connection address is used. + # outputaddress = "db01" + ## connection configuration. + ## maxlifetime - specify the maximum lifetime of a connection. + ## default is forever (0s) + max_lifetime = "0s" + ## A list of databases to explicitly ignore. If not specified, metrics for all + ## databases are gathered. Do NOT use with the 'databases' option. + # ignored_databases = ["postgres", "template0", "template1"] + ## A list of databases to pull metrics about. If not specified, metrics for all + ## databases are gathered. Do NOT use with the 'ignored_databases' option. + # databases = ["app_production", "testing"] +` + +// Regex used to get the "short-version" from the postgres version field. +var versionRegex = regexp.MustCompile(`^\w+ ((\d+)(\.\d+)?(\.\d+)?)`) +var lowestSupportedVersion = semver.MustParse("9.1.0") + +// Parses the version of postgres into the short version string we can use to +// match behaviors. +func parseVersion(versionString string) (semver.Version, error) { + submatches := versionRegex.FindStringSubmatch(versionString) + if len(submatches) > 1 { + return semver.ParseTolerant(submatches[1]) + } + return semver.Version{}, + errors.New(fmt.Sprintln("Could not find a postgres version in string:", versionString)) +} + +func makeQueryOverrideMap(pgVersion semver.Version, queryOverrides map[string][]OverrideQuery) map[string]string { + resultMap := make(map[string]string) + for name, overrideDef := range queryOverrides { + // Find a matching semver. We make it an error to have overlapping + // ranges at test-time, so only 1 should ever match. + matched := false + for _, queryDef := range overrideDef { + if queryDef.versionRange(pgVersion) { + resultMap[name] = queryDef.query + matched = true + break + } + } + if !matched { + resultMap[name] = "" + } + } + return resultMap +} + +// 检查版本号以及使用的sql +func (p *Postgresql) checkMapVersions(server *Server) error { + var versionString string + versionRow := server.db.QueryRow("SELECT version();") + err := versionRow.Scan(&versionString) + if err != nil { + return err + } + p.Version, err = parseVersion(versionString) + if err != nil { + return err + } + + if p.Version.LT(lowestSupportedVersion) { + return errors.New(fmt.Sprintf("database version too low, version:%s, lowest version:%s", p.Version.String(), lowestSupportedVersion.String())) + } + + // 确定版本号与SQL + p.queryOverrides = makeQueryOverrideMap(p.Version, queryOverrides) + return nil +} + +func (p *Postgresql) SampleConfig() string { + return sampleConfig +} + +func (p *Postgresql) Description() string { + return "Read metrics from one or many postgresql servers" +} + +func (p *Postgresql) IgnoredColumns() map[string]bool { + return ignoredColumns +} + +func (p *Postgresql) Gather(acc telegraf.Accumulator) error { + var err error + p.server, err = NewServer(p.Dsn) + if err != nil { + acc.AddGauge("postgresql", map[string]interface{}{"up": false}, nil) + return fmt.Errorf("Error opening connection to database (%s): %s", loggableDSN(p.Dsn), err.Error()) + } else { + defer p.server.Close() + } + // 测试连通性 + err = p.server.Ping() + if err != nil { + acc.AddGauge("postgresql", map[string]interface{}{"up": false}, nil) + return fmt.Errorf("error opening connection to database (%s): %s", loggableDSN(p.Dsn), err.Error()) + } + acc.AddGauge("postgresql", map[string]interface{}{"up": true}, nil) + dsnURI, err := url.Parse(p.Dsn) + if err != nil { + errObj := fmt.Errorf("parse dsn:%s failed,plz recheck", p.Dsn) + acc.AddError(errObj) + return errObj + } + host := dsnURI.Host + // 检查当前版本,并提取出对应的sql + err = p.checkMapVersions(p.server) + if err != nil { + acc.AddGauge("postgresql", map[string]interface{}{"collect_success": false}, nil) + return err + } + // 获取该server的数据 + err = queryNamespaceMapping(p, p.server, acc, host) + if err != nil { + acc.AddGauge("postgresql", map[string]interface{}{"collect_success": false}, nil) + return err + } + // 查询pg_settings + if p.GatherPgSetting { + err = querySettings(host, p.server, acc) + if err != nil { + acc.AddGauge("postgresql", map[string]interface{}{"collect_success": false}, nil) + return err + } + } + acc.AddGauge("postgresql", map[string]interface{}{"collect_success": true}, nil) + return nil +} + +// Query within a namespace mapping and emit metrics. Returns fatal errors if +// the scrape fails, and a slice of errors if they were non-fatal. +func queryNamespaceMapping(p *Postgresql, server *Server, acc telegraf.Accumulator, host string) error { + var namespaces = []string{"pg_stat_bgwriter", "pg_stat_database", "pg_stat_database_count", "pg_locks", "pg_stat_activity"} + if p.GatherPgReplicationSlots { + namespaces = append(namespaces, "pg_replication_slots") + } + if p.GatherPgStatArchiver { + namespaces = append(namespaces, "pg_stat_archiver") + } + if p.GatherPgStatReplication { + namespaces = append(namespaces, "pg_stat_replication") + } + var rows *sql.Rows + var err error + for _, namespace := range namespaces { + mapping := builtinMetricMaps[namespace] + query := p.queryOverrides[namespace] + rows, err = server.db.Query(query) + if err != nil { + + return err + } + var columnNames []string + columnNames, err = rows.Columns() + if err != nil { + return fmt.Errorf("get namespace:%s column failed, detail:%s", namespace, err.Error()) + } + // Make a lookup map for the column indices + var columnIdx = make(map[string]int, len(columnNames)) + for i, n := range columnNames { + columnIdx[n] = i + } + var columnData = make([]interface{}, len(columnNames)) + var scanArgs = make([]interface{}, len(columnNames)) + for i := range columnData { + scanArgs[i] = &columnData[i] + } + + for rows.Next() { + var tags = make(map[string]string, 0) + tags["server"] = host + var counterFields []map[string]interface{} + var gaugeFields []map[string]interface{} + err = rows.Scan(scanArgs...) + if err != nil { + return errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err)) + } + // Loop over column names, and match to scan data. Unknown columns + // will be filled with an untyped metric number *if* they can be + // converted to float64s. NULLs are allowed and treated as NaN. + for idx, columnName := range columnNames { + if metricMapping, ok := mapping.columnMappings[columnName]; ok { + switch metricMapping.usage { + case DISCARD: + continue + case LABEL: + v, ok := dbToString(columnData[idx]) + if !ok { + acc.AddError(fmt.Errorf("columnName: %s value:%v, %s", columnName, columnData[idx], "类型错误")) + } + + tags[columnName] = v + case COUNTER: + counterFields = append(counterFields, map[string]interface{}{columnName: columnData[idx]}) + case GAUGE: + gaugeFields = append(gaugeFields, map[string]interface{}{columnName: columnData[idx]}) + } + } + } + // 处理数据 + if len(counterFields) > 0 { + for _, counterField := range counterFields { + acc.AddCounter("postgresql_"+namespace, counterField, tags) + } + } + if len(gaugeFields) > 0 { + for _, gaugeField := range gaugeFields { + acc.AddGauge("postgresql_"+namespace, gaugeField, tags) + } + } + } + + if rows != nil { + _ = rows.Close() + } + } + + return nil +} + +func init() { + inputs.Add("postgresql", func() telegraf.Input { + return &Postgresql{ + GatherPgSetting: false, + GatherPgStatReplication: false, + GatherPgReplicationSlots: false, + GatherPgStatArchiver: false, + ExcludeDatabases: []string{}, + } + }) +} diff --git a/src/modules/server/plugins/postgresql/postgresql/service.go b/src/modules/server/plugins/postgresql/postgresql/service.go new file mode 100644 index 00000000..3a1ca190 --- /dev/null +++ b/src/modules/server/plugins/postgresql/postgresql/service.go @@ -0,0 +1,38 @@ +package postgresql + +import ( + "database/sql" +) + +type Server struct { + db *sql.DB +} +// NewServer establishes a new connection using DSN. +func NewServer(dsn string) (*Server, error) { + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + s := &Server{ + db: db, + } + return s, nil +} + +// Close disconnects from Postgres. +func (s *Server) Close() error { + return s.db.Close() +} + +// Ping checks connection availability and possibly invalidates the connection if it fails. +func (s *Server) Ping() error { + if err := s.db.Ping(); err != nil { + if cerr := s.Close(); cerr != nil { + return cerr + } + return err + } + return nil +} diff --git a/src/modules/server/plugins/postgresql/postgresql/utils.go b/src/modules/server/plugins/postgresql/postgresql/utils.go new file mode 100644 index 00000000..53f70bc1 --- /dev/null +++ b/src/modules/server/plugins/postgresql/postgresql/utils.go @@ -0,0 +1,115 @@ +package postgresql + +import ( + "errors" + "fmt" + "math" + "net/url" + "strconv" + "time" +) + +func contains(a []string, x string) bool { + for _, n := range a { + if x == n { + return true + } + } + return false +} + + +func loggableDSN(dsn string) string { + pDSN, err := url.Parse(dsn) + if err != nil { + return "could not parse DATA_SOURCE_NAME" + } + // Blank user info if not nil + if pDSN.User != nil { + pDSN.User = url.UserPassword(pDSN.User.Username(), "PASSWORD_REMOVED") + } + + return pDSN.String() +} + +// 获取当前所有数据库 +func queryDatabases(p *Server) ([]string, error) { + rows, err := p.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()") + if err != nil { + return []string{},fmt.Errorf("Error retrieving databases: %v", err) + } + defer rows.Close() // nolint: errcheck + var databaseName string + result := make([]string, 0) + for rows.Next() { + err = rows.Scan(&databaseName) + if err != nil { + return []string{},errors.New(fmt.Sprintln("Error retrieving rows:", err)) + } + result = append(result, databaseName) + } + return result, nil +} + +// Convert database.sql types to float64s for Prometheus consumption. Null types are mapped to NaN. string and []byte +// types are mapped as NaN and !ok +func dbToFloat64(t interface{}) (float64, bool) { + switch v := t.(type) { + case int64: + return float64(v), true + case float64: + return v, true + case time.Time: + return float64(v.Unix()), true + case []byte: + // Try and convert to string and then parse to a float64 + strV := string(v) + result, err := strconv.ParseFloat(strV, 64) + if err != nil { + + return math.NaN(), false + } + return result, true + case string: + result, err := strconv.ParseFloat(v, 64) + if err != nil { + return math.NaN(), false + } + return result, true + case bool: + if v { + return 1, true + } + return 0, true + case nil: + return math.NaN(), true + default: + return math.NaN(), false + } +} + +// Convert database.sql to string for Prometheus labels. Null types are mapped to empty strings. +func dbToString(t interface{}) (string, bool) { + switch v := t.(type) { + case int64: + return fmt.Sprintf("%v", v), true + case float64: + return fmt.Sprintf("%v", v), true + case time.Time: + return fmt.Sprintf("%v", v.Unix()), true + case nil: + return "", true + case []byte: + // Try and convert to string + return string(v), true + case string: + return v, true + case bool: + if v { + return "true", true + } + return "false", true + default: + return "", false + } +} \ No newline at end of file diff --git a/src/modules/server/plugins/postgresql/postgresql_test.go b/src/modules/server/plugins/postgresql/postgresql_test.go new file mode 100644 index 00000000..842155d6 --- /dev/null +++ b/src/modules/server/plugins/postgresql/postgresql_test.go @@ -0,0 +1,21 @@ +package postgresql + +import ( + "github.com/didi/nightingale/v4/src/modules/server/plugins" + _ "github.com/lib/pq" + "testing" + "time" +) + +func TestCollect(t *testing.T) { + input := plugins.PluginTest( + t, &PostgresqlRule{ + Dsn: "postgres://postgres:xxxx@127.0.0.1:5432/postgres?sslmode=disable", + ExcludeDatabases: []string{}, + GatherPgReplicationSlots: true, + + ClientConfig: plugins.ClientConfig{}, + }) + time.Sleep(2 * time.Second) + plugins.PluginInputTest(t, input) +} diff --git a/src/modules/server/server.go b/src/modules/server/server.go index 169d13d1..3b4cdf2b 100644 --- a/src/modules/server/server.go +++ b/src/modules/server/server.go @@ -26,15 +26,15 @@ import ( "github.com/didi/nightingale/v4/src/modules/server/http/session" "github.com/didi/nightingale/v4/src/modules/server/judge" "github.com/didi/nightingale/v4/src/modules/server/judge/query" + _ "github.com/didi/nightingale/v4/src/modules/server/plugins/all" + _ "github.com/didi/nightingale/v4/src/modules/server/plugins/api" "github.com/didi/nightingale/v4/src/modules/server/rabbitmq" "github.com/didi/nightingale/v4/src/modules/server/redisc" "github.com/didi/nightingale/v4/src/modules/server/rpc" "github.com/didi/nightingale/v4/src/modules/server/ssoc" "github.com/didi/nightingale/v4/src/modules/server/timer" "github.com/didi/nightingale/v4/src/modules/server/wechat" - - _ "github.com/didi/nightingale/v4/src/modules/server/plugins/all" - _ "github.com/didi/nightingale/v4/src/modules/server/plugins/api" + _ "github.com/lib/pq" _ "github.com/go-sql-driver/mysql" "github.com/toolkits/file"