From 0b8e2a104f35112298456cad5f4779d25a1ca656 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 28 Sep 2021 20:55:26 +0800 Subject: [PATCH] test(event): update test --- pkg/event/event_test.go | 59 ++++++++++------------------------------- 1 file changed, 14 insertions(+), 45 deletions(-) diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index d323282..bf6a6de 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -3,38 +3,12 @@ package event import ( "context" "fmt" - "github.com/stretchr/testify/require" "testing" "time" + + "github.com/stretchr/testify/require" ) -func TestEvent_SetWaitResult(t *testing.T) { - event := &Event{ - waitResult: make(chan interface{}), - } - - c, err := event.UpdateWaitEvent(2 * time.Second) - require.NoError(t, err) - go getValueByChannel(c) - - err = event.SetWaitResult(1) - require.NoError(t, err) - - event.Close() - err = event.SetWaitResult(1) - require.Error(t, err) -} - -func getValueByChannel(c <-chan interface{}) { - for { - if i, isClose := <-c; !isClose { - break - } else { - fmt.Println(i) - } - } -} - const testEvent = "1001" const waitTestEvent = "1002" @@ -52,7 +26,7 @@ func TestEvent_DriverEventTest(t *testing.T) { func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { for i := 0; i < 100; i++ { event := NewEvent(testEvent) - event.SetCtxValue("test", i) + event.SetValue("test", i) v.Call(ctx, event) } } @@ -60,7 +34,7 @@ func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) { for i := 0; i < 100; i++ { event := v.Receive(ctx) - res, ok := event.GetCtxValue("test") + res, ok := event.GetValue("test") require.True(t, ok) fmt.Println(res) require.Equal(t, res, i) @@ -82,31 +56,26 @@ func TestEvent_SpanWaitEvent(t *testing.T) { func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) { for i := 0; i < 100; i++ { event := NewEvent(waitTestEvent) - waitChan, err := event.UpdateWaitEvent(2 * time.Second) // 设置事件过期时间, 获取结果等待对象 - require.NoError(t, err) - event.SetCtxValue("test", i) - v.Call(ctx, event) // 推送给 consumer - timer, err := event.GetTtlTimer() + event.InitWaitEvent() + event.SetValue("test", i) + v.Call(ctx, event) // 推送给 consumer + res, err := event.StartWaitEvent(2 * time.Second) // 最多等待 consumer 回复 2s require.NoError(t, err) - select { - case result := <-waitChan: - require.Equal(t, result, fmt.Sprintf("test:%v", i)) - fmt.Println(result) - case <-timer.C: - panic("time out") - } + require.Equal(t, fmt.Sprintf("test:%v", i), res) } } func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) { for i := 0; i < 100; i++ { event := v.Receive(ctx) // 接受 produce 的 event - res, ok := event.GetCtxValue("test") + res, ok := event.GetValue("test") require.True(t, ok) require.Equal(t, res, i) - err := event.SetWaitResult(fmt.Sprintf("test:%v", res)) // 发送返回值给 produce - require.NoError(t, err) + // 发送返回值给 produce + event.ExecWorkAndSendResult(func() (interface{}, error) { + return fmt.Sprintf("test:%v", res), nil + }) } }