diff --git a/pkg/errorx/event.go b/pkg/errorx/event.go new file mode 100644 index 0000000..f12980d --- /dev/null +++ b/pkg/errorx/event.go @@ -0,0 +1,5 @@ +package errorx + +func EventRecoveryErr() error { + return New("this event has been recycled") +} diff --git a/pkg/event2/consumer.go b/pkg/event2/consumer.go new file mode 100644 index 0000000..61cdf63 --- /dev/null +++ b/pkg/event2/consumer.go @@ -0,0 +1,21 @@ +package event2 + +import "context" + +type Consumer struct { + driver DriverInterface +} + +func (c *Consumer) Receive(ctx context.Context) *event { + return c.driver.Get() +} + +func (c *Consumer) NewEvent(name string) *event { + return c.driver.NewEvent(name) +} + +func NewConsumer(driver DriverInterface) ConsumerInterface { + return &Consumer{ + driver: driver, + } +} diff --git a/pkg/event2/define.go b/pkg/event2/define.go new file mode 100644 index 0000000..86714ab --- /dev/null +++ b/pkg/event2/define.go @@ -0,0 +1,49 @@ +package event2 + +import ( + "context" +) + +const ( + initEventState = int32(iota) // 初始化状态 + waitEventState // 等待状态 + workEventState // 工作状态 + closeEventState // 事件关闭状态 + recoveryEventState // 事件回收状态 +) + +const ( + awaitThread = 3 +) + +const ( + WorkFuncEventKey = "workFunc" +) + +// 线程安全 +type EventWorkFunc func() (interface{}, error) + +// 挂起事件, 线程不安全 +type EventAwaitFunc func() (interface{}, error) + +// 实际操作 +type awaitFunc func() (*event, interface{}, error) + +type DriverInterface interface { + Get() *event + Put(*event) + GetLength() int + NewEvent(string) *event + + await(awaitFunc) + recovery(e *event) +} + +type ProduceInterface interface { + Call(context.Context, *event) + NewEvent(string) *event +} + +type ConsumerInterface interface { + Receive(ctx context.Context) *event +} diff --git a/pkg/event2/driver.go b/pkg/event2/driver.go new file mode 100644 index 0000000..49d57fc --- /dev/null +++ b/pkg/event2/driver.go @@ -0,0 +1,212 @@ +package event2 + +import ( + "sync/atomic" + "time" + + "gitee.com/wheat-os/wheatCache/pkg/errorx" +) + +type event struct { + msgCtx map[string]interface{} + eventName string + msg map[string]string // 消息 + waitResult chan interface{} // 等待返回 + err error + eventStatus int32 + ttlManage *time.Timer + parentDriver DriverInterface +} + +func (e *event) reset() { + if e.ttlManage != nil { + e.ttlManage.Stop() + + if len(e.ttlManage.C) > 0 { + <-e.ttlManage.C + } + } + + e.err = nil + + // 清空结果 + if len(e.waitResult) != 0 { + <-e.waitResult + } +} + +func (e *event) Recovery() { + e.parentDriver.recovery(e) +} + +func (e *event) SetMsg(key string, val string) { + if e.msg == nil { + e.msg = make(map[string]string) + } + e.msg[key] = val +} + +func (e *event) GetMsg(key string) string { + if e.msg == nil { + return "" + } + return e.msg[key] +} + +func (e *event) GetEventName() string { + return e.eventName +} + +// SetValue 写入 ctx 传递用参数 +func (e *event) SetValue(key string, value interface{}) { + if e.msgCtx == nil { + e.msgCtx = make(map[string]interface{}) + } + e.msgCtx[key] = value +} + +func (e *event) GetValue(key string) (interface{}, bool) { + if e.msgCtx == nil { + return nil, false + } + val, ok := e.msgCtx[key] + return val, ok +} + +func (e *event) InitWaitEvent() { + e.reset() + if e.waitResult == nil { + e.waitResult = make(chan interface{}) + } + + atomic.SwapInt32(&e.eventStatus, waitEventState) +} + +// StartWaitEvent 开始一个等待任务 +func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) { + if e.ttlManage == nil { + e.ttlManage = time.NewTimer(ttl) + } else { + e.ttlManage.Reset(ttl) + } + + for { + select { + case <-e.ttlManage.C: + if atomic.CompareAndSwapInt32(&e.eventStatus, waitEventState, closeEventState) { + return nil, errorx.TimeOutErr() + } + continue + + case result := <-e.waitResult: + atomic.SwapInt32(&e.eventStatus, closeEventState) + return result, e.err + } + } +} + +// 实际执行推送 +func (e *event) execWorker(res interface{}, err error) { + switch work := res.(type) { + case EventAwaitFunc: + await := func() (*event, interface{}, error) { + result, err := work() + return e, result, err + } + e.parentDriver.await(await) + + case EventWorkFunc: + e.InitWaitEvent() + e.SetValue(WorkFuncEventKey, work) + e.parentDriver.Put(e) + + default: + e.err = err + e.waitResult <- res + } +} + +func (e *event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { + if !atomic.CompareAndSwapInt32(&e.eventStatus, waitEventState, workEventState) { + return nil, errorx.New("not wait status, exec err") + } + + res, err := work() + e.execWorker(res, err) + return res, err +} + +type driver struct { + waitQueue chan awaitFunc + eventQueue chan *event + levelQueue chan *event + + // event 池的实现 + poll chan *event + maxPoolSize int32 + nowPoolSize int32 +} + +func NewDriver(maxSize int) DriverInterface { + d := &driver{ + // pool + maxPoolSize: int32(maxSize), + nowPoolSize: 0, + poll: make(chan *event, maxSize), + + // waitQueue 1/3 的挂起指标 + waitQueue: make(chan awaitFunc, maxSize/3), + levelQueue: make(chan *event, maxSize/3), + eventQueue: make(chan *event, maxSize), + } + d.awaitWorker() + return d +} + +func (d *driver) NewEvent(name string) *event { + + issSize := atomic.LoadInt32(&d.nowPoolSize) + if issSize < d.maxPoolSize { + atomic.AddInt32(&d.nowPoolSize, 1) + return d.newEvent(name) + } + e := <-d.poll + e.eventName = name + return e +} + +func (d *driver) newEvent(name string) *event { + status := initEventState + return &event{ + eventStatus: status, + parentDriver: d, + eventName: name, + } +} + +// 先尝试 level +func (d *driver) Get() *event { + if len(d.levelQueue) > 0 { + return <-d.levelQueue + } + return <-d.eventQueue +} + +func (d *driver) Put(e *event) { + d.eventQueue <- e +} + +func (d *driver) GetLength() int { + return len(d.eventQueue) + len(d.levelQueue) +} + +func (d *driver) recovery(e *event) { + atomic.SwapInt32(&e.eventStatus, recoveryEventState) + e.reset() + d.poll <- e +} + +// 挂起操作相关 +func (d *driver) await(a awaitFunc) { + d.waitQueue <- a +} diff --git a/pkg/event2/produce.go b/pkg/event2/produce.go new file mode 100644 index 0000000..221533f --- /dev/null +++ b/pkg/event2/produce.go @@ -0,0 +1,21 @@ +package event2 + +import "context" + +type Produce struct { + driver DriverInterface +} + +func (p *Produce) NewEvent(name string) *event { + return p.driver.NewEvent(name) +} + +func (p *Produce) Call(ctx context.Context, e *event) { + p.driver.Put(e) +} + +func NewProduce(driver DriverInterface) ProduceInterface { + return &Produce{ + driver: driver, + } +} diff --git a/pkg/event2/worker.go b/pkg/event2/worker.go new file mode 100644 index 0000000..595a97d --- /dev/null +++ b/pkg/event2/worker.go @@ -0,0 +1,13 @@ +package event2 + +func (d *driver) awaitWorker() { + for i := 0; i < awaitThread; i++ { + go func() { + for { + awaitFunc := <-d.waitQueue + e, res, err := awaitFunc() + e.execWorker(res, err) + } + }() + } +} diff --git a/pkg/lru/define.go b/pkg/lru/define.go index d5d3dd3..f4c1f48 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -11,10 +11,9 @@ import ( type SingleWorkFunc func() interface{} const ( - OptionEventName = "operateEvent" - CleanEventName = "clearEvent" - TtlEventName = "ttlEvent" - WorkFuncEventKey = "workFunc" + OptionEventName = "operateEvent" + CleanEventName = "clearEvent" + TtlEventName = "ttlEvent" ) var (