feat(event): add event poll

This commit is contained in:
bandl 2021-10-26 14:39:34 +08:00
parent bff937700e
commit ac4fdd7309
2 changed files with 62 additions and 60 deletions

View File

@ -4,13 +4,11 @@ import (
"context"
)
type eventType int8
const (
defaultEventState = eventType(iota) //默认情况下的状态
waitEventState // 等待状态
workEventState //工作状态
closeEventState //事件关闭状态
defaultEventState = int32(iota) //默认情况下的状态
waitEventState // 等待状态
workEventState //工作状态
closeEventState //事件关闭状态
)
type EventWorkFunc func() (interface{}, error)
@ -31,6 +29,5 @@ type ProduceInterface interface {
type ConsumerInterface interface {
Receive(ctx context.Context) *event
NewEvent(string) *event
Recovery(*event)
}

View File

@ -1,7 +1,7 @@
package event
import (
"sync"
"sync/atomic"
"time"
"gitee.com/timedb/wheatCache/pkg/errorx"
@ -9,10 +9,18 @@ import (
// 事件 poll 降低 new 对象的频率
type eventPoll struct {
poll chan *event
poll chan *event
maxSize int32
nowSize *int32
}
func (e *eventPoll) getEvent() *event {
issSize := atomic.LoadInt32(e.nowSize)
if issSize < e.maxSize {
atomic.AddInt32(e.nowSize, 1)
return newEvent()
}
return <-e.poll
}
@ -23,29 +31,40 @@ func (e *eventPoll) recovery(rEvent *event) {
func newEventPoll(maxSize int) *eventPoll {
return &eventPoll{
poll: make(chan *event, maxSize),
poll: make(chan *event, maxSize),
maxSize: int32(maxSize),
nowSize: new(int32),
}
}
type event struct {
msgCtx map[string]interface{}
eventName string
WorkTime time.Duration // 工作时间
msg map[string]string // 消息
waitResult chan interface{} // 等待返回
err error
ru sync.RWMutex
muClose sync.Mutex //关闭锁
eventStatus eventType
eventStatus *int32
ttlManage *time.Timer
}
func newEvent() *event {
status := defaultEventState
return &event{
eventStatus: &status,
}
}
func (e *event) Reset() {
if e.ttlManage != nil {
e.ttlManage.Stop()
}
e.err = nil
atomic.SwapInt32(e.eventStatus, defaultEventState)
}
func (e *event) SetMsg(key string, val string) {
e.ru.Lock()
defer e.ru.Unlock()
if e.msg == nil {
e.msg = make(map[string]string)
}
@ -53,8 +72,6 @@ func (e *event) SetMsg(key string, val string) {
}
func (e *event) GetMsg(key string) string {
e.ru.RLock()
defer e.ru.RUnlock()
return e.msg[key]
}
@ -64,8 +81,6 @@ func (e *event) GetEventName() string {
// SetValue 写入 ctx 传递用参数
func (e *event) SetValue(key string, value interface{}) {
e.ru.Lock()
defer e.ru.Unlock()
if e.msgCtx == nil {
e.msgCtx = make(map[string]interface{})
}
@ -73,76 +88,65 @@ func (e *event) SetValue(key string, value interface{}) {
}
func (e *event) GetValue(key string) (interface{}, bool) {
e.ru.RLock()
defer e.ru.RUnlock()
val, ok := e.msgCtx[key]
return val, ok
}
// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
func (e *event) InitWaitEvent() {
e.muClose.Lock()
defer e.muClose.Unlock()
e.waitResult = make(chan interface{})
e.eventStatus = waitEventState
if e.waitResult == nil || len(e.waitResult) > 0 {
e.waitResult = make(chan interface{})
}
// 清理残留
if e.ttlManage == nil {
e.ttlManage = time.NewTimer(0)
}
e.ttlManage.Stop()
if len(e.ttlManage.C) > 0 {
<-e.ttlManage.C
}
atomic.CompareAndSwapInt32(e.eventStatus, defaultEventState, waitEventState)
}
// StartWaitEvent 开始一个等待任务
func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
t := time.NewTimer(ttl)
select {
case <-t.C:
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus == workEventState {
return <-e.waitResult, e.err
e.ttlManage.Reset(ttl)
for {
select {
case <-e.ttlManage.C:
if atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, closeEventState) {
return nil, errorx.TimeOutErr()
}
continue
case result := <-e.waitResult:
atomic.CompareAndSwapInt32(e.eventStatus, workEventState, closeEventState)
return result, e.err
}
e.eventStatus = closeEventState
return nil, errorx.TimeOutErr()
case result := <-e.waitResult:
return result, e.err
}
}
func (e *event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
if !atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, workEventState) {
return nil, errorx.New("not wait status, exec err")
}
e.eventStatus = workEventState
res, err := work()
e.err = err
e.waitResult <- res
close(e.waitResult)
e.eventStatus = closeEventState
return res, err
}
func (e *event) SetResultErr(err error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
if !atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, workEventState) {
return
}
e.eventStatus = workEventState
e.err = err
e.waitResult <- nil
close(e.waitResult)
e.eventStatus = closeEventState
}
func NewEvent(eventName string) *event {
return &event{
eventName: eventName,
eventStatus: defaultEventState,
}
}
type Driver struct {
@ -170,6 +174,7 @@ func (d *Driver) NewEvent(name string) *event {
return event
}
// 任何时候回收事件都应该由 最后使用者回收
func (d *Driver) Recovery(e *event) {
d.poll.recovery(e)
}