!30 feat-event-driver
Merge pull request !30 from bandl/feat-event-manage
This commit is contained in:
commit
98ef750a67
4
makefile
4
makefile
|
@ -3,9 +3,9 @@ STORAGE_PATH = $(BASE_PATH)/storage
|
||||||
BASE_OUT = $(BASE_PATH)/bin
|
BASE_OUT = $(BASE_PATH)/bin
|
||||||
|
|
||||||
dcgen:
|
dcgen:
|
||||||
@make gen-protobuf
|
@python3 ./shell/gen_protobuf.py
|
||||||
@python3 ./shell/proto.py
|
@python3 ./shell/proto.py
|
||||||
@make gen-struct
|
@python3 ./shell/make-struct.py
|
||||||
|
|
||||||
.PHONY : build
|
.PHONY : build
|
||||||
build:
|
build:
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
package errorx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// New TODO 添加链路追踪等 @bandl @lgq
|
||||||
|
func New(msg string, opt ...interface{}) error {
|
||||||
|
msg = fmt.Sprintf(msg, opt...)
|
||||||
|
return errors.New(msg)
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package event
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type Consumer struct {
|
||||||
|
driver DriverInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Consumer) Receive(ctx context.Context) *Event {
|
||||||
|
return c.driver.Get()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConsumer(driver DriverInterface) ConsumerInterface {
|
||||||
|
return &Consumer{
|
||||||
|
driver: driver,
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type eventType int8
|
||||||
|
|
||||||
|
const (
|
||||||
|
normalEvent = eventType(iota)
|
||||||
|
waitEvent
|
||||||
|
)
|
||||||
|
|
||||||
|
type DriverInterface interface {
|
||||||
|
Get() *Event
|
||||||
|
Put(event *Event)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ProduceInterface interface {
|
||||||
|
Call(ctx context.Context, event *Event)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConsumerInterface interface {
|
||||||
|
Receive(ctx context.Context) *Event
|
||||||
|
}
|
|
@ -0,0 +1,121 @@
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gitee.com/timedb/wheatCache/pkg/errorx"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Active func() ([]string, error) // 事件带函数
|
||||||
|
|
||||||
|
type Event struct {
|
||||||
|
msgCtx map[string]interface{}
|
||||||
|
eventName string
|
||||||
|
WorkTime time.Duration // 工作时间
|
||||||
|
msg map[string]string // 消息
|
||||||
|
waitResult chan interface{} // 等待返回
|
||||||
|
ru sync.RWMutex
|
||||||
|
eventOnType eventType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Event) SetMsg(key string, val string) {
|
||||||
|
e.ru.Lock()
|
||||||
|
defer e.ru.Unlock()
|
||||||
|
if e.msg == nil {
|
||||||
|
e.msg = make(map[string]string)
|
||||||
|
}
|
||||||
|
e.msg[key] = val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Event) GetMsg(key string) string {
|
||||||
|
e.ru.RLock()
|
||||||
|
defer e.ru.RUnlock()
|
||||||
|
return e.msg[key]
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetCtxValue 写入 ctx 传递用参数
|
||||||
|
func (e *Event) SetCtxValue(key string, value interface{}) {
|
||||||
|
e.ru.Lock()
|
||||||
|
defer e.ru.Unlock()
|
||||||
|
if e.msgCtx == nil {
|
||||||
|
e.msgCtx = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
e.msgCtx[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Event) GetCtxValue(key string) (interface{}, bool) {
|
||||||
|
e.ru.RLock()
|
||||||
|
defer e.ru.RUnlock()
|
||||||
|
val, ok := e.msgCtx[key]
|
||||||
|
return val, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Event) SetWaitResult(val interface{}) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if errChan := recover(); errChan != nil {
|
||||||
|
err = errorx.New("channel err:%v", errChan)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
e.waitResult <- val
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateWaitEvent 升级到 wait Event
|
||||||
|
func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) {
|
||||||
|
if e.eventOnType == waitEvent {
|
||||||
|
return nil, errorx.New("the upgrade cannot be repeated")
|
||||||
|
}
|
||||||
|
|
||||||
|
ttl = 1 * time.Second
|
||||||
|
if ttl > 0 {
|
||||||
|
e.WorkTime = ttl
|
||||||
|
}
|
||||||
|
e.waitResult = make(chan interface{})
|
||||||
|
e.eventOnType = waitEvent
|
||||||
|
|
||||||
|
return e.waitResult, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTtlTimer 只对 wait Event 有效
|
||||||
|
func (e *Event) GetTtlTimer() (*time.Timer, error) {
|
||||||
|
if e.eventOnType != waitEvent {
|
||||||
|
return nil, errorx.New("cannot be called in normalEvent")
|
||||||
|
}
|
||||||
|
|
||||||
|
timer := time.NewTimer(e.WorkTime)
|
||||||
|
return timer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEvent(eventName string) *Event {
|
||||||
|
return &Event{
|
||||||
|
eventName: eventName,
|
||||||
|
eventOnType: normalEvent,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close 关闭结束事件
|
||||||
|
func (e *Event) Close() {
|
||||||
|
close(e.waitResult)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Driver struct {
|
||||||
|
maxQueueSize int
|
||||||
|
queue chan *Event
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get 获取驱动
|
||||||
|
func (d *Driver) Get() *Event {
|
||||||
|
return <-d.queue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Driver) Put(event *Event) {
|
||||||
|
d.queue <- event
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDriver 新建 Driver
|
||||||
|
func NewDriver(maxSize int) DriverInterface {
|
||||||
|
return &Driver{
|
||||||
|
maxQueueSize: maxSize,
|
||||||
|
queue: make(chan *Event, maxSize),
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,112 @@
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEvent_SetWaitResult(t *testing.T) {
|
||||||
|
event := &Event{
|
||||||
|
waitResult: make(chan interface{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := event.UpdateWaitEvent(2 * time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
go getValueByChannel(c)
|
||||||
|
|
||||||
|
err = event.SetWaitResult(1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
event.Close()
|
||||||
|
err = event.SetWaitResult(1)
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getValueByChannel(c <-chan interface{}) {
|
||||||
|
for {
|
||||||
|
if i, isClose := <-c; !isClose {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
fmt.Println(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const testEvent = "1001"
|
||||||
|
const waitTestEvent = "1002"
|
||||||
|
|
||||||
|
// 简单 非等待响应模式, 使用 event driver
|
||||||
|
func TestEvent_DriverEventTest(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
driver := NewDriver(500)
|
||||||
|
produce := NewProduce(driver)
|
||||||
|
consumer := NewConsumer(driver)
|
||||||
|
|
||||||
|
go produceEvent(t, ctx, produce)
|
||||||
|
consumerEvent(t, ctx, consumer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
event := NewEvent(testEvent)
|
||||||
|
event.SetCtxValue("test", i)
|
||||||
|
v.Call(ctx, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
event := v.Receive(ctx)
|
||||||
|
res, ok := event.GetCtxValue("test")
|
||||||
|
require.True(t, ok)
|
||||||
|
fmt.Println(res)
|
||||||
|
require.Equal(t, res, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 响应等待用法
|
||||||
|
func TestEvent_SpanWaitEvent(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
driver := NewDriver(500)
|
||||||
|
produce := NewProduce(driver)
|
||||||
|
consumer := NewConsumer(driver)
|
||||||
|
|
||||||
|
go waitConsumer(t, ctx, consumer)
|
||||||
|
|
||||||
|
waitProduce(t, ctx, produce)
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
event := NewEvent(waitTestEvent)
|
||||||
|
waitChan, err := event.UpdateWaitEvent(2 * time.Second) // 设置事件过期时间, 获取结果等待对象
|
||||||
|
require.NoError(t, err)
|
||||||
|
event.SetCtxValue("test", i)
|
||||||
|
v.Call(ctx, event) // 推送给 consumer
|
||||||
|
|
||||||
|
timer, err := event.GetTtlTimer()
|
||||||
|
require.NoError(t, err)
|
||||||
|
select {
|
||||||
|
case result := <-waitChan:
|
||||||
|
require.Equal(t, result, fmt.Sprintf("test:%v", i))
|
||||||
|
fmt.Println(result)
|
||||||
|
case <-timer.C:
|
||||||
|
panic("time out")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
event := v.Receive(ctx) // 接受 produce 的 event
|
||||||
|
res, ok := event.GetCtxValue("test")
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, res, i)
|
||||||
|
|
||||||
|
err := event.SetWaitResult(fmt.Sprintf("test:%v", res)) // 发送返回值给 produce
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package event
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type Produce struct {
|
||||||
|
driver DriverInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Produce) Call(ctx context.Context, event *Event) {
|
||||||
|
p.driver.Put(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProduce(driver DriverInterface) ProduceInterface {
|
||||||
|
return &Produce{
|
||||||
|
driver: driver,
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue