From ac4fdd7309419d35c0efb9e6abb74f5c97d2f007 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 26 Oct 2021 14:39:34 +0800 Subject: [PATCH] feat(event): add event poll --- pkg/event/define.go | 11 ++--- pkg/event/driver.go | 111 +++++++++++++++++++++++--------------------- 2 files changed, 62 insertions(+), 60 deletions(-) diff --git a/pkg/event/define.go b/pkg/event/define.go index 53153bc..33faca5 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -4,13 +4,11 @@ import ( "context" ) -type eventType int8 - const ( - defaultEventState = eventType(iota) //默认情况下的状态 - waitEventState // 等待状态 - workEventState //工作状态 - closeEventState //事件关闭状态 + defaultEventState = int32(iota) //默认情况下的状态 + waitEventState // 等待状态 + workEventState //工作状态 + closeEventState //事件关闭状态 ) type EventWorkFunc func() (interface{}, error) @@ -31,6 +29,5 @@ type ProduceInterface interface { type ConsumerInterface interface { Receive(ctx context.Context) *event - NewEvent(string) *event Recovery(*event) } diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 1da81da..51244a4 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -1,7 +1,7 @@ package event import ( - "sync" + "sync/atomic" "time" "gitee.com/timedb/wheatCache/pkg/errorx" @@ -9,10 +9,18 @@ import ( // 事件 poll 降低 new 对象的频率 type eventPoll struct { - poll chan *event + poll chan *event + maxSize int32 + nowSize *int32 } func (e *eventPoll) getEvent() *event { + issSize := atomic.LoadInt32(e.nowSize) + if issSize < e.maxSize { + atomic.AddInt32(e.nowSize, 1) + return newEvent() + } + return <-e.poll } @@ -23,29 +31,40 @@ func (e *eventPoll) recovery(rEvent *event) { func newEventPoll(maxSize int) *eventPoll { return &eventPoll{ - poll: make(chan *event, maxSize), + poll: make(chan *event, maxSize), + maxSize: int32(maxSize), + nowSize: new(int32), } } type event struct { msgCtx map[string]interface{} eventName string - WorkTime time.Duration // 工作时间 msg map[string]string // 消息 waitResult chan interface{} // 等待返回 err error - ru sync.RWMutex - muClose sync.Mutex //关闭锁 - eventStatus eventType + eventStatus *int32 + ttlManage *time.Timer +} + +func newEvent() *event { + status := defaultEventState + return &event{ + eventStatus: &status, + } } func (e *event) Reset() { + if e.ttlManage != nil { + e.ttlManage.Stop() + } + e.err = nil + + atomic.SwapInt32(e.eventStatus, defaultEventState) } func (e *event) SetMsg(key string, val string) { - e.ru.Lock() - defer e.ru.Unlock() if e.msg == nil { e.msg = make(map[string]string) } @@ -53,8 +72,6 @@ func (e *event) SetMsg(key string, val string) { } func (e *event) GetMsg(key string) string { - e.ru.RLock() - defer e.ru.RUnlock() return e.msg[key] } @@ -64,8 +81,6 @@ func (e *event) GetEventName() string { // SetValue 写入 ctx 传递用参数 func (e *event) SetValue(key string, value interface{}) { - e.ru.Lock() - defer e.ru.Unlock() if e.msgCtx == nil { e.msgCtx = make(map[string]interface{}) } @@ -73,76 +88,65 @@ func (e *event) SetValue(key string, value interface{}) { } func (e *event) GetValue(key string) (interface{}, bool) { - e.ru.RLock() - defer e.ru.RUnlock() val, ok := e.msgCtx[key] return val, ok } // InitWaitEvent 初始化 wait event 必须调用才拥有等待特性 func (e *event) InitWaitEvent() { - e.muClose.Lock() - defer e.muClose.Unlock() - e.waitResult = make(chan interface{}) - e.eventStatus = waitEventState + if e.waitResult == nil || len(e.waitResult) > 0 { + e.waitResult = make(chan interface{}) + } + + // 清理残留 + if e.ttlManage == nil { + e.ttlManage = time.NewTimer(0) + } + e.ttlManage.Stop() + if len(e.ttlManage.C) > 0 { + <-e.ttlManage.C + } + + atomic.CompareAndSwapInt32(e.eventStatus, defaultEventState, waitEventState) } // StartWaitEvent 开始一个等待任务 func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) { - t := time.NewTimer(ttl) - select { - case <-t.C: - e.muClose.Lock() - defer e.muClose.Unlock() - if e.eventStatus == workEventState { - return <-e.waitResult, e.err + e.ttlManage.Reset(ttl) + + for { + select { + case <-e.ttlManage.C: + if atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, closeEventState) { + return nil, errorx.TimeOutErr() + } + continue + + case result := <-e.waitResult: + atomic.CompareAndSwapInt32(e.eventStatus, workEventState, closeEventState) + return result, e.err } - - e.eventStatus = closeEventState - return nil, errorx.TimeOutErr() - - case result := <-e.waitResult: - return result, e.err } } func (e *event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { - e.muClose.Lock() - defer e.muClose.Unlock() - if e.eventStatus != waitEventState { + if !atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, workEventState) { return nil, errorx.New("not wait status, exec err") } - e.eventStatus = workEventState - res, err := work() e.err = err e.waitResult <- res - - close(e.waitResult) - e.eventStatus = closeEventState return res, err } func (e *event) SetResultErr(err error) { - e.muClose.Lock() - defer e.muClose.Unlock() - if e.eventStatus != waitEventState { + if !atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, workEventState) { return } - e.eventStatus = workEventState e.err = err e.waitResult <- nil - close(e.waitResult) - e.eventStatus = closeEventState -} - -func NewEvent(eventName string) *event { - return &event{ - eventName: eventName, - eventStatus: defaultEventState, - } } type Driver struct { @@ -170,6 +174,7 @@ func (d *Driver) NewEvent(name string) *event { return event } +// 任何时候回收事件都应该由 最后使用者回收 func (d *Driver) Recovery(e *event) { d.poll.recovery(e) }