From 46b029b33979b034402a16e2ba01f188bea20f6e Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 26 Oct 2021 14:39:50 +0800 Subject: [PATCH] test(event): add poll test --- pkg/event/event_test.go | 142 +++++++++++++++++++--------------------- 1 file changed, 66 insertions(+), 76 deletions(-) diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index 8da5b4c..bd61d84 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -3,7 +3,8 @@ package event import ( "context" "fmt" - "gitee.com/timedb/wheatCache/pkg/errorx" + "strconv" + "sync" "testing" "time" @@ -13,87 +14,76 @@ import ( const testEvent = "1001" const waitTestEvent = "1002" -// 简单 非等待响应模式, 使用 event driver -func TestEvent_DriverEventTest(t *testing.T) { - ctx := context.Background() - driver := NewDriver(500) +// 简单的 单向 event 使用 +func Test_EventDriver(t *testing.T) { + driver := NewDriver(2000) produce := NewProduce(driver) consumer := NewConsumer(driver) - go produceEvent(t, ctx, produce) - consumerEvent(t, ctx, consumer) -} - -func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { - for i := 0; i < 100; i++ { - event := NewEvent(testEvent) - event.SetValue("test", i) - v.Call(ctx, event) - } -} - -func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) { - for i := 0; i < 100; i++ { - event := v.Receive(ctx) - res, ok := event.GetValue("test") - require.True(t, ok) - fmt.Println(res) - require.Equal(t, res, i) - } -} - -// 响应等待用法 -func TestEvent_SpanWaitEvent(t *testing.T) { ctx := context.Background() - driver := NewDriver(500) - produce := NewProduce(driver) - consumer := NewConsumer(driver) - go waitConsumer(t, ctx, consumer) + wait := sync.WaitGroup{} + wait.Add(30000) - waitProduce(t, ctx, produce) -} - -func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) { - for i := 0; i < 100; i++ { - event := NewEvent(waitTestEvent) - - event.InitWaitEvent() - event.SetValue("test", i) - v.Call(ctx, event) // 推送给 consumer - res, err := event.StartWaitEvent(2 * time.Second) // 最多等待 consumer 回复 2s - require.NoError(t, err) - require.Equal(t, fmt.Sprintf("test:%v", i), res) - } -} - -func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) { - for i := 0; i < 100; i++ { - event := v.Receive(ctx) // 接受 produce 的 event - res, ok := event.GetValue("test") - require.True(t, ok) - require.Equal(t, res, i) - - // 发送返回值给 produce - event.ExecWorkAndSendResult(func() (interface{}, error) { - return fmt.Sprintf("test:%v", res), nil - }) - } -} - -func TestEvent_SetResultErr(t *testing.T) { - ctx := context.Background() - event := NewEvent("dddd") - driver := NewDriver(100) - produce := NewProduce(driver) - consumer := NewConsumer(driver) go func() { - event := consumer.Receive(ctx) - event.SetResultErr(errorx.New("err")) + for i := 0; i < 30000; i++ { + event := produce.NewEvent(testEvent) + event.SetMsg("k", strconv.Itoa(i)) + produce.Call(ctx, event) + + } }() - event.InitWaitEvent() - produce.Call(ctx, event) - _, err := event.StartWaitEvent(2 * time.Second) - fmt.Println(err) - require.Error(t, err) + + go func() { + for { + event := consumer.Receive(ctx) + fmt.Println(event.GetMsg("k")) + consumer.Recovery(event) + wait.Done() + } + }() + + wait.Wait() + + fmt.Println(*driver.(*Driver).poll.nowSize) +} + +// 双向 event +func Test_WaitEventDriver(t *testing.T) { + driver := NewDriver(200) + produce := NewProduce(driver) + consumer := NewConsumer(driver) + + ctx := context.Background() + + wait := sync.WaitGroup{} + wait.Add(300000) + + go func() { + for i := 0; i < 300000; i++ { + event := produce.NewEvent(testEvent) + event.SetMsg("k", strconv.Itoa(i)) + event.InitWaitEvent() + produce.Call(ctx, event) + val, err := event.StartWaitEvent(2 * time.Second) + require.NoError(t, err) + fmt.Println(val) + produce.Recovery(event) + wait.Done() + } + }() + + go func() { + for { + event := consumer.Receive(ctx) + event.ExecWorkAndSendResult(func() (interface{}, error) { + msg := event.GetMsg("k") + return "hello: " + msg, nil + }) + } + }() + + wait.Wait() + + fmt.Println(*driver.(*Driver).poll.nowSize) }