日志采集新增带上最后一条日志 到extra字段中,为后续报警做准备 (#614)
* m3db writetagged应该并发做,不然会导致transfer rpc变慢 * go func指针传参问题 * 新增k8s-mon三个大盘文件 * 新增k8s-mon三个大盘文件 * 修改k8s-mon三个大盘文件 * 日志采集新增带上最后一条日志 到extra字段中,为后续报警做准备
This commit is contained in:
parent
7a84223d5b
commit
7bb93e8351
|
@ -241,6 +241,7 @@ CREATE TABLE `log_collect` (
|
|||
`created` datetime NOT NULL COMMENT 'created',
|
||||
`last_updator` varchar(64) NOT NULL DEFAULT '' COMMENT 'last_updator',
|
||||
`last_updated` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||
`whether_attache_one_log_line` tinyint(1) not null default 0,
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_nid` (`nid`),
|
||||
KEY `idx_collect_type` (`collect_type`)
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
set names utf8;
|
||||
use n9e_mon;
|
||||
|
||||
alter table log_collect add `whether_attache_one_log_line` tinyint(1) not null default 0 after last_updated;
|
||||
|
||||
|
|
@ -177,12 +177,13 @@ type LogCollect struct {
|
|||
Zerofill int `xorm:"zero_fill" json:"zerofill"`
|
||||
Aggregate string `json:"aggregate"`
|
||||
|
||||
LocalUpdated int64 `xorm:"-" json:"-"`
|
||||
TimeReg *regexp.Regexp `xorm:"-" json:"-"`
|
||||
PatternReg *regexp.Regexp `xorm:"-" json:"-"`
|
||||
ExcludeReg *regexp.Regexp `xorm:"-" json:"-"`
|
||||
TagRegs map[string]*regexp.Regexp `xorm:"-" json:"-"`
|
||||
ParseSucc bool `xorm:"-" json:"-"`
|
||||
LocalUpdated int64 `xorm:"-" json:"-"`
|
||||
TimeReg *regexp.Regexp `xorm:"-" json:"-"`
|
||||
PatternReg *regexp.Regexp `xorm:"-" json:"-"`
|
||||
ExcludeReg *regexp.Regexp `xorm:"-" json:"-"`
|
||||
TagRegs map[string]*regexp.Regexp `xorm:"-" json:"-"`
|
||||
ParseSucc bool `xorm:"-" json:"-"`
|
||||
WhetherAttacheOneLogLine int `json:"whether_attache_one_log_line" xorm:"'whether_attache_one_log_line'"`
|
||||
}
|
||||
|
||||
type ApiCollect struct {
|
||||
|
|
|
@ -21,15 +21,17 @@ type AnalysPoint struct {
|
|||
Value float64
|
||||
Tms int64
|
||||
Tags map[string]string
|
||||
OneLogLine string
|
||||
}
|
||||
|
||||
//统计的实体
|
||||
type PointCounter struct {
|
||||
sync.RWMutex
|
||||
Count int64
|
||||
Sum float64
|
||||
Max float64
|
||||
Min float64
|
||||
Count int64
|
||||
Sum float64
|
||||
Max float64
|
||||
Min float64
|
||||
OneLogLine string
|
||||
}
|
||||
|
||||
// 单策略下,单step的统计对象
|
||||
|
@ -104,7 +106,7 @@ func PushToCount(Point *AnalysPoint) error {
|
|||
|
||||
//拿到tmsCount, 更新TagstringMap
|
||||
tagstring := dataobj.SortedTags(Point.Tags)
|
||||
return tmsCount.Update(tagstring, Point.Value)
|
||||
return tmsCount.Update(tagstring, Point.Value, Point.OneLogLine)
|
||||
}
|
||||
|
||||
//时间戳向前对齐
|
||||
|
@ -164,7 +166,7 @@ func (psc *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error
|
|||
return point, nil
|
||||
}
|
||||
|
||||
func (psc *PointsCounter) Update(tagstring string, value float64) error {
|
||||
func (psc *PointsCounter) Update(tagstring string, value float64, oneLogLine string) error {
|
||||
pointCount, err := psc.GetBytagstring(tagstring)
|
||||
if err != nil {
|
||||
tmp := new(PointCounter)
|
||||
|
@ -200,7 +202,7 @@ func (psc *PointsCounter) Update(tagstring string, value float64) error {
|
|||
|
||||
pointCount.Count = pointCount.Count + 1
|
||||
}
|
||||
|
||||
pointCount.OneLogLine = oneLogLine
|
||||
pointCount.Unlock()
|
||||
|
||||
return nil
|
||||
|
|
|
@ -182,6 +182,7 @@ func ToPushQueue(strategy *stra.Strategy, tms int64, pointMap map[string]*PointC
|
|||
Step: strategy.Interval,
|
||||
TagsMap: tags,
|
||||
CounterType: "GAUGE",
|
||||
Extra: PointCounter.OneLogLine,
|
||||
}
|
||||
//metric.MetricPushDelay(tms)
|
||||
pushQueue <- tmpPoint
|
||||
|
|
|
@ -316,7 +316,11 @@ func (w *Worker) producer(line string, strategy *stra.Strategy) (*AnalysPoint, e
|
|||
Tms: tms.Unix(),
|
||||
Tags: tag,
|
||||
}
|
||||
|
||||
// ==1代表要开启带上一条日志
|
||||
if strategy.WhetherAttacheOneLogLine == 1 {
|
||||
logger.Debugf("[strategy:%+v][WhetherAttacheOneLogLine:%+v]", strategy, strategy.WhetherAttacheOneLogLine)
|
||||
ret.OneLogLine = line
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -14,27 +14,28 @@ import (
|
|||
)
|
||||
|
||||
type Strategy struct {
|
||||
ID int64 `json:"id"`
|
||||
Name string `json:"name"` //监控策略名
|
||||
FilePath string `json:"file_path"` //文件路径
|
||||
TimeFormat string `json:"time_format"` //时间格式
|
||||
Pattern string `json:"pattern"` //表达式
|
||||
Exclude string `json:"-"`
|
||||
MeasurementType string `json:"measurement_type"`
|
||||
Interval int64 `json:"interval"` //采集周期
|
||||
Tags map[string]string `json:"tags"`
|
||||
Func string `json:"func"` //采集方式(max/min/avg/cnt)
|
||||
Degree int64 `json:"degree"`
|
||||
Unit string `json:"unit"`
|
||||
Comment string `json:"comment"`
|
||||
Creator string `json:"creator"`
|
||||
SrvUpdated string `json:"updated"`
|
||||
LocalUpdated int64 `json:"-"`
|
||||
TimeReg *regexp.Regexp `json:"-"`
|
||||
PatternReg *regexp.Regexp `json:"-"`
|
||||
ExcludeReg *regexp.Regexp `json:"-"`
|
||||
TagRegs map[string]*regexp.Regexp `json:"-"`
|
||||
ParseSucc bool `json:"parse_succ"`
|
||||
ID int64 `json:"id"`
|
||||
Name string `json:"name"` //监控策略名
|
||||
FilePath string `json:"file_path"` //文件路径
|
||||
TimeFormat string `json:"time_format"` //时间格式
|
||||
Pattern string `json:"pattern"` //表达式
|
||||
Exclude string `json:"-"`
|
||||
MeasurementType string `json:"measurement_type"`
|
||||
Interval int64 `json:"interval"` //采集周期
|
||||
Tags map[string]string `json:"tags"`
|
||||
Func string `json:"func"` //采集方式(max/min/avg/cnt)
|
||||
Degree int64 `json:"degree"`
|
||||
Unit string `json:"unit"`
|
||||
Comment string `json:"comment"`
|
||||
Creator string `json:"creator"`
|
||||
SrvUpdated string `json:"updated"`
|
||||
LocalUpdated int64 `json:"-"`
|
||||
TimeReg *regexp.Regexp `json:"-"`
|
||||
PatternReg *regexp.Regexp `json:"-"`
|
||||
ExcludeReg *regexp.Regexp `json:"-"`
|
||||
TagRegs map[string]*regexp.Regexp `json:"-"`
|
||||
ParseSucc bool `json:"parse_succ"`
|
||||
WhetherAttacheOneLogLine int `json:"whether_attache_one_log_line"`
|
||||
}
|
||||
|
||||
func GetLogCollects() []*Strategy {
|
||||
|
@ -133,6 +134,7 @@ func ToStrategy(p *models.LogCollect) *Strategy {
|
|||
s.Creator = p.Creator
|
||||
s.SrvUpdated = p.LastUpdated.String()
|
||||
s.LocalUpdated = p.LocalUpdated
|
||||
s.WhetherAttacheOneLogLine = p.WhetherAttacheOneLogLine
|
||||
|
||||
return &s
|
||||
}
|
||||
|
@ -257,9 +259,9 @@ func GetPatAndTimeFormat(tf string) (string, string) {
|
|||
case "yyyy/mm/dd HH:MM:SS":
|
||||
pat = `(2[0-9]{3})/(0[1-9]|1[012])/([012][0-9]|3[01])\s([01][0-9]|2[0-4])(:[012345][0-9]){2}`
|
||||
timeFormat = "2006/01/02 15:04:05"
|
||||
case "yyyy/mm/dd - HH:MM:SS":
|
||||
pat = `(2[0-9]{3})/(0[1-9]|1[012])/([012][0-9]|3[01])\s-\s([01][0-9]|2[0-4])(:[012345][0-9]){2}`
|
||||
timeFormat = "2006/01/02 - 15:04:05"
|
||||
case "yyyy/mm/dd - HH:MM:SS":
|
||||
pat = `(2[0-9]{3})/(0[1-9]|1[012])/([012][0-9]|3[01])\s-\s([01][0-9]|2[0-4])(:[012345][0-9]){2}`
|
||||
timeFormat = "2006/01/02 - 15:04:05"
|
||||
case "yyyymmdd HH:MM:SS":
|
||||
pat = `(2[0-9]{3})(0[1-9]|1[012])([012][0-9]|3[01])\s([01][0-9]|2[0-4])(:[012345][0-9]){2}`
|
||||
timeFormat = "20060102 15:04:05"
|
||||
|
@ -272,9 +274,9 @@ func GetPatAndTimeFormat(tf string) (string, string) {
|
|||
case "dd/mm/yyyy:HH:MM:SS":
|
||||
pat = `([012][0-9]|3[01])/(0[1-9]|1[012])/(2[0-9]{3}):([01][0-9]|2[0-4])(:[012345][0-9]){2}`
|
||||
timeFormat = "02/01/2006:15:04:05"
|
||||
case "mm-dd HH:MM:SS":
|
||||
pat = `(0[1-9]|1[012])-([012][0-9]|3[01])\s([01][0-9]|2[0-4])(:[012345][0-9]){2}`
|
||||
timeFormat = "01-02 15:04:05"
|
||||
case "mm-dd HH:MM:SS":
|
||||
pat = `(0[1-9]|1[012])-([012][0-9]|3[01])\s([01][0-9]|2[0-4])(:[012345][0-9]){2}`
|
||||
timeFormat = "01-02 15:04:05"
|
||||
default:
|
||||
logger.Errorf("match time pac failed : [timeFormat:%s]", tf)
|
||||
return "", ""
|
||||
|
|
Loading…
Reference in New Issue