Try to fix broken tests (#89)
* Try to fix broken tests * Remove uesless function
This commit is contained in:
parent
2d8ffb43e6
commit
c14a7a170a
|
@ -114,7 +114,7 @@ func (r *Reader) Start() {
|
||||||
|
|
||||||
}
|
}
|
||||||
func (r *Reader) check() {
|
func (r *Reader) check() {
|
||||||
nextpath := GetNowPath(r.FilePath)
|
nextpath := GetCurrentPath(r.FilePath)
|
||||||
|
|
||||||
// 文件名发生变化, 一般发生在配置了动态日志场景
|
// 文件名发生变化, 一般发生在配置了动态日志场景
|
||||||
if r.CurrentPath != nextpath {
|
if r.CurrentPath != nextpath {
|
||||||
|
|
|
@ -9,15 +9,11 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetNowPath(path string) string {
|
|
||||||
return getLogPath(path, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetCurrentPath(path string) string {
|
func GetCurrentPath(path string) string {
|
||||||
return getLogPath(path, false)
|
return getLogPath(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLogPath(path string, isnext bool) string {
|
func getLogPath(path string) string {
|
||||||
pat := `(\$\{(%[YmdH][^\/]*)+\})`
|
pat := `(\$\{(%[YmdH][^\/]*)+\})`
|
||||||
reg := regexp.MustCompile(pat)
|
reg := regexp.MustCompile(pat)
|
||||||
return reg.ReplaceAllStringFunc(path, func(s string) string {
|
return reg.ReplaceAllStringFunc(path, func(s string) string {
|
||||||
|
@ -29,16 +25,6 @@ func getLogPath(path string, isnext bool) string {
|
||||||
})
|
})
|
||||||
name := strings.Split(strings.TrimLeft(stringv, "%"), "%")
|
name := strings.Split(strings.TrimLeft(stringv, "%"), "%")
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if isnext {
|
|
||||||
switch name[len(name)-1] {
|
|
||||||
case "Y", "m", "d":
|
|
||||||
if now.Hour() == 23 {
|
|
||||||
now = time.Now() //.Add(time.Hour)
|
|
||||||
}
|
|
||||||
case "H":
|
|
||||||
now = time.Now() //.Add(time.Hour)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for k, v := range name {
|
for k, v := range name {
|
||||||
if strings.Contains(v, "Y") {
|
if strings.Contains(v, "Y") {
|
||||||
if strings.HasPrefix(v, "Y") {
|
if strings.HasPrefix(v, "Y") {
|
||||||
|
|
|
@ -4,27 +4,19 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGetcurrentpath(t *testing.T) {
|
var (
|
||||||
path := "/home/${%Y%m}/log/${%Y%m%d}/application.log.${%Y-%m-%d-%H}"
|
path = "/home/${%Y%m}/log/${%Y%m%d}/application.log.${%Y-%m-%d-%H}"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetCurrentPath(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
dir1 := now.Format("200601")
|
dir1 := now.Format("200601")
|
||||||
dir2 := now.Format("20060102")
|
dir2 := now.Format("20060102")
|
||||||
suffix := now.Format("2006-01-02-15")
|
suffix := now.Format("2006-01-02-15")
|
||||||
shouldbe := fmt.Sprintf("/home/%s/log/%s/application.log.%s", dir1, dir2, suffix)
|
expected := fmt.Sprintf("/home/%s/log/%s/application.log.%s", dir1, dir2, suffix)
|
||||||
if GetCurrentPath(path) != shouldbe {
|
assert.Equal(t, expected, GetCurrentPath(path))
|
||||||
t.Error("getcurrentpath failed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func TestGetnextpath(t *testing.T) {
|
|
||||||
path := "/home/${%Y%m}/log/${%Y%m%d}/application.log.${%Y-%m-%d-%H}"
|
|
||||||
now := time.Now().Add(time.Hour)
|
|
||||||
dir1 := now.Format("200601")
|
|
||||||
dir2 := now.Format("20060102")
|
|
||||||
suffix := now.Format("2006-01-02-15")
|
|
||||||
shouldbe := fmt.Sprintf("/home/%s/log/%s/application.log.%s", dir1, dir2, suffix)
|
|
||||||
if GetNextPath(path) != shouldbe {
|
|
||||||
t.Error("getcurrentpath failed")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,8 +6,6 @@ import (
|
||||||
|
|
||||||
"github.com/didi/nightingale/src/modules/collector/log/reader"
|
"github.com/didi/nightingale/src/modules/collector/log/reader"
|
||||||
"github.com/didi/nightingale/src/modules/collector/log/strategy"
|
"github.com/didi/nightingale/src/modules/collector/log/strategy"
|
||||||
"github.com/didi/nightingale/src/modules/collector/stra"
|
|
||||||
|
|
||||||
"github.com/toolkits/pkg/logger"
|
"github.com/toolkits/pkg/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -44,7 +42,7 @@ func UpdateConfigsLoop() {
|
||||||
FilePath: st.FilePath,
|
FilePath: st.FilePath,
|
||||||
}
|
}
|
||||||
cache := make(chan string, WorkerConfig.QueueSize)
|
cache := make(chan string, WorkerConfig.QueueSize)
|
||||||
if err := createJob(cfg, cache, st); err != nil {
|
if err := createJob(cfg, cache); err != nil {
|
||||||
logger.Errorf("create job fail [id:%d][filePath:%s][err:%v]", cfg.Id, cfg.FilePath, err)
|
logger.Errorf("create job fail [id:%d][filePath:%s][err:%v]", cfg.Id, cfg.FilePath, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,7 +77,7 @@ func GetLatestTmsAndDelay(filepath string) (int64, int64, bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//添加任务到管理map( managerjob managerconfig) 启动reader和worker
|
//添加任务到管理map( managerjob managerconfig) 启动reader和worker
|
||||||
func createJob(config *ConfigInfo, cache chan string, st *stra.Strategy) error {
|
func createJob(config *ConfigInfo, cache chan string) error {
|
||||||
if _, ok := ManagerJob[config.FilePath]; ok {
|
if _, ok := ManagerJob[config.FilePath]; ok {
|
||||||
if _, ok := ManagerConfig[config.Id]; !ok {
|
if _, ok := ManagerConfig[config.Id]; !ok {
|
||||||
ManagerConfig[config.Id] = config
|
ManagerConfig[config.Id] = config
|
||||||
|
@ -97,7 +95,7 @@ func createJob(config *ConfigInfo, cache chan string, st *stra.Strategy) error {
|
||||||
}
|
}
|
||||||
//metric.MetricReadAddReaderNum(config.FilePath)
|
//metric.MetricReadAddReaderNum(config.FilePath)
|
||||||
//启动worker
|
//启动worker
|
||||||
w := NewWorkerGroup(config.FilePath, cache, st)
|
w := NewWorkerGroup(config.FilePath, cache)
|
||||||
ManagerJob[config.FilePath] = &Job{
|
ManagerJob[config.FilePath] = &Job{
|
||||||
r: r,
|
r: r,
|
||||||
w: w,
|
w: w,
|
||||||
|
|
|
@ -2,6 +2,7 @@ package worker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -22,6 +23,6 @@ func TestCreatejobAndDeletejob(t *testing.T) {
|
||||||
fmt.Println(line)
|
fmt.Println(line)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("create job failed : %v", err)
|
log.Printf("create job failed : %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ func (this *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) {
|
||||||
/*
|
/*
|
||||||
* filepath和stream依赖外部,其他的都自己创建
|
* filepath和stream依赖外部,其他的都自己创建
|
||||||
*/
|
*/
|
||||||
func NewWorkerGroup(filePath string, stream chan string, st *stra.Strategy) *WorkerGroup {
|
func NewWorkerGroup(filePath string, stream chan string) *WorkerGroup {
|
||||||
wokerNum := WorkerConfig.WorkerNum
|
wokerNum := WorkerConfig.WorkerNum
|
||||||
wg := &WorkerGroup{
|
wg := &WorkerGroup{
|
||||||
WorkerNum: wokerNum,
|
WorkerNum: wokerNum,
|
||||||
|
|
Loading…
Reference in New Issue