test(event): update test

This commit is contained in:
bandl 2021-09-28 20:55:26 +08:00
parent 748f61830b
commit 0b8e2a104f
1 changed files with 14 additions and 45 deletions

View File

@ -3,38 +3,12 @@ package event
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/stretchr/testify/require"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
) )
func TestEvent_SetWaitResult(t *testing.T) {
event := &Event{
waitResult: make(chan interface{}),
}
c, err := event.UpdateWaitEvent(2 * time.Second)
require.NoError(t, err)
go getValueByChannel(c)
err = event.SetWaitResult(1)
require.NoError(t, err)
event.Close()
err = event.SetWaitResult(1)
require.Error(t, err)
}
func getValueByChannel(c <-chan interface{}) {
for {
if i, isClose := <-c; !isClose {
break
} else {
fmt.Println(i)
}
}
}
const testEvent = "1001" const testEvent = "1001"
const waitTestEvent = "1002" const waitTestEvent = "1002"
@ -52,7 +26,7 @@ func TestEvent_DriverEventTest(t *testing.T) {
func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
event := NewEvent(testEvent) event := NewEvent(testEvent)
event.SetCtxValue("test", i) event.SetValue("test", i)
v.Call(ctx, event) v.Call(ctx, event)
} }
} }
@ -60,7 +34,7 @@ func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) {
func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) { func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
event := v.Receive(ctx) event := v.Receive(ctx)
res, ok := event.GetCtxValue("test") res, ok := event.GetValue("test")
require.True(t, ok) require.True(t, ok)
fmt.Println(res) fmt.Println(res)
require.Equal(t, res, i) require.Equal(t, res, i)
@ -82,31 +56,26 @@ func TestEvent_SpanWaitEvent(t *testing.T) {
func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) { func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
event := NewEvent(waitTestEvent) event := NewEvent(waitTestEvent)
waitChan, err := event.UpdateWaitEvent(2 * time.Second) // 设置事件过期时间, 获取结果等待对象
require.NoError(t, err)
event.SetCtxValue("test", i)
v.Call(ctx, event) // 推送给 consumer
timer, err := event.GetTtlTimer() event.InitWaitEvent()
event.SetValue("test", i)
v.Call(ctx, event) // 推送给 consumer
res, err := event.StartWaitEvent(2 * time.Second) // 最多等待 consumer 回复 2s
require.NoError(t, err) require.NoError(t, err)
select { require.Equal(t, fmt.Sprintf("test:%v", i), res)
case result := <-waitChan:
require.Equal(t, result, fmt.Sprintf("test:%v", i))
fmt.Println(result)
case <-timer.C:
panic("time out")
}
} }
} }
func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) { func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
event := v.Receive(ctx) // 接受 produce 的 event event := v.Receive(ctx) // 接受 produce 的 event
res, ok := event.GetCtxValue("test") res, ok := event.GetValue("test")
require.True(t, ok) require.True(t, ok)
require.Equal(t, res, i) require.Equal(t, res, i)
err := event.SetWaitResult(fmt.Sprintf("test:%v", res)) // 发送返回值给 produce // 发送返回值给 produce
require.NoError(t, err) event.ExecWorkAndSendResult(func() (interface{}, error) {
return fmt.Sprintf("test:%v", res), nil
})
} }
} }