forked from p93542168/wheat-cache
122 lines
2.4 KiB
Go
122 lines
2.4 KiB
Go
|
package event
|
||
|
|
||
|
import (
|
||
|
"gitee.com/timedb/wheatCache/pkg/errorx"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type Active func() ([]string, error) // 事件带函数
|
||
|
|
||
|
type Event struct {
|
||
|
msgCtx map[string]interface{}
|
||
|
eventName string
|
||
|
WorkTime time.Duration // 工作时间
|
||
|
msg map[string]string // 消息
|
||
|
waitResult chan interface{} // 等待返回
|
||
|
ru sync.RWMutex
|
||
|
eventOnType eventType
|
||
|
}
|
||
|
|
||
|
func (e *Event) SetMsg(key string, val string) {
|
||
|
e.ru.Lock()
|
||
|
defer e.ru.Unlock()
|
||
|
if e.msg == nil {
|
||
|
e.msg = make(map[string]string)
|
||
|
}
|
||
|
e.msg[key] = val
|
||
|
}
|
||
|
|
||
|
func (e *Event) GetMsg(key string) string {
|
||
|
e.ru.RLock()
|
||
|
defer e.ru.RUnlock()
|
||
|
return e.msg[key]
|
||
|
}
|
||
|
|
||
|
// SetCtxValue 写入 ctx 传递用参数
|
||
|
func (e *Event) SetCtxValue(key string, value interface{}) {
|
||
|
e.ru.Lock()
|
||
|
defer e.ru.Unlock()
|
||
|
if e.msgCtx == nil {
|
||
|
e.msgCtx = make(map[string]interface{})
|
||
|
}
|
||
|
e.msgCtx[key] = value
|
||
|
}
|
||
|
|
||
|
func (e *Event) GetCtxValue(key string) (interface{}, bool) {
|
||
|
e.ru.RLock()
|
||
|
defer e.ru.RUnlock()
|
||
|
val, ok := e.msgCtx[key]
|
||
|
return val, ok
|
||
|
}
|
||
|
|
||
|
func (e *Event) SetWaitResult(val interface{}) (err error) {
|
||
|
defer func() {
|
||
|
if errChan := recover(); errChan != nil {
|
||
|
err = errorx.New("channel err:%v", errChan)
|
||
|
}
|
||
|
}()
|
||
|
e.waitResult <- val
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// UpdateWaitEvent 升级到 wait Event
|
||
|
func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) {
|
||
|
if e.eventOnType == waitEvent {
|
||
|
return nil, errorx.New("the upgrade cannot be repeated")
|
||
|
}
|
||
|
|
||
|
ttl = 1 * time.Second
|
||
|
if ttl > 0 {
|
||
|
e.WorkTime = ttl
|
||
|
}
|
||
|
e.waitResult = make(chan interface{})
|
||
|
e.eventOnType = waitEvent
|
||
|
|
||
|
return e.waitResult, nil
|
||
|
}
|
||
|
|
||
|
// GetTtlTimer 只对 wait Event 有效
|
||
|
func (e *Event) GetTtlTimer() (*time.Timer, error) {
|
||
|
if e.eventOnType != waitEvent {
|
||
|
return nil, errorx.New("cannot be called in normalEvent")
|
||
|
}
|
||
|
|
||
|
timer := time.NewTimer(e.WorkTime)
|
||
|
return timer, nil
|
||
|
}
|
||
|
|
||
|
func NewEvent(eventName string) *Event {
|
||
|
return &Event{
|
||
|
eventName: eventName,
|
||
|
eventOnType: normalEvent,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close 关闭结束事件
|
||
|
func (e *Event) Close() {
|
||
|
close(e.waitResult)
|
||
|
}
|
||
|
|
||
|
type Driver struct {
|
||
|
maxQueueSize int
|
||
|
queue chan *Event
|
||
|
}
|
||
|
|
||
|
// Get 获取驱动
|
||
|
func (d *Driver) Get() *Event {
|
||
|
return <-d.queue
|
||
|
}
|
||
|
|
||
|
func (d *Driver) Put(event *Event) {
|
||
|
d.queue <- event
|
||
|
}
|
||
|
|
||
|
// NewDriver 新建 Driver
|
||
|
func NewDriver(maxSize int) DriverInterface {
|
||
|
return &Driver{
|
||
|
maxQueueSize: maxSize,
|
||
|
queue: make(chan *Event, maxSize),
|
||
|
}
|
||
|
}
|