diff --git a/pkg/event/consumer.go b/pkg/event/consumer.go index 995c123..e24155a 100644 --- a/pkg/event/consumer.go +++ b/pkg/event/consumer.go @@ -6,10 +6,18 @@ type Consumer struct { driver DriverInterface } -func (c *Consumer) Receive(ctx context.Context) *Event { +func (c *Consumer) Receive(ctx context.Context) *event { return c.driver.Get() } +func (c *Consumer) NewEvent(name string) *event { + return c.driver.NewEvent(name) +} + +func (c *Consumer) Recovery(e *event) { + c.driver.Recovery(e) +} + func NewConsumer(driver DriverInterface) ConsumerInterface { return &Consumer{ driver: driver, diff --git a/pkg/event/define.go b/pkg/event/define.go index 8921901..53153bc 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -16,16 +16,21 @@ const ( type EventWorkFunc func() (interface{}, error) type DriverInterface interface { - Get() *Event - Put(event *Event) + Get() *event + Put(*event) GetLength() int + NewEvent(string) *event + Recovery(*event) } type ProduceInterface interface { - Call(ctx context.Context, event *Event) + Call(context.Context, *event) + NewEvent(string) *event + Recovery(*event) } type ConsumerInterface interface { - Receive(ctx context.Context) *Event + Receive(ctx context.Context) *event + NewEvent(string) *event + Recovery(*event) } - diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 764ff22..1da81da 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -7,9 +7,27 @@ import ( "gitee.com/timedb/wheatCache/pkg/errorx" ) -type Active func() ([]string, error) // 事件带函数 +// 事件 poll 降低 new 对象的频率 +type eventPoll struct { + poll chan *event +} -type Event struct { +func (e *eventPoll) getEvent() *event { + return <-e.poll +} + +func (e *eventPoll) recovery(rEvent *event) { + rEvent.Reset() + e.poll <- rEvent +} + +func newEventPoll(maxSize int) *eventPoll { + return &eventPoll{ + poll: make(chan *event, maxSize), + } +} + +type event struct { msgCtx map[string]interface{} eventName string WorkTime time.Duration // 工作时间 @@ -21,7 +39,11 @@ type Event struct { eventStatus eventType } -func (e *Event) SetMsg(key string, val string) { +func (e *event) Reset() { + +} + +func (e *event) SetMsg(key string, val string) { e.ru.Lock() defer e.ru.Unlock() if e.msg == nil { @@ -30,18 +52,18 @@ func (e *Event) SetMsg(key string, val string) { e.msg[key] = val } -func (e *Event) GetMsg(key string) string { +func (e *event) GetMsg(key string) string { e.ru.RLock() defer e.ru.RUnlock() return e.msg[key] } -func (e *Event) GetEventName() string { +func (e *event) GetEventName() string { return e.eventName } // SetValue 写入 ctx 传递用参数 -func (e *Event) SetValue(key string, value interface{}) { +func (e *event) SetValue(key string, value interface{}) { e.ru.Lock() defer e.ru.Unlock() if e.msgCtx == nil { @@ -50,7 +72,7 @@ func (e *Event) SetValue(key string, value interface{}) { e.msgCtx[key] = value } -func (e *Event) GetValue(key string) (interface{}, bool) { +func (e *event) GetValue(key string) (interface{}, bool) { e.ru.RLock() defer e.ru.RUnlock() val, ok := e.msgCtx[key] @@ -58,7 +80,7 @@ func (e *Event) GetValue(key string) (interface{}, bool) { } // InitWaitEvent 初始化 wait event 必须调用才拥有等待特性 -func (e *Event) InitWaitEvent() { +func (e *event) InitWaitEvent() { e.muClose.Lock() defer e.muClose.Unlock() e.waitResult = make(chan interface{}) @@ -66,7 +88,7 @@ func (e *Event) InitWaitEvent() { } // StartWaitEvent 开始一个等待任务 -func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) { +func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) { t := time.NewTimer(ttl) select { case <-t.C: @@ -84,7 +106,7 @@ func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) { } } -func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { +func (e *event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { e.muClose.Lock() defer e.muClose.Unlock() if e.eventStatus != waitEventState { @@ -102,7 +124,7 @@ func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { return res, err } -func (e *Event) SetResultErr(err error) { +func (e *event) SetResultErr(err error) { e.muClose.Lock() defer e.muClose.Unlock() if e.eventStatus != waitEventState { @@ -116,8 +138,8 @@ func (e *Event) SetResultErr(err error) { e.eventStatus = closeEventState } -func NewEvent(eventName string) *Event { - return &Event{ +func NewEvent(eventName string) *event { + return &event{ eventName: eventName, eventStatus: defaultEventState, } @@ -125,15 +147,16 @@ func NewEvent(eventName string) *Event { type Driver struct { maxQueueSize int - queue chan *Event + queue chan *event + poll *eventPoll } // Get 获取驱动 -func (d *Driver) Get() *Event { +func (d *Driver) Get() *event { return <-d.queue } -func (d *Driver) Put(event *Event) { +func (d *Driver) Put(event *event) { d.queue <- event } @@ -141,10 +164,21 @@ func (d *Driver) GetLength() int { return len(d.queue) } +func (d *Driver) NewEvent(name string) *event { + event := d.poll.getEvent() + event.eventName = name + return event +} + +func (d *Driver) Recovery(e *event) { + d.poll.recovery(e) +} + // NewDriver 新建 Driver func NewDriver(maxSize int) DriverInterface { return &Driver{ maxQueueSize: maxSize, - queue: make(chan *Event, maxSize), + queue: make(chan *event, maxSize), + poll: newEventPoll(maxSize), } } diff --git a/pkg/event/produce.go b/pkg/event/produce.go index e096188..7394c88 100644 --- a/pkg/event/produce.go +++ b/pkg/event/produce.go @@ -6,8 +6,16 @@ type Produce struct { driver DriverInterface } -func (p *Produce) Call(ctx context.Context, event *Event) { - p.driver.Put(event) +func (p *Produce) NewEvent(name string) *event { + return p.driver.NewEvent(name) +} + +func (p *Produce) Recovery(e *event) { + p.driver.Recovery(e) +} + +func (p *Produce) Call(ctx context.Context, e *event) { + p.driver.Put(e) } func NewProduce(driver DriverInterface) ProduceInterface {