diff --git a/makefile b/makefile index ad04c6b..822e612 100644 --- a/makefile +++ b/makefile @@ -3,9 +3,9 @@ STORAGE_PATH = $(BASE_PATH)/storage BASE_OUT = $(BASE_PATH)/bin dcgen: - @make gen-protobuf + @python3 ./shell/gen_protobuf.py @python3 ./shell/proto.py - @make gen-struct + @python3 ./shell/make-struct.py .PHONY : build build: diff --git a/pkg/errorx/err.go b/pkg/errorx/err.go new file mode 100644 index 0000000..364cb52 --- /dev/null +++ b/pkg/errorx/err.go @@ -0,0 +1,12 @@ +package errorx + +import ( + "errors" + "fmt" +) + +// New TODO 添加链路追踪等 @bandl @lgq +func New(msg string, opt ...interface{}) error { + msg = fmt.Sprintf(msg, opt...) + return errors.New(msg) +} diff --git a/pkg/event/consumer.go b/pkg/event/consumer.go new file mode 100644 index 0000000..995c123 --- /dev/null +++ b/pkg/event/consumer.go @@ -0,0 +1,17 @@ +package event + +import "context" + +type Consumer struct { + driver DriverInterface +} + +func (c *Consumer) Receive(ctx context.Context) *Event { + return c.driver.Get() +} + +func NewConsumer(driver DriverInterface) ConsumerInterface { + return &Consumer{ + driver: driver, + } +} diff --git a/pkg/event/define.go b/pkg/event/define.go new file mode 100644 index 0000000..2fad03e --- /dev/null +++ b/pkg/event/define.go @@ -0,0 +1,25 @@ +package event + +import ( + "context" +) + +type eventType int8 + +const ( + normalEvent = eventType(iota) + waitEvent +) + +type DriverInterface interface { + Get() *Event + Put(event *Event) +} + +type ProduceInterface interface { + Call(ctx context.Context, event *Event) +} + +type ConsumerInterface interface { + Receive(ctx context.Context) *Event +} diff --git a/pkg/event/driver.go b/pkg/event/driver.go new file mode 100644 index 0000000..114f2a8 --- /dev/null +++ b/pkg/event/driver.go @@ -0,0 +1,121 @@ +package event + +import ( + "gitee.com/timedb/wheatCache/pkg/errorx" + "sync" + "time" +) + +type Active func() ([]string, error) // 事件带函数 + +type Event struct { + msgCtx map[string]interface{} + eventName string + WorkTime time.Duration // 工作时间 + msg map[string]string // 消息 + waitResult chan interface{} // 等待返回 + ru sync.RWMutex + eventOnType eventType +} + +func (e *Event) SetMsg(key string, val string) { + e.ru.Lock() + defer e.ru.Unlock() + if e.msg == nil { + e.msg = make(map[string]string) + } + e.msg[key] = val +} + +func (e *Event) GetMsg(key string) string { + e.ru.RLock() + defer e.ru.RUnlock() + return e.msg[key] +} + +// SetCtxValue 写入 ctx 传递用参数 +func (e *Event) SetCtxValue(key string, value interface{}) { + e.ru.Lock() + defer e.ru.Unlock() + if e.msgCtx == nil { + e.msgCtx = make(map[string]interface{}) + } + e.msgCtx[key] = value +} + +func (e *Event) GetCtxValue(key string) (interface{}, bool) { + e.ru.RLock() + defer e.ru.RUnlock() + val, ok := e.msgCtx[key] + return val, ok +} + +func (e *Event) SetWaitResult(val interface{}) (err error) { + defer func() { + if errChan := recover(); errChan != nil { + err = errorx.New("channel err:%v", errChan) + } + }() + e.waitResult <- val + return nil +} + +// UpdateWaitEvent 升级到 wait Event +func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) { + if e.eventOnType == waitEvent { + return nil, errorx.New("the upgrade cannot be repeated") + } + + ttl = 1 * time.Second + if ttl > 0 { + e.WorkTime = ttl + } + e.waitResult = make(chan interface{}) + e.eventOnType = waitEvent + + return e.waitResult, nil +} + +// GetTtlTimer 只对 wait Event 有效 +func (e *Event) GetTtlTimer() (*time.Timer, error) { + if e.eventOnType != waitEvent { + return nil, errorx.New("cannot be called in normalEvent") + } + + timer := time.NewTimer(e.WorkTime) + return timer, nil +} + +func NewEvent(eventName string) *Event { + return &Event{ + eventName: eventName, + eventOnType: normalEvent, + } +} + +// Close 关闭结束事件 +func (e *Event) Close() { + close(e.waitResult) +} + +type Driver struct { + maxQueueSize int + queue chan *Event +} + +// Get 获取驱动 +func (d *Driver) Get() *Event { + return <-d.queue +} + +func (d *Driver) Put(event *Event) { + d.queue <- event +} + +// NewDriver 新建 Driver +func NewDriver(maxSize int) DriverInterface { + return &Driver{ + maxQueueSize: maxSize, + queue: make(chan *Event, maxSize), + } +} diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go new file mode 100644 index 0000000..d323282 --- /dev/null +++ b/pkg/event/event_test.go @@ -0,0 +1,112 @@ +package event + +import ( + "context" + "fmt" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestEvent_SetWaitResult(t *testing.T) { + event := &Event{ + waitResult: make(chan interface{}), + } + + c, err := event.UpdateWaitEvent(2 * time.Second) + require.NoError(t, err) + go getValueByChannel(c) + + err = event.SetWaitResult(1) + require.NoError(t, err) + + event.Close() + err = event.SetWaitResult(1) + require.Error(t, err) +} + +func getValueByChannel(c <-chan interface{}) { + for { + if i, isClose := <-c; !isClose { + break + } else { + fmt.Println(i) + } + } +} + +const testEvent = "1001" +const waitTestEvent = "1002" + +// 简单 非等待响应模式, 使用 event driver +func TestEvent_DriverEventTest(t *testing.T) { + ctx := context.Background() + driver := NewDriver(500) + produce := NewProduce(driver) + consumer := NewConsumer(driver) + + go produceEvent(t, ctx, produce) + consumerEvent(t, ctx, consumer) +} + +func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { + for i := 0; i < 100; i++ { + event := NewEvent(testEvent) + event.SetCtxValue("test", i) + v.Call(ctx, event) + } +} + +func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) { + for i := 0; i < 100; i++ { + event := v.Receive(ctx) + res, ok := event.GetCtxValue("test") + require.True(t, ok) + fmt.Println(res) + require.Equal(t, res, i) + } +} + +// 响应等待用法 +func TestEvent_SpanWaitEvent(t *testing.T) { + ctx := context.Background() + driver := NewDriver(500) + produce := NewProduce(driver) + consumer := NewConsumer(driver) + + go waitConsumer(t, ctx, consumer) + + waitProduce(t, ctx, produce) +} + +func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) { + for i := 0; i < 100; i++ { + event := NewEvent(waitTestEvent) + waitChan, err := event.UpdateWaitEvent(2 * time.Second) // 设置事件过期时间, 获取结果等待对象 + require.NoError(t, err) + event.SetCtxValue("test", i) + v.Call(ctx, event) // 推送给 consumer + + timer, err := event.GetTtlTimer() + require.NoError(t, err) + select { + case result := <-waitChan: + require.Equal(t, result, fmt.Sprintf("test:%v", i)) + fmt.Println(result) + case <-timer.C: + panic("time out") + } + } +} + +func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) { + for i := 0; i < 100; i++ { + event := v.Receive(ctx) // 接受 produce 的 event + res, ok := event.GetCtxValue("test") + require.True(t, ok) + require.Equal(t, res, i) + + err := event.SetWaitResult(fmt.Sprintf("test:%v", res)) // 发送返回值给 produce + require.NoError(t, err) + } +} diff --git a/pkg/event/produce.go b/pkg/event/produce.go new file mode 100644 index 0000000..e096188 --- /dev/null +++ b/pkg/event/produce.go @@ -0,0 +1,17 @@ +package event + +import "context" + +type Produce struct { + driver DriverInterface +} + +func (p *Produce) Call(ctx context.Context, event *Event) { + p.driver.Put(event) +} + +func NewProduce(driver DriverInterface) ProduceInterface { + return &Produce{ + driver: driver, + } +}