feat(event2): add event2

This commit is contained in:
bandl 2021-11-11 22:24:17 +08:00
parent 62d3105273
commit 8b8fd58c09
7 changed files with 324 additions and 4 deletions

5
pkg/errorx/event.go Normal file
View File

@ -0,0 +1,5 @@
package errorx
func EventRecoveryErr() error {
return New("this event has been recycled")
}

21
pkg/event2/consumer.go Normal file
View File

@ -0,0 +1,21 @@
package event2
import "context"
type Consumer struct {
driver DriverInterface
}
func (c *Consumer) Receive(ctx context.Context) *event {
return c.driver.Get()
}
func (c *Consumer) NewEvent(name string) *event {
return c.driver.NewEvent(name)
}
func NewConsumer(driver DriverInterface) ConsumerInterface {
return &Consumer{
driver: driver,
}
}

49
pkg/event2/define.go Normal file
View File

@ -0,0 +1,49 @@
package event2
import (
"context"
)
const (
initEventState = int32(iota) // 初始化状态
waitEventState // 等待状态
workEventState // 工作状态
closeEventState // 事件关闭状态
recoveryEventState // 事件回收状态
)
const (
awaitThread = 3
)
const (
WorkFuncEventKey = "workFunc"
)
// 线程安全
type EventWorkFunc func() (interface{}, error)
// 挂起事件, 线程不安全
type EventAwaitFunc func() (interface{}, error)
// 实际操作
type awaitFunc func() (*event, interface{}, error)
type DriverInterface interface {
Get() *event
Put(*event)
GetLength() int
NewEvent(string) *event
await(awaitFunc)
recovery(e *event)
}
type ProduceInterface interface {
Call(context.Context, *event)
NewEvent(string) *event
}
type ConsumerInterface interface {
Receive(ctx context.Context) *event
}

212
pkg/event2/driver.go Normal file
View File

@ -0,0 +1,212 @@
package event2
import (
"sync/atomic"
"time"
"gitee.com/wheat-os/wheatCache/pkg/errorx"
)
type event struct {
msgCtx map[string]interface{}
eventName string
msg map[string]string // 消息
waitResult chan interface{} // 等待返回
err error
eventStatus int32
ttlManage *time.Timer
parentDriver DriverInterface
}
func (e *event) reset() {
if e.ttlManage != nil {
e.ttlManage.Stop()
if len(e.ttlManage.C) > 0 {
<-e.ttlManage.C
}
}
e.err = nil
// 清空结果
if len(e.waitResult) != 0 {
<-e.waitResult
}
}
func (e *event) Recovery() {
e.parentDriver.recovery(e)
}
func (e *event) SetMsg(key string, val string) {
if e.msg == nil {
e.msg = make(map[string]string)
}
e.msg[key] = val
}
func (e *event) GetMsg(key string) string {
if e.msg == nil {
return ""
}
return e.msg[key]
}
func (e *event) GetEventName() string {
return e.eventName
}
// SetValue 写入 ctx 传递用参数
func (e *event) SetValue(key string, value interface{}) {
if e.msgCtx == nil {
e.msgCtx = make(map[string]interface{})
}
e.msgCtx[key] = value
}
func (e *event) GetValue(key string) (interface{}, bool) {
if e.msgCtx == nil {
return nil, false
}
val, ok := e.msgCtx[key]
return val, ok
}
func (e *event) InitWaitEvent() {
e.reset()
if e.waitResult == nil {
e.waitResult = make(chan interface{})
}
atomic.SwapInt32(&e.eventStatus, waitEventState)
}
// StartWaitEvent 开始一个等待任务
func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
if e.ttlManage == nil {
e.ttlManage = time.NewTimer(ttl)
} else {
e.ttlManage.Reset(ttl)
}
for {
select {
case <-e.ttlManage.C:
if atomic.CompareAndSwapInt32(&e.eventStatus, waitEventState, closeEventState) {
return nil, errorx.TimeOutErr()
}
continue
case result := <-e.waitResult:
atomic.SwapInt32(&e.eventStatus, closeEventState)
return result, e.err
}
}
}
// 实际执行推送
func (e *event) execWorker(res interface{}, err error) {
switch work := res.(type) {
case EventAwaitFunc:
await := func() (*event, interface{}, error) {
result, err := work()
return e, result, err
}
e.parentDriver.await(await)
case EventWorkFunc:
e.InitWaitEvent()
e.SetValue(WorkFuncEventKey, work)
e.parentDriver.Put(e)
default:
e.err = err
e.waitResult <- res
}
}
func (e *event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
if !atomic.CompareAndSwapInt32(&e.eventStatus, waitEventState, workEventState) {
return nil, errorx.New("not wait status, exec err")
}
res, err := work()
e.execWorker(res, err)
return res, err
}
type driver struct {
waitQueue chan awaitFunc
eventQueue chan *event
levelQueue chan *event
// event 池的实现
poll chan *event
maxPoolSize int32
nowPoolSize int32
}
func NewDriver(maxSize int) DriverInterface {
d := &driver{
// pool
maxPoolSize: int32(maxSize),
nowPoolSize: 0,
poll: make(chan *event, maxSize),
// waitQueue 1/3 的挂起指标
waitQueue: make(chan awaitFunc, maxSize/3),
levelQueue: make(chan *event, maxSize/3),
eventQueue: make(chan *event, maxSize),
}
d.awaitWorker()
return d
}
func (d *driver) NewEvent(name string) *event {
issSize := atomic.LoadInt32(&d.nowPoolSize)
if issSize < d.maxPoolSize {
atomic.AddInt32(&d.nowPoolSize, 1)
return d.newEvent(name)
}
e := <-d.poll
e.eventName = name
return e
}
func (d *driver) newEvent(name string) *event {
status := initEventState
return &event{
eventStatus: status,
parentDriver: d,
eventName: name,
}
}
// 先尝试 level
func (d *driver) Get() *event {
if len(d.levelQueue) > 0 {
return <-d.levelQueue
}
return <-d.eventQueue
}
func (d *driver) Put(e *event) {
d.eventQueue <- e
}
func (d *driver) GetLength() int {
return len(d.eventQueue) + len(d.levelQueue)
}
func (d *driver) recovery(e *event) {
atomic.SwapInt32(&e.eventStatus, recoveryEventState)
e.reset()
d.poll <- e
}
// 挂起操作相关
func (d *driver) await(a awaitFunc) {
d.waitQueue <- a
}

21
pkg/event2/produce.go Normal file
View File

@ -0,0 +1,21 @@
package event2
import "context"
type Produce struct {
driver DriverInterface
}
func (p *Produce) NewEvent(name string) *event {
return p.driver.NewEvent(name)
}
func (p *Produce) Call(ctx context.Context, e *event) {
p.driver.Put(e)
}
func NewProduce(driver DriverInterface) ProduceInterface {
return &Produce{
driver: driver,
}
}

13
pkg/event2/worker.go Normal file
View File

@ -0,0 +1,13 @@
package event2
func (d *driver) awaitWorker() {
for i := 0; i < awaitThread; i++ {
go func() {
for {
awaitFunc := <-d.waitQueue
e, res, err := awaitFunc()
e.execWorker(res, err)
}
}()
}
}

View File

@ -14,7 +14,6 @@ const (
OptionEventName = "operateEvent"
CleanEventName = "clearEvent"
TtlEventName = "ttlEvent"
WorkFuncEventKey = "workFunc"
)
var (