feat(event-pool): add event poll
This commit is contained in:
parent
2a69f393e5
commit
bff937700e
|
@ -6,10 +6,18 @@ type Consumer struct {
|
|||
driver DriverInterface
|
||||
}
|
||||
|
||||
func (c *Consumer) Receive(ctx context.Context) *Event {
|
||||
func (c *Consumer) Receive(ctx context.Context) *event {
|
||||
return c.driver.Get()
|
||||
}
|
||||
|
||||
func (c *Consumer) NewEvent(name string) *event {
|
||||
return c.driver.NewEvent(name)
|
||||
}
|
||||
|
||||
func (c *Consumer) Recovery(e *event) {
|
||||
c.driver.Recovery(e)
|
||||
}
|
||||
|
||||
func NewConsumer(driver DriverInterface) ConsumerInterface {
|
||||
return &Consumer{
|
||||
driver: driver,
|
||||
|
|
|
@ -16,16 +16,21 @@ const (
|
|||
type EventWorkFunc func() (interface{}, error)
|
||||
|
||||
type DriverInterface interface {
|
||||
Get() *Event
|
||||
Put(event *Event)
|
||||
Get() *event
|
||||
Put(*event)
|
||||
GetLength() int
|
||||
NewEvent(string) *event
|
||||
Recovery(*event)
|
||||
}
|
||||
|
||||
type ProduceInterface interface {
|
||||
Call(ctx context.Context, event *Event)
|
||||
Call(context.Context, *event)
|
||||
NewEvent(string) *event
|
||||
Recovery(*event)
|
||||
}
|
||||
|
||||
type ConsumerInterface interface {
|
||||
Receive(ctx context.Context) *Event
|
||||
Receive(ctx context.Context) *event
|
||||
NewEvent(string) *event
|
||||
Recovery(*event)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,9 +7,27 @@ import (
|
|||
"gitee.com/timedb/wheatCache/pkg/errorx"
|
||||
)
|
||||
|
||||
type Active func() ([]string, error) // 事件带函数
|
||||
// 事件 poll 降低 new 对象的频率
|
||||
type eventPoll struct {
|
||||
poll chan *event
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
func (e *eventPoll) getEvent() *event {
|
||||
return <-e.poll
|
||||
}
|
||||
|
||||
func (e *eventPoll) recovery(rEvent *event) {
|
||||
rEvent.Reset()
|
||||
e.poll <- rEvent
|
||||
}
|
||||
|
||||
func newEventPoll(maxSize int) *eventPoll {
|
||||
return &eventPoll{
|
||||
poll: make(chan *event, maxSize),
|
||||
}
|
||||
}
|
||||
|
||||
type event struct {
|
||||
msgCtx map[string]interface{}
|
||||
eventName string
|
||||
WorkTime time.Duration // 工作时间
|
||||
|
@ -21,7 +39,11 @@ type Event struct {
|
|||
eventStatus eventType
|
||||
}
|
||||
|
||||
func (e *Event) SetMsg(key string, val string) {
|
||||
func (e *event) Reset() {
|
||||
|
||||
}
|
||||
|
||||
func (e *event) SetMsg(key string, val string) {
|
||||
e.ru.Lock()
|
||||
defer e.ru.Unlock()
|
||||
if e.msg == nil {
|
||||
|
@ -30,18 +52,18 @@ func (e *Event) SetMsg(key string, val string) {
|
|||
e.msg[key] = val
|
||||
}
|
||||
|
||||
func (e *Event) GetMsg(key string) string {
|
||||
func (e *event) GetMsg(key string) string {
|
||||
e.ru.RLock()
|
||||
defer e.ru.RUnlock()
|
||||
return e.msg[key]
|
||||
}
|
||||
|
||||
func (e *Event) GetEventName() string {
|
||||
func (e *event) GetEventName() string {
|
||||
return e.eventName
|
||||
}
|
||||
|
||||
// SetValue 写入 ctx 传递用参数
|
||||
func (e *Event) SetValue(key string, value interface{}) {
|
||||
func (e *event) SetValue(key string, value interface{}) {
|
||||
e.ru.Lock()
|
||||
defer e.ru.Unlock()
|
||||
if e.msgCtx == nil {
|
||||
|
@ -50,7 +72,7 @@ func (e *Event) SetValue(key string, value interface{}) {
|
|||
e.msgCtx[key] = value
|
||||
}
|
||||
|
||||
func (e *Event) GetValue(key string) (interface{}, bool) {
|
||||
func (e *event) GetValue(key string) (interface{}, bool) {
|
||||
e.ru.RLock()
|
||||
defer e.ru.RUnlock()
|
||||
val, ok := e.msgCtx[key]
|
||||
|
@ -58,7 +80,7 @@ func (e *Event) GetValue(key string) (interface{}, bool) {
|
|||
}
|
||||
|
||||
// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
|
||||
func (e *Event) InitWaitEvent() {
|
||||
func (e *event) InitWaitEvent() {
|
||||
e.muClose.Lock()
|
||||
defer e.muClose.Unlock()
|
||||
e.waitResult = make(chan interface{})
|
||||
|
@ -66,7 +88,7 @@ func (e *Event) InitWaitEvent() {
|
|||
}
|
||||
|
||||
// StartWaitEvent 开始一个等待任务
|
||||
func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
|
||||
func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
|
||||
t := time.NewTimer(ttl)
|
||||
select {
|
||||
case <-t.C:
|
||||
|
@ -84,7 +106,7 @@ func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
|
||||
func (e *event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
|
||||
e.muClose.Lock()
|
||||
defer e.muClose.Unlock()
|
||||
if e.eventStatus != waitEventState {
|
||||
|
@ -102,7 +124,7 @@ func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
|
|||
return res, err
|
||||
}
|
||||
|
||||
func (e *Event) SetResultErr(err error) {
|
||||
func (e *event) SetResultErr(err error) {
|
||||
e.muClose.Lock()
|
||||
defer e.muClose.Unlock()
|
||||
if e.eventStatus != waitEventState {
|
||||
|
@ -116,8 +138,8 @@ func (e *Event) SetResultErr(err error) {
|
|||
e.eventStatus = closeEventState
|
||||
}
|
||||
|
||||
func NewEvent(eventName string) *Event {
|
||||
return &Event{
|
||||
func NewEvent(eventName string) *event {
|
||||
return &event{
|
||||
eventName: eventName,
|
||||
eventStatus: defaultEventState,
|
||||
}
|
||||
|
@ -125,15 +147,16 @@ func NewEvent(eventName string) *Event {
|
|||
|
||||
type Driver struct {
|
||||
maxQueueSize int
|
||||
queue chan *Event
|
||||
queue chan *event
|
||||
poll *eventPoll
|
||||
}
|
||||
|
||||
// Get 获取驱动
|
||||
func (d *Driver) Get() *Event {
|
||||
func (d *Driver) Get() *event {
|
||||
return <-d.queue
|
||||
}
|
||||
|
||||
func (d *Driver) Put(event *Event) {
|
||||
func (d *Driver) Put(event *event) {
|
||||
d.queue <- event
|
||||
}
|
||||
|
||||
|
@ -141,10 +164,21 @@ func (d *Driver) GetLength() int {
|
|||
return len(d.queue)
|
||||
}
|
||||
|
||||
func (d *Driver) NewEvent(name string) *event {
|
||||
event := d.poll.getEvent()
|
||||
event.eventName = name
|
||||
return event
|
||||
}
|
||||
|
||||
func (d *Driver) Recovery(e *event) {
|
||||
d.poll.recovery(e)
|
||||
}
|
||||
|
||||
// NewDriver 新建 Driver
|
||||
func NewDriver(maxSize int) DriverInterface {
|
||||
return &Driver{
|
||||
maxQueueSize: maxSize,
|
||||
queue: make(chan *Event, maxSize),
|
||||
queue: make(chan *event, maxSize),
|
||||
poll: newEventPoll(maxSize),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,8 +6,16 @@ type Produce struct {
|
|||
driver DriverInterface
|
||||
}
|
||||
|
||||
func (p *Produce) Call(ctx context.Context, event *Event) {
|
||||
p.driver.Put(event)
|
||||
func (p *Produce) NewEvent(name string) *event {
|
||||
return p.driver.NewEvent(name)
|
||||
}
|
||||
|
||||
func (p *Produce) Recovery(e *event) {
|
||||
p.driver.Recovery(e)
|
||||
}
|
||||
|
||||
func (p *Produce) Call(ctx context.Context, e *event) {
|
||||
p.driver.Put(e)
|
||||
}
|
||||
|
||||
func NewProduce(driver DriverInterface) ProduceInterface {
|
||||
|
|
Loading…
Reference in New Issue