wheat-cache/pkg/event/driver.go

151 lines
2.9 KiB
Go
Raw Normal View History

2021-09-23 11:05:51 +08:00
package event
import (
"sync"
"time"
2021-09-28 20:55:09 +08:00
"gitee.com/timedb/wheatCache/pkg/errorx"
2021-09-23 11:05:51 +08:00
)
// Active is the signature of a function carried by an event. It takes no
// arguments and returns a slice of strings together with an error.
type Active func() ([]string, error)
type Event struct {
msgCtx map[string]interface{}
eventName string
WorkTime time.Duration // 工作时间
msg map[string]string // 消息
waitResult chan interface{} // 等待返回
2021-09-28 20:55:09 +08:00
err error
2021-09-23 11:05:51 +08:00
ru sync.RWMutex
2021-09-28 20:55:09 +08:00
muClose sync.Mutex //关闭锁
eventStatus eventType
2021-09-23 11:05:51 +08:00
}
// SetMsg stores val under key in the event's message map, creating the map on
// first use. The write is performed under the event's write lock.
func (e *Event) SetMsg(key string, val string) {
	e.ru.Lock()
	defer e.ru.Unlock()

	if e.msg != nil {
		e.msg[key] = val
		return
	}
	// First message on this event: allocate the map together with the entry.
	e.msg = map[string]string{key: val}
}
// GetMsg returns the message stored under key, or the empty string when the
// key is absent. The lookup is performed under the event's read lock.
func (e *Event) GetMsg(key string) string {
	e.ru.RLock()
	val := e.msg[key]
	e.ru.RUnlock()
	return val
}
2021-09-28 20:55:09 +08:00
// GetEventName returns the name stored in the event's eventName field.
func (e *Event) GetEventName() string {
	name := e.eventName
	return name
}
// SetValue 写入 ctx 传递用参数
func (e *Event) SetValue(key string, value interface{}) {
2021-09-23 11:05:51 +08:00
e.ru.Lock()
defer e.ru.Unlock()
if e.msgCtx == nil {
e.msgCtx = make(map[string]interface{})
}
e.msgCtx[key] = value
}
2021-09-28 20:55:09 +08:00
func (e *Event) GetValue(key string) (interface{}, bool) {
2021-09-23 11:05:51 +08:00
e.ru.RLock()
defer e.ru.RUnlock()
val, ok := e.msgCtx[key]
return val, ok
}
2021-09-28 20:55:09 +08:00
// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
func (e *Event) InitWaitEvent() {
e.muClose.Lock()
defer e.muClose.Unlock()
2021-09-23 11:05:51 +08:00
e.waitResult = make(chan interface{})
2021-09-28 20:55:09 +08:00
e.eventStatus = waitEventState
}
2021-09-23 11:05:51 +08:00
2021-09-28 20:55:09 +08:00
// StartWaitEvent 开始一个等待任务
func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
t := time.NewTimer(ttl)
select {
case <-t.C:
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus == workEventState {
return <-e.waitResult, e.err
}
e.eventStatus = closeEventState
return nil, errorx.TimeOutErr()
case result := <-e.waitResult:
return result, e.err
}
2021-09-23 11:05:51 +08:00
}
2021-09-28 20:55:09 +08:00
func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
return nil, errorx.New("not wait status, exec err")
2021-09-23 11:05:51 +08:00
}
2021-09-28 20:55:09 +08:00
e.eventStatus = workEventState
res, err := work()
e.err = err
2021-10-07 16:30:23 +08:00
e.waitResult <- res
2021-09-28 20:55:09 +08:00
close(e.waitResult)
e.eventStatus = closeEventState
return res, err
2021-09-23 11:05:51 +08:00
}
2021-10-07 16:30:23 +08:00
// SetResultErr completes a waiting event with an error instead of a value.
//
// It is a no-op unless the event is in waitEventState. Otherwise the event
// moves to workEventState, err is stored in the event, nil is sent on the
// result channel, the channel is closed and the event moves to
// closeEventState.
//
// The close lock is held only around the state transitions, not while the nil
// value is being sent. The result channel is unbuffered, so the send needs a
// receiver; a waiter whose timeout has fired must acquire the close lock
// before it can receive, and would block forever if the lock were still held
// here.
func (e *Event) SetResultErr(err error) {
	e.muClose.Lock()
	if e.eventStatus != waitEventState {
		e.muClose.Unlock()
		return
	}
	e.eventStatus = workEventState
	// err is written before the send so the receiver observes it afterwards.
	e.err = err
	// Capture the channel while the lock is held so the send below targets
	// the channel that belongs to this wait cycle.
	results := e.waitResult
	e.muClose.Unlock()

	results <- nil
	close(results)

	e.muClose.Lock()
	e.eventStatus = closeEventState
	e.muClose.Unlock()
}
2021-09-23 11:05:51 +08:00
func NewEvent(eventName string) *Event {
return &Event{
eventName: eventName,
2021-09-28 20:55:09 +08:00
eventStatus: defaultEventState,
2021-09-23 11:05:51 +08:00
}
}
// Driver is a queue of events backed by a buffered channel.
type Driver struct {
	// maxQueueSize is the size the driver was created with.
	maxQueueSize int

	// queue carries the events from Put to Get.
	queue chan *Event
}
// Get receives the next event from the driver's queue, blocking until one is
// available.
func (d *Driver) Get() *Event {
	next := <-d.queue
	return next
}
// Put sends event into the driver's queue, blocking while the queue is full.
func (d *Driver) Put(event *Event) {
	queue := d.queue
	queue <- event
}
func (d *Driver) GetLength() int {
return len(d.queue)
2021-10-09 10:35:03 +08:00
}
2021-09-23 11:05:51 +08:00
// NewDriver creates a Driver whose queue is a channel buffered to maxSize
// events and returns it as a DriverInterface.
func NewDriver(maxSize int) DriverInterface {
	d := new(Driver)
	d.maxQueueSize = maxSize
	d.queue = make(chan *Event, maxSize)
	return d
}