diff --git a/doc/pkg/event/事件驱动使用文档.md b/doc/pkg/event/事件驱动使用文档.md index d490667..37d5127 100644 --- a/doc/pkg/event/事件驱动使用文档.md +++ b/doc/pkg/event/事件驱动使用文档.md @@ -90,63 +90,59 @@ func TestNewConsumer(t *testing.T) { ```go func TestNewConsumer(t *testing.T) { - // 定义一个图书馆 - type Library struct { - driver DriverInterface - } - library := &Library{ - driver: NewDriver(100), - } - ctx := context.Background() +// 定义一个图书馆 +type Library struct { +driver DriverInterface +} +library := &Library{ +driver: NewDriver(100), +} +ctx := context.Background() - // 定义 A - type A struct { - produce ProduceInterface - } +// 定义 A +type A struct { +produce ProduceInterface +} - a := &A{ - produce: NewProduce(library.driver), - } +a := &A{ +produce: NewProduce(library.driver), +} - // 定义 B - type B struct { - consumer ConsumerInterface - } +// 定义 B +type B struct { +consumer ConsumerInterface +} - b := &B{ - consumer: NewConsumer(library.driver), - } +b := &B{ +consumer: NewConsumer(library.driver), +} - // 定义书 S 并且添加一些描述 - book := NewEvent("S") - book.SetMsg("title", "hello world") - book.SetCtxValue("pages", 120) +// 定义书 S 并且添加一些描述 +book := NewEvent("S") +book.SetMsg("title", "hello world") +book.SetValue("pages", 120) - // A 把书 S 放到图书馆 - go func() { - waitMsg, err := book.UpdateWaitEvent(2 * time.Hour) - require.NoError(t, err) - a.produce.Call(ctx, book) +// A 把书 S 放到图书馆 +go func() { +book.InitWaitEvent() +a.produce.Call(ctx, book) - // A 等待 B 的回复, 但是他最多只会等待 B 2个小时 - ttlTime, err := book.GetTtlTimer() - require.NoError(t, err) - select { - case msg := <-waitMsg: - fmt.Println(msg) - case <-ttlTime.C: - fmt.Println("过期不等了") - book.Close() - } - }() +// A 等待 B 的回复, 但是他最多只会等待 B 2个小时 +res, err := book.StartWaitEvent(2 * time.Hour) +require.NoError(t, err) +fmt.Println(res) +}() - // 模拟 B 去图书馆拿书 - book = b.consumer.Receive(ctx) - fmt.Println(book.GetMsg("title")) +// 模拟 B 去图书馆拿书 +book = b.consumer.Receive(ctx) +fmt.Println(book.GetMsg("title")) - // 书完好 - book.SetWaitResult("书完好") - time.Sleep(50 * time.Millisecond) +// 书完好 +book.ExecWorkAndSendResult(func() (interface{}, error) { +// b 检查书 +return "OK", nil +}) +time.Sleep(50 * time.Millisecond) } ``` diff --git a/pkg/errorx/time.go b/pkg/errorx/time.go new file mode 100644 index 0000000..90f603a --- /dev/null +++ b/pkg/errorx/time.go @@ -0,0 +1,5 @@ +package errorx + +func TimeOutErr() error { + return New("time out err") +} diff --git a/pkg/event/define.go b/pkg/event/define.go index 2fad03e..1208c3a 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -7,10 +7,14 @@ import ( type eventType int8 const ( - normalEvent = eventType(iota) - waitEvent + defaultEventState = eventType(iota) //默认情况下的状态 + waitEventState // 等待状态 + workEventState //工作状态 + closeEventState //事件关闭状态 ) +type EventWorkFunc func() (interface{}, error) + type DriverInterface interface { Get() *Event Put(event *Event) diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 114f2a8..c6b6838 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -1,9 +1,10 @@ package event import ( - "gitee.com/timedb/wheatCache/pkg/errorx" "sync" "time" + + "gitee.com/timedb/wheatCache/pkg/errorx" ) type Active func() ([]string, error) // 事件带函数 @@ -14,8 +15,10 @@ type Event struct { WorkTime time.Duration // 工作时间 msg map[string]string // 消息 waitResult chan interface{} // 等待返回 + err error ru sync.RWMutex - eventOnType eventType + muClose sync.Mutex //关闭锁 + eventStatus eventType } func (e *Event) SetMsg(key string, val string) { @@ -33,8 +36,12 @@ func (e *Event) GetMsg(key string) string { return e.msg[key] } -// SetCtxValue 写入 ctx 传递用参数 -func (e *Event) SetCtxValue(key string, value interface{}) { +func (e *Event) GetEventName() string { + return e.eventName +} + +// SetValue 写入 ctx 传递用参数 +func (e *Event) SetValue(key string, value interface{}) { e.ru.Lock() defer e.ru.Unlock() if e.msgCtx == nil { @@ -43,61 +50,69 @@ func (e *Event) SetCtxValue(key string, value interface{}) { e.msgCtx[key] = value } -func (e *Event) GetCtxValue(key string) (interface{}, bool) { +func (e *Event) GetValue(key string) (interface{}, bool) { e.ru.RLock() defer e.ru.RUnlock() val, ok := e.msgCtx[key] return val, ok } -func (e *Event) SetWaitResult(val interface{}) (err error) { - defer func() { - if errChan := recover(); errChan != nil { - err = errorx.New("channel err:%v", errChan) - } - }() - e.waitResult <- val - return nil +func (e *Event) SetErr(err error) { + e.err = err } -// UpdateWaitEvent 升级到 wait Event -func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) { - if e.eventOnType == waitEvent { - return nil, errorx.New("the upgrade cannot be repeated") - } - - ttl = 1 * time.Second - if ttl > 0 { - e.WorkTime = ttl - } +// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性 +func (e *Event) InitWaitEvent() { + e.muClose.Lock() + defer e.muClose.Unlock() e.waitResult = make(chan interface{}) - e.eventOnType = waitEvent - - return e.waitResult, nil + e.eventStatus = waitEventState } -// GetTtlTimer 只对 wait Event 有效 -func (e *Event) GetTtlTimer() (*time.Timer, error) { - if e.eventOnType != waitEvent { - return nil, errorx.New("cannot be called in normalEvent") +// StartWaitEvent 开始一个等待任务 +func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) { + t := time.NewTimer(ttl) + select { + case <-t.C: + e.muClose.Lock() + defer e.muClose.Unlock() + if e.eventStatus == workEventState { + return <-e.waitResult, e.err + } + + e.eventStatus = closeEventState + return nil, errorx.TimeOutErr() + + case result := <-e.waitResult: + return result, e.err + } +} + +func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { + e.muClose.Lock() + defer e.muClose.Unlock() + if e.eventStatus != waitEventState { + return nil, errorx.New("not wait status, exec err") } - timer := time.NewTimer(e.WorkTime) - return timer, nil + e.eventStatus = workEventState + + res, err := work() + e.waitResult <- res + e.err = err + + close(e.waitResult) + e.eventStatus = closeEventState + return res, err } func NewEvent(eventName string) *Event { return &Event{ eventName: eventName, - eventOnType: normalEvent, + eventStatus: defaultEventState, } } -// Close 关闭结束事件 -func (e *Event) Close() { - close(e.waitResult) -} - type Driver struct { maxQueueSize int queue chan *Event diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index d323282..bf6a6de 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -3,38 +3,12 @@ package event import ( "context" "fmt" - "github.com/stretchr/testify/require" "testing" "time" + + "github.com/stretchr/testify/require" ) -func TestEvent_SetWaitResult(t *testing.T) { - event := &Event{ - waitResult: make(chan interface{}), - } - - c, err := event.UpdateWaitEvent(2 * time.Second) - require.NoError(t, err) - go getValueByChannel(c) - - err = event.SetWaitResult(1) - require.NoError(t, err) - - event.Close() - err = event.SetWaitResult(1) - require.Error(t, err) -} - -func getValueByChannel(c <-chan interface{}) { - for { - if i, isClose := <-c; !isClose { - break - } else { - fmt.Println(i) - } - } -} - const testEvent = "1001" const waitTestEvent = "1002" @@ -52,7 +26,7 @@ func TestEvent_DriverEventTest(t *testing.T) { func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { for i := 0; i < 100; i++ { event := NewEvent(testEvent) - event.SetCtxValue("test", i) + event.SetValue("test", i) v.Call(ctx, event) } } @@ -60,7 +34,7 @@ func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) { for i := 0; i < 100; i++ { event := v.Receive(ctx) - res, ok := event.GetCtxValue("test") + res, ok := event.GetValue("test") require.True(t, ok) fmt.Println(res) require.Equal(t, res, i) @@ -82,31 +56,26 @@ func TestEvent_SpanWaitEvent(t *testing.T) { func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) { for i := 0; i < 100; i++ { event := NewEvent(waitTestEvent) - waitChan, err := event.UpdateWaitEvent(2 * time.Second) // 设置事件过期时间, 获取结果等待对象 - require.NoError(t, err) - event.SetCtxValue("test", i) - v.Call(ctx, event) // 推送给 consumer - timer, err := event.GetTtlTimer() + event.InitWaitEvent() + event.SetValue("test", i) + v.Call(ctx, event) // 推送给 consumer + res, err := event.StartWaitEvent(2 * time.Second) // 最多等待 consumer 回复 2s require.NoError(t, err) - select { - case result := <-waitChan: - require.Equal(t, result, fmt.Sprintf("test:%v", i)) - fmt.Println(result) - case <-timer.C: - panic("time out") - } + require.Equal(t, fmt.Sprintf("test:%v", i), res) } } func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) { for i := 0; i < 100; i++ { event := v.Receive(ctx) // 接受 produce 的 event - res, ok := event.GetCtxValue("test") + res, ok := event.GetValue("test") require.True(t, ok) require.Equal(t, res, i) - err := event.SetWaitResult(fmt.Sprintf("test:%v", res)) // 发送返回值给 produce - require.NoError(t, err) + // 发送返回值给 produce + event.ExecWorkAndSendResult(func() (interface{}, error) { + return fmt.Sprintf("test:%v", res), nil + }) } }