feat(event): add event driver tools

This commit is contained in:
bandl 2021-09-23 11:05:51 +08:00
parent 36d18c1368
commit 6d26969cae
4 changed files with 180 additions and 0 deletions

17
pkg/event/consumer.go Normal file
View File

@ -0,0 +1,17 @@
package event
import "context"
type Consumer struct {
driver DriverInterface
}
func (c *Consumer) Receive(ctx context.Context) *Event {
return c.driver.Get()
}
func NewConsumer(driver DriverInterface) ConsumerInterface {
return &Consumer{
driver: driver,
}
}

25
pkg/event/define.go Normal file
View File

@ -0,0 +1,25 @@
package event
import (
"context"
)
type eventType int8
const (
normalEvent = eventType(iota)
waitEvent
)
type DriverInterface interface {
Get() *Event
Put(event *Event)
}
type ProduceInterface interface {
Call(ctx context.Context, event *Event)
}
type ConsumerInterface interface {
Receive(ctx context.Context) *Event
}

121
pkg/event/driver.go Normal file
View File

@ -0,0 +1,121 @@
package event
import (
"gitee.com/timedb/wheatCache/pkg/errorx"
"sync"
"time"
)
type Active func() ([]string, error) // 事件带函数
type Event struct {
msgCtx map[string]interface{}
eventName string
WorkTime time.Duration // 工作时间
msg map[string]string // 消息
waitResult chan interface{} // 等待返回
ru sync.RWMutex
eventOnType eventType
}
func (e *Event) SetMsg(key string, val string) {
e.ru.Lock()
defer e.ru.Unlock()
if e.msg == nil {
e.msg = make(map[string]string)
}
e.msg[key] = val
}
func (e *Event) GetMsg(key string) string {
e.ru.RLock()
defer e.ru.RUnlock()
return e.msg[key]
}
// SetCtxValue 写入 ctx 传递用参数
func (e *Event) SetCtxValue(key string, value interface{}) {
e.ru.Lock()
defer e.ru.Unlock()
if e.msgCtx == nil {
e.msgCtx = make(map[string]interface{})
}
e.msgCtx[key] = value
}
func (e *Event) GetCtxValue(key string) (interface{}, bool) {
e.ru.RLock()
defer e.ru.RUnlock()
val, ok := e.msgCtx[key]
return val, ok
}
func (e *Event) SetWaitResult(val interface{}) (err error) {
defer func() {
if errChan := recover(); errChan != nil {
err = errorx.New("channel err:%v", errChan)
}
}()
e.waitResult <- val
return nil
}
// UpdateWaitEvent 升级到 wait Event
func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) {
if e.eventOnType == waitEvent {
return nil, errorx.New("the upgrade cannot be repeated")
}
ttl = 1 * time.Second
if ttl > 0 {
e.WorkTime = ttl
}
e.waitResult = make(chan interface{})
e.eventOnType = waitEvent
return e.waitResult, nil
}
// GetTtlTimer 只对 wait Event 有效
func (e *Event) GetTtlTimer() (*time.Timer, error) {
if e.eventOnType != waitEvent {
return nil, errorx.New("cannot be called in normalEvent")
}
timer := time.NewTimer(e.WorkTime)
return timer, nil
}
func NewEvent(eventName string) *Event {
return &Event{
eventName: eventName,
eventOnType: normalEvent,
}
}
// Close 关闭结束事件
func (e *Event) Close() {
close(e.waitResult)
}
type Driver struct {
maxQueueSize int
queue chan *Event
}
// Get 获取驱动
func (d *Driver) Get() *Event {
return <-d.queue
}
func (d *Driver) Put(event *Event) {
d.queue <- event
}
// NewDriver 新建 Driver
func NewDriver(maxSize int) DriverInterface {
return &Driver{
maxQueueSize: maxSize,
queue: make(chan *Event, maxSize),
}
}

17
pkg/event/produce.go Normal file
View File

@ -0,0 +1,17 @@
package event
import "context"
type Produce struct {
driver DriverInterface
}
func (p *Produce) Call(ctx context.Context, event *Event) {
p.driver.Put(event)
}
func NewProduce(driver DriverInterface) ProduceInterface {
return &Produce{
driver: driver,
}
}