From 6d26969cae30a5a009f24425e6b66cee799e70ef Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Thu, 23 Sep 2021 11:05:51 +0800 Subject: [PATCH] feat(event): add event driver tools --- pkg/event/consumer.go | 17 ++++++ pkg/event/define.go | 25 +++++++++ pkg/event/driver.go | 121 ++++++++++++++++++++++++++++++++++++++++++ pkg/event/produce.go | 17 ++++++ 4 files changed, 180 insertions(+) create mode 100644 pkg/event/consumer.go create mode 100644 pkg/event/define.go create mode 100644 pkg/event/driver.go create mode 100644 pkg/event/produce.go diff --git a/pkg/event/consumer.go b/pkg/event/consumer.go new file mode 100644 index 0000000..995c123 --- /dev/null +++ b/pkg/event/consumer.go @@ -0,0 +1,17 @@ +package event + +import "context" + +type Consumer struct { + driver DriverInterface +} + +func (c *Consumer) Receive(ctx context.Context) *Event { + return c.driver.Get() +} + +func NewConsumer(driver DriverInterface) ConsumerInterface { + return &Consumer{ + driver: driver, + } +} diff --git a/pkg/event/define.go b/pkg/event/define.go new file mode 100644 index 0000000..2fad03e --- /dev/null +++ b/pkg/event/define.go @@ -0,0 +1,25 @@ +package event + +import ( + "context" +) + +type eventType int8 + +const ( + normalEvent = eventType(iota) + waitEvent +) + +type DriverInterface interface { + Get() *Event + Put(event *Event) +} + +type ProduceInterface interface { + Call(ctx context.Context, event *Event) +} + +type ConsumerInterface interface { + Receive(ctx context.Context) *Event +} diff --git a/pkg/event/driver.go b/pkg/event/driver.go new file mode 100644 index 0000000..114f2a8 --- /dev/null +++ b/pkg/event/driver.go @@ -0,0 +1,121 @@ +package event + +import ( + "gitee.com/timedb/wheatCache/pkg/errorx" + "sync" + "time" +) + +type Active func() ([]string, error) // 事件带函数 + +type Event struct { + msgCtx map[string]interface{} + eventName string + WorkTime time.Duration // 工作时间 + msg map[string]string // 消息 + waitResult chan interface{} // 等待返回 + ru sync.RWMutex + eventOnType eventType +} + +func (e *Event) SetMsg(key string, val string) { + e.ru.Lock() + defer e.ru.Unlock() + if e.msg == nil { + e.msg = make(map[string]string) + } + e.msg[key] = val +} + +func (e *Event) GetMsg(key string) string { + e.ru.RLock() + defer e.ru.RUnlock() + return e.msg[key] +} + +// SetCtxValue 写入 ctx 传递用参数 +func (e *Event) SetCtxValue(key string, value interface{}) { + e.ru.Lock() + defer e.ru.Unlock() + if e.msgCtx == nil { + e.msgCtx = make(map[string]interface{}) + } + e.msgCtx[key] = value +} + +func (e *Event) GetCtxValue(key string) (interface{}, bool) { + e.ru.RLock() + defer e.ru.RUnlock() + val, ok := e.msgCtx[key] + return val, ok +} + +func (e *Event) SetWaitResult(val interface{}) (err error) { + defer func() { + if errChan := recover(); errChan != nil { + err = errorx.New("channel err:%v", errChan) + } + }() + e.waitResult <- val + return nil +} + +// UpdateWaitEvent 升级到 wait Event +func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) { + if e.eventOnType == waitEvent { + return nil, errorx.New("the upgrade cannot be repeated") + } + + ttl = 1 * time.Second + if ttl > 0 { + e.WorkTime = ttl + } + e.waitResult = make(chan interface{}) + e.eventOnType = waitEvent + + return e.waitResult, nil +} + +// GetTtlTimer 只对 wait Event 有效 +func (e *Event) GetTtlTimer() (*time.Timer, error) { + if e.eventOnType != waitEvent { + return nil, errorx.New("cannot be called in normalEvent") + } + + timer := time.NewTimer(e.WorkTime) + return timer, nil +} + +func NewEvent(eventName string) *Event { + return &Event{ + eventName: eventName, + eventOnType: normalEvent, + } +} + +// Close 关闭结束事件 +func (e *Event) Close() { + close(e.waitResult) +} + +type Driver struct { + maxQueueSize int + queue chan *Event +} + +// Get 获取驱动 +func (d *Driver) Get() *Event { + return <-d.queue +} + +func (d *Driver) Put(event *Event) { + d.queue <- event +} + +// NewDriver 新建 Driver +func NewDriver(maxSize int) DriverInterface { + return &Driver{ + maxQueueSize: maxSize, + queue: make(chan *Event, maxSize), + } +} diff --git a/pkg/event/produce.go b/pkg/event/produce.go new file mode 100644 index 0000000..e096188 --- /dev/null +++ b/pkg/event/produce.go @@ -0,0 +1,17 @@ +package event + +import "context" + +type Produce struct { + driver DriverInterface +} + +func (p *Produce) Call(ctx context.Context, event *Event) { + p.driver.Put(event) +} + +func NewProduce(driver DriverInterface) ProduceInterface { + return &Produce{ + driver: driver, + } +}