!36 修复 event 的一些 bug

Merge pull request !36 from bandl/fix-event-bug
This commit is contained in:
bandl 2021-09-28 13:09:40 +00:00 committed by Gitee
commit 0677c56492
5 changed files with 122 additions and 133 deletions

View File

@ -90,63 +90,59 @@ func TestNewConsumer(t *testing.T) {
```go ```go
func TestNewConsumer(t *testing.T) { func TestNewConsumer(t *testing.T) {
// 定义一个图书馆 // 定义一个图书馆
type Library struct { type Library struct {
driver DriverInterface driver DriverInterface
} }
library := &Library{ library := &Library{
driver: NewDriver(100), driver: NewDriver(100),
} }
ctx := context.Background() ctx := context.Background()
// 定义 A // 定义 A
type A struct { type A struct {
produce ProduceInterface produce ProduceInterface
} }
a := &A{ a := &A{
produce: NewProduce(library.driver), produce: NewProduce(library.driver),
} }
// 定义 B // 定义 B
type B struct { type B struct {
consumer ConsumerInterface consumer ConsumerInterface
} }
b := &B{ b := &B{
consumer: NewConsumer(library.driver), consumer: NewConsumer(library.driver),
} }
// 定义书 S 并且添加一些描述 // 定义书 S 并且添加一些描述
book := NewEvent("S") book := NewEvent("S")
book.SetMsg("title", "hello world") book.SetMsg("title", "hello world")
book.SetCtxValue("pages", 120) book.SetValue("pages", 120)
// A 把书 S 放到图书馆 // A 把书 S 放到图书馆
go func() { go func() {
waitMsg, err := book.UpdateWaitEvent(2 * time.Hour) book.InitWaitEvent()
require.NoError(t, err) a.produce.Call(ctx, book)
a.produce.Call(ctx, book)
// A 等待 B 的回复, 但是他最多只会等待 B 2个小时 // A 等待 B 的回复, 但是他最多只会等待 B 2个小时
ttlTime, err := book.GetTtlTimer() res, err := book.StartWaitEvent(2 * time.Hour)
require.NoError(t, err) require.NoError(t, err)
select { fmt.Println(res)
case msg := <-waitMsg: }()
fmt.Println(msg)
case <-ttlTime.C:
fmt.Println("过期不等了")
book.Close()
}
}()
// 模拟 B 去图书馆拿书 // 模拟 B 去图书馆拿书
book = b.consumer.Receive(ctx) book = b.consumer.Receive(ctx)
fmt.Println(book.GetMsg("title")) fmt.Println(book.GetMsg("title"))
// 书完好 // 书完好
book.SetWaitResult("书完好") book.ExecWorkAndSendResult(func() (interface{}, error) {
time.Sleep(50 * time.Millisecond) // b 检查书
return "OK", nil
})
time.Sleep(50 * time.Millisecond)
} }
``` ```

5
pkg/errorx/time.go Normal file
View File

@ -0,0 +1,5 @@
package errorx
func TimeOutErr() error {
return New("time out err")
}

View File

@ -7,10 +7,14 @@ import (
type eventType int8 type eventType int8
const ( const (
normalEvent = eventType(iota) defaultEventState = eventType(iota) //默认情况下的状态
waitEvent waitEventState // 等待状态
workEventState //工作状态
closeEventState //事件关闭状态
) )
type EventWorkFunc func() (interface{}, error)
type DriverInterface interface { type DriverInterface interface {
Get() *Event Get() *Event
Put(event *Event) Put(event *Event)

View File

@ -1,9 +1,10 @@
package event package event
import ( import (
"gitee.com/timedb/wheatCache/pkg/errorx"
"sync" "sync"
"time" "time"
"gitee.com/timedb/wheatCache/pkg/errorx"
) )
type Active func() ([]string, error) // 事件带函数 type Active func() ([]string, error) // 事件带函数
@ -14,8 +15,10 @@ type Event struct {
WorkTime time.Duration // 工作时间 WorkTime time.Duration // 工作时间
msg map[string]string // 消息 msg map[string]string // 消息
waitResult chan interface{} // 等待返回 waitResult chan interface{} // 等待返回
err error
ru sync.RWMutex ru sync.RWMutex
eventOnType eventType muClose sync.Mutex //关闭锁
eventStatus eventType
} }
func (e *Event) SetMsg(key string, val string) { func (e *Event) SetMsg(key string, val string) {
@ -33,8 +36,12 @@ func (e *Event) GetMsg(key string) string {
return e.msg[key] return e.msg[key]
} }
// SetCtxValue 写入 ctx 传递用参数 func (e *Event) GetEventName() string {
func (e *Event) SetCtxValue(key string, value interface{}) { return e.eventName
}
// SetValue 写入 ctx 传递用参数
func (e *Event) SetValue(key string, value interface{}) {
e.ru.Lock() e.ru.Lock()
defer e.ru.Unlock() defer e.ru.Unlock()
if e.msgCtx == nil { if e.msgCtx == nil {
@ -43,61 +50,69 @@ func (e *Event) SetCtxValue(key string, value interface{}) {
e.msgCtx[key] = value e.msgCtx[key] = value
} }
func (e *Event) GetCtxValue(key string) (interface{}, bool) { func (e *Event) GetValue(key string) (interface{}, bool) {
e.ru.RLock() e.ru.RLock()
defer e.ru.RUnlock() defer e.ru.RUnlock()
val, ok := e.msgCtx[key] val, ok := e.msgCtx[key]
return val, ok return val, ok
} }
func (e *Event) SetWaitResult(val interface{}) (err error) { func (e *Event) SetErr(err error) {
defer func() { e.err = err
if errChan := recover(); errChan != nil {
err = errorx.New("channel err:%v", errChan)
}
}()
e.waitResult <- val
return nil
} }
// UpdateWaitEvent 升级到 wait Event // InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
func (e *Event) UpdateWaitEvent(ttl time.Duration) (<-chan interface{}, error) { func (e *Event) InitWaitEvent() {
if e.eventOnType == waitEvent { e.muClose.Lock()
return nil, errorx.New("the upgrade cannot be repeated") defer e.muClose.Unlock()
}
ttl = 1 * time.Second
if ttl > 0 {
e.WorkTime = ttl
}
e.waitResult = make(chan interface{}) e.waitResult = make(chan interface{})
e.eventOnType = waitEvent e.eventStatus = waitEventState
return e.waitResult, nil
} }
// GetTtlTimer 只对 wait Event 有效 // StartWaitEvent 开始一个等待任务
func (e *Event) GetTtlTimer() (*time.Timer, error) { func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
if e.eventOnType != waitEvent { t := time.NewTimer(ttl)
return nil, errorx.New("cannot be called in normalEvent") select {
case <-t.C:
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus == workEventState {
return <-e.waitResult, e.err
}
e.eventStatus = closeEventState
return nil, errorx.TimeOutErr()
case result := <-e.waitResult:
return result, e.err
}
}
func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
return nil, errorx.New("not wait status, exec err")
} }
timer := time.NewTimer(e.WorkTime) e.eventStatus = workEventState
return timer, nil
res, err := work()
e.waitResult <- res
e.err = err
close(e.waitResult)
e.eventStatus = closeEventState
return res, err
} }
func NewEvent(eventName string) *Event { func NewEvent(eventName string) *Event {
return &Event{ return &Event{
eventName: eventName, eventName: eventName,
eventOnType: normalEvent, eventStatus: defaultEventState,
} }
} }
// Close 关闭结束事件
func (e *Event) Close() {
close(e.waitResult)
}
type Driver struct { type Driver struct {
maxQueueSize int maxQueueSize int
queue chan *Event queue chan *Event

View File

@ -3,38 +3,12 @@ package event
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/stretchr/testify/require"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
) )
func TestEvent_SetWaitResult(t *testing.T) {
event := &Event{
waitResult: make(chan interface{}),
}
c, err := event.UpdateWaitEvent(2 * time.Second)
require.NoError(t, err)
go getValueByChannel(c)
err = event.SetWaitResult(1)
require.NoError(t, err)
event.Close()
err = event.SetWaitResult(1)
require.Error(t, err)
}
func getValueByChannel(c <-chan interface{}) {
for {
if i, isClose := <-c; !isClose {
break
} else {
fmt.Println(i)
}
}
}
const testEvent = "1001" const testEvent = "1001"
const waitTestEvent = "1002" const waitTestEvent = "1002"
@ -52,7 +26,7 @@ func TestEvent_DriverEventTest(t *testing.T) {
func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
event := NewEvent(testEvent) event := NewEvent(testEvent)
event.SetCtxValue("test", i) event.SetValue("test", i)
v.Call(ctx, event) v.Call(ctx, event)
} }
} }
@ -60,7 +34,7 @@ func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) {
func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) { func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
event := v.Receive(ctx) event := v.Receive(ctx)
res, ok := event.GetCtxValue("test") res, ok := event.GetValue("test")
require.True(t, ok) require.True(t, ok)
fmt.Println(res) fmt.Println(res)
require.Equal(t, res, i) require.Equal(t, res, i)
@ -82,31 +56,26 @@ func TestEvent_SpanWaitEvent(t *testing.T) {
func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) { func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
event := NewEvent(waitTestEvent) event := NewEvent(waitTestEvent)
waitChan, err := event.UpdateWaitEvent(2 * time.Second) // 设置事件过期时间, 获取结果等待对象
require.NoError(t, err)
event.SetCtxValue("test", i)
v.Call(ctx, event) // 推送给 consumer
timer, err := event.GetTtlTimer() event.InitWaitEvent()
event.SetValue("test", i)
v.Call(ctx, event) // 推送给 consumer
res, err := event.StartWaitEvent(2 * time.Second) // 最多等待 consumer 回复 2s
require.NoError(t, err) require.NoError(t, err)
select { require.Equal(t, fmt.Sprintf("test:%v", i), res)
case result := <-waitChan:
require.Equal(t, result, fmt.Sprintf("test:%v", i))
fmt.Println(result)
case <-timer.C:
panic("time out")
}
} }
} }
func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) { func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
event := v.Receive(ctx) // 接受 produce 的 event event := v.Receive(ctx) // 接受 produce 的 event
res, ok := event.GetCtxValue("test") res, ok := event.GetValue("test")
require.True(t, ok) require.True(t, ok)
require.Equal(t, res, i) require.Equal(t, res, i)
err := event.SetWaitResult(fmt.Sprintf("test:%v", res)) // 发送返回值给 produce // 发送返回值给 produce
require.NoError(t, err) event.ExecWorkAndSendResult(func() (interface{}, error) {
return fmt.Sprintf("test:%v", res), nil
})
} }
} }