feat(event): fix event wait err

This commit is contained in:
bandl 2021-09-28 20:55:09 +08:00
parent 05bd410900
commit 748f61830b
2 changed files with 59 additions and 40 deletions

View File

@ -7,10 +7,14 @@ import (
type eventType int8
const (
normalEvent = eventType(iota)
waitEvent
defaultEventState = eventType(iota) //默认情况下的状态
waitEventState // 等待状态
workEventState //工作状态
closeEventState //事件关闭状态
)
type EventWorkFunc func() (interface{}, error)
type DriverInterface interface {
Get() *Event
Put(event *Event)

View File

@ -1,9 +1,10 @@
package event
import (
"gitee.com/timedb/wheatCache/pkg/errorx"
"sync"
"time"
"gitee.com/timedb/wheatCache/pkg/errorx"
)
type Active func() ([]string, error) // 事件带函数
@ -14,8 +15,10 @@ type Event struct {
WorkTime time.Duration // 工作时间
msg map[string]string // 消息
waitResult chan interface{} // 等待返回
err error
ru sync.RWMutex
eventOnType eventType
muClose sync.Mutex //关闭锁
eventStatus eventType
}
func (e *Event) SetMsg(key string, val string) {
@ -33,8 +36,12 @@ func (e *Event) GetMsg(key string) string {
return e.msg[key]
}
// SetCtxValue 写入 ctx 传递用参数
func (e *Event) SetCtxValue(key string, value interface{}) {
func (e *Event) GetEventName() string {
return e.eventName
}
// SetValue 写入 ctx 传递用参数
func (e *Event) SetValue(key string, value interface{}) {
e.ru.Lock()
defer e.ru.Unlock()
if e.msgCtx == nil {
@ -43,61 +50,69 @@ func (e *Event) SetCtxValue(key string, value interface{}) {
e.msgCtx[key] = value
}
func (e *Event) GetCtxValue(key string) (interface{}, bool) {
func (e *Event) GetValue(key string) (interface{}, bool) {
e.ru.RLock()
defer e.ru.RUnlock()
val, ok := e.msgCtx[key]
return val, ok
}
func (e *Event) SetWaitResult(val interface{}) (err error) {
defer func() {
if errChan := recover(); errChan != nil {
err = errorx.New("channel err:%v", errChan)
}
}()
e.waitResult <- val
return nil
func (e *Event) SetErr(err error) {
e.err = err
}
// UpdateWaitEvent 升级到 wait Event
func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) {
if e.eventOnType == waitEvent {
return nil, errorx.New("the upgrade cannot be repeated")
}
ttl = 1 * time.Second
if ttl > 0 {
e.WorkTime = ttl
}
// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
func (e *Event) InitWaitEvent() {
e.muClose.Lock()
defer e.muClose.Unlock()
e.waitResult = make(chan interface{})
e.eventOnType = waitEvent
return e.waitResult, nil
e.eventStatus = waitEventState
}
// GetTtlTimer 只对 wait Event 有效
func (e *Event) GetTtlTimer() (*time.Timer, error) {
if e.eventOnType != waitEvent {
return nil, errorx.New("cannot be called in normalEvent")
// StartWaitEvent 开始一个等待任务
func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
t := time.NewTimer(ttl)
select {
case <-t.C:
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus == workEventState {
return <-e.waitResult, e.err
}
e.eventStatus = closeEventState
return nil, errorx.TimeOutErr()
case result := <-e.waitResult:
return result, e.err
}
}
func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
return nil, errorx.New("not wait status, exec err")
}
timer := time.NewTimer(e.WorkTime)
return timer, nil
e.eventStatus = workEventState
res, err := work()
e.waitResult <- res
e.err = err
close(e.waitResult)
e.eventStatus = closeEventState
return res, err
}
func NewEvent(eventName string) *Event {
return &Event{
eventName: eventName,
eventOnType: normalEvent,
eventStatus: defaultEventState,
}
}
// Close 关闭结束事件
func (e *Event) Close() {
close(e.waitResult)
}
type Driver struct {
maxQueueSize int
queue chan *Event