package event import ( "sync" "time" "gitee.com/timedb/wheatCache/pkg/errorx" ) type Active func() ([]string, error) // 事件带函数 type Event struct { msgCtx map[string]interface{} eventName string WorkTime time.Duration // 工作时间 msg map[string]string // 消息 waitResult chan interface{} // 等待返回 err error ru sync.RWMutex muClose sync.Mutex //关闭锁 eventStatus eventType } func (e *Event) SetMsg(key string, val string) { e.ru.Lock() defer e.ru.Unlock() if e.msg == nil { e.msg = make(map[string]string) } e.msg[key] = val } func (e *Event) GetMsg(key string) string { e.ru.RLock() defer e.ru.RUnlock() return e.msg[key] } func (e *Event) GetEventName() string { return e.eventName } // SetValue 写入 ctx 传递用参数 func (e *Event) SetValue(key string, value interface{}) { e.ru.Lock() defer e.ru.Unlock() if e.msgCtx == nil { e.msgCtx = make(map[string]interface{}) } e.msgCtx[key] = value } func (e *Event) GetValue(key string) (interface{}, bool) { e.ru.RLock() defer e.ru.RUnlock() val, ok := e.msgCtx[key] return val, ok } func (e *Event) SetErr(err error) { e.err = err } // InitWaitEvent 初始化 wait event 必须调用才拥有等待特性 func (e *Event) InitWaitEvent() { e.muClose.Lock() defer e.muClose.Unlock() e.waitResult = make(chan interface{}) e.eventStatus = waitEventState } // StartWaitEvent 开始一个等待任务 func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) { t := time.NewTimer(ttl) select { case <-t.C: e.muClose.Lock() defer e.muClose.Unlock() if e.eventStatus == workEventState { return <-e.waitResult, e.err } e.eventStatus = closeEventState return nil, errorx.TimeOutErr() case result := <-e.waitResult: return result, e.err } } func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { e.muClose.Lock() defer e.muClose.Unlock() if e.eventStatus != waitEventState { return nil, errorx.New("not wait status, exec err") } e.eventStatus = workEventState res, err := work() e.waitResult <- res e.err = err close(e.waitResult) e.eventStatus = closeEventState return res, err } func NewEvent(eventName string) *Event { return &Event{ eventName: eventName, eventStatus: defaultEventState, } } type Driver struct { maxQueueSize int queue chan *Event } // Get 获取驱动 func (d *Driver) Get() *Event { return <-d.queue } func (d *Driver) Put(event *Event) { d.queue <- event } // NewDriver 新建 Driver func NewDriver(maxSize int) DriverInterface { return &Driver{ maxQueueSize: maxSize, queue: make(chan *Event, maxSize), } }