From 748f61830bd4df1f97efa975ce6c5b91d8e3890f Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 28 Sep 2021 20:55:09 +0800 Subject: [PATCH] feat(event): fix event wait err --- pkg/event/define.go | 8 +++- pkg/event/driver.go | 91 ++++++++++++++++++++++++++------------------- 2 files changed, 59 insertions(+), 40 deletions(-) diff --git a/pkg/event/define.go b/pkg/event/define.go index 2fad03e..1208c3a 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -7,10 +7,14 @@ import ( type eventType int8 const ( - normalEvent = eventType(iota) - waitEvent + defaultEventState = eventType(iota) //默认情况下的状态 + waitEventState // 等待状态 + workEventState //工作状态 + closeEventState //事件关闭状态 ) +type EventWorkFunc func() (interface{}, error) + type DriverInterface interface { Get() *Event Put(event *Event) diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 114f2a8..c6b6838 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -1,9 +1,10 @@ package event import ( - "gitee.com/timedb/wheatCache/pkg/errorx" "sync" "time" + + "gitee.com/timedb/wheatCache/pkg/errorx" ) type Active func() ([]string, error) // 事件带函数 @@ -14,8 +15,10 @@ type Event struct { WorkTime time.Duration // 工作时间 msg map[string]string // 消息 waitResult chan interface{} // 等待返回 + err error ru sync.RWMutex - eventOnType eventType + muClose sync.Mutex //关闭锁 + eventStatus eventType } func (e *Event) SetMsg(key string, val string) { @@ -33,8 +36,12 @@ func (e *Event) GetMsg(key string) string { return e.msg[key] } -// SetCtxValue 写入 ctx 传递用参数 -func (e *Event) SetCtxValue(key string, value interface{}) { +func (e *Event) GetEventName() string { + return e.eventName +} + +// SetValue 写入 ctx 传递用参数 +func (e *Event) SetValue(key string, value interface{}) { e.ru.Lock() defer e.ru.Unlock() if e.msgCtx == nil { @@ -43,61 +50,69 @@ func (e *Event) SetCtxValue(key string, value interface{}) { e.msgCtx[key] = value } -func (e *Event) GetCtxValue(key string) (interface{}, bool) { +func (e *Event) GetValue(key string) (interface{}, bool) { e.ru.RLock() defer e.ru.RUnlock() val, ok := e.msgCtx[key] return val, ok } -func (e *Event) SetWaitResult(val interface{}) (err error) { - defer func() { - if errChan := recover(); errChan != nil { - err = errorx.New("channel err:%v", errChan) - } - }() - e.waitResult <- val - return nil +func (e *Event) SetErr(err error) { + e.err = err } -// UpdateWaitEvent 升级到 wait Event -func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) { - if e.eventOnType == waitEvent { - return nil, errorx.New("the upgrade cannot be repeated") - } - - ttl = 1 * time.Second - if ttl > 0 { - e.WorkTime = ttl - } +// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性 +func (e *Event) InitWaitEvent() { + e.muClose.Lock() + defer e.muClose.Unlock() e.waitResult = make(chan interface{}) - e.eventOnType = waitEvent - - return e.waitResult, nil + e.eventStatus = waitEventState } -// GetTtlTimer 只对 wait Event 有效 -func (e *Event) GetTtlTimer() (*time.Timer, error) { - if e.eventOnType != waitEvent { - return nil, errorx.New("cannot be called in normalEvent") +// StartWaitEvent 开始一个等待任务 +func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) { + t := time.NewTimer(ttl) + select { + case <-t.C: + e.muClose.Lock() + defer e.muClose.Unlock() + if e.eventStatus == workEventState { + return <-e.waitResult, e.err + } + + e.eventStatus = closeEventState + return nil, errorx.TimeOutErr() + + case result := <-e.waitResult: + return result, e.err + } +} + +func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { + e.muClose.Lock() + defer e.muClose.Unlock() + if e.eventStatus != waitEventState { + return nil, errorx.New("not wait status, exec err") } - timer := time.NewTimer(e.WorkTime) - return timer, nil + e.eventStatus = workEventState + + res, err := work() + e.waitResult <- res + e.err = err + + close(e.waitResult) + e.eventStatus = closeEventState + return res, err } func NewEvent(eventName string) *Event { return &Event{ eventName: eventName, - eventOnType: normalEvent, + eventStatus: defaultEventState, } } -// Close 关闭结束事件 -func (e *Event) Close() { - close(e.waitResult) -} - type Driver struct { maxQueueSize int queue chan *Event