From d1113821c96d709376b68d8ebf00e0c2997672cd Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Thu, 23 Sep 2021 11:06:20 +0800 Subject: [PATCH] test(event): add test event --- pkg/event/event_test.go | 112 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 pkg/event/event_test.go diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go new file mode 100644 index 0000000..d323282 --- /dev/null +++ b/pkg/event/event_test.go @@ -0,0 +1,112 @@ +package event + +import ( + "context" + "fmt" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestEvent_SetWaitResult(t *testing.T) { + event := &Event{ + waitResult: make(chan interface{}), + } + + c, err := event.UpdateWaitEvent(2 * time.Second) + require.NoError(t, err) + go getValueByChannel(c) + + err = event.SetWaitResult(1) + require.NoError(t, err) + + event.Close() + err = event.SetWaitResult(1) + require.Error(t, err) +} + +func getValueByChannel(c <-chan interface{}) { + for { + if i, isClose := <-c; !isClose { + break + } else { + fmt.Println(i) + } + } +} + +const testEvent = "1001" +const waitTestEvent = "1002" + +// 简单 非等待响应模式, 使用 event driver +func TestEvent_DriverEventTest(t *testing.T) { + ctx := context.Background() + driver := NewDriver(500) + produce := NewProduce(driver) + consumer := NewConsumer(driver) + + go produceEvent(t, ctx, produce) + consumerEvent(t, ctx, consumer) +} + +func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { + for i := 0; i < 100; i++ { + event := NewEvent(testEvent) + event.SetCtxValue("test", i) + v.Call(ctx, event) + } +} + +func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) { + for i := 0; i < 100; i++ { + event := v.Receive(ctx) + res, ok := event.GetCtxValue("test") + require.True(t, ok) + fmt.Println(res) + require.Equal(t, res, i) + } +} + +// 响应等待用法 +func TestEvent_SpanWaitEvent(t *testing.T) { + ctx := context.Background() + driver := NewDriver(500) + produce := NewProduce(driver) + consumer := NewConsumer(driver) + + go waitConsumer(t, ctx, consumer) + + waitProduce(t, ctx, produce) +} + +func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) { + for i := 0; i < 100; i++ { + event := NewEvent(waitTestEvent) + waitChan, err := event.UpdateWaitEvent(2 * time.Second) // 设置事件过期时间, 获取结果等待对象 + require.NoError(t, err) + event.SetCtxValue("test", i) + v.Call(ctx, event) // 推送给 consumer + + timer, err := event.GetTtlTimer() + require.NoError(t, err) + select { + case result := <-waitChan: + require.Equal(t, result, fmt.Sprintf("test:%v", i)) + fmt.Println(result) + case <-timer.C: + panic("time out") + } + } +} + +func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) { + for i := 0; i < 100; i++ { + event := v.Receive(ctx) // 接受 produce 的 event + res, ok := event.GetCtxValue("test") + require.True(t, ok) + require.Equal(t, res, i) + + err := event.SetWaitResult(fmt.Sprintf("test:%v", res)) // 发送返回值给 produce + require.NoError(t, err) + } +}