wheat-cache/pkg/event/driver.go

155 lines
3.0 KiB
Go

package event
import (
"sync"
"time"
"gitee.com/timedb/wheatCache/pkg/errorx"
)
type Active func() ([]string, error) // 事件带函数
type Event struct {
msgCtx map[string]interface{}
eventName string
WorkTime time.Duration // 工作时间
msg map[string]string // 消息
waitResult chan interface{} // 等待返回
err error
ru sync.RWMutex
muClose sync.Mutex //关闭锁
eventStatus eventType
}
func (e *Event) SetMsg(key string, val string) {
e.ru.Lock()
defer e.ru.Unlock()
if e.msg == nil {
e.msg = make(map[string]string)
}
e.msg[key] = val
}
func (e *Event) GetMsg(key string) string {
e.ru.RLock()
defer e.ru.RUnlock()
return e.msg[key]
}
func (e *Event) GetEventName() string {
return e.eventName
}
// SetValue 写入 ctx 传递用参数
func (e *Event) SetValue(key string, value interface{}) {
e.ru.Lock()
defer e.ru.Unlock()
if e.msgCtx == nil {
e.msgCtx = make(map[string]interface{})
}
e.msgCtx[key] = value
}
func (e *Event) GetValue(key string) (interface{}, bool) {
e.ru.RLock()
defer e.ru.RUnlock()
val, ok := e.msgCtx[key]
return val, ok
}
// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
func (e *Event) InitWaitEvent() {
e.muClose.Lock()
defer e.muClose.Unlock()
e.waitResult = make(chan interface{})
e.eventStatus = waitEventState
}
// StartWaitEvent 开始一个等待任务
func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
t := time.NewTimer(ttl)
select {
case <-t.C:
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus == workEventState {
return <-e.waitResult, e.err
}
e.eventStatus = closeEventState
return nil, errorx.TimeOutErr()
case result := <-e.waitResult:
return result, e.err
}
}
func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
return nil, errorx.New("not wait status, exec err")
}
e.eventStatus = workEventState
res, err := work()
e.err = err
e.waitResult <- res
close(e.waitResult)
e.eventStatus = closeEventState
return res, err
}
func (e *Event) SetResultErr(err error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
return
}
e.eventStatus = workEventState
e.err = err
e.waitResult <- nil
close(e.waitResult)
e.eventStatus = closeEventState
}
func NewEvent(eventName string) *Event {
return &Event{
eventName: eventName,
eventStatus: defaultEventState,
}
}
type Driver struct {
maxQueueSize int
queue chan *Event
}
// Get 获取驱动
func (d *Driver) Get() *Event {
return <-d.queue
}
func (d *Driver) Put(event *Event) {
d.queue <- event
}
func (d *Driver) GetLengthJudge() bool {
// 自动对当前的最大io数量进行判断
if len(d.queue) > d.maxQueueSize * 1/2{
return true
}
return false
}
// NewDriver 新建 Driver
func NewDriver(maxSize int) DriverInterface {
return &Driver{
maxQueueSize: maxSize,
queue: make(chan *Event, maxSize),
}
}