test(event): add test event
This commit is contained in:
parent
6d26969cae
commit
d1113821c9
|
@ -0,0 +1,112 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestEvent_SetWaitResult(t *testing.T) {
|
||||
event := &Event{
|
||||
waitResult: make(chan interface{}),
|
||||
}
|
||||
|
||||
c, err := event.UpdateWaitEvent(2 * time.Second)
|
||||
require.NoError(t, err)
|
||||
go getValueByChannel(c)
|
||||
|
||||
err = event.SetWaitResult(1)
|
||||
require.NoError(t, err)
|
||||
|
||||
event.Close()
|
||||
err = event.SetWaitResult(1)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func getValueByChannel(c <-chan interface{}) {
|
||||
for {
|
||||
if i, isClose := <-c; !isClose {
|
||||
break
|
||||
} else {
|
||||
fmt.Println(i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const testEvent = "1001"
|
||||
const waitTestEvent = "1002"
|
||||
|
||||
// 简单 非等待响应模式, 使用 event driver
|
||||
func TestEvent_DriverEventTest(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
driver := NewDriver(500)
|
||||
produce := NewProduce(driver)
|
||||
consumer := NewConsumer(driver)
|
||||
|
||||
go produceEvent(t, ctx, produce)
|
||||
consumerEvent(t, ctx, consumer)
|
||||
}
|
||||
|
||||
func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) {
|
||||
for i := 0; i < 100; i++ {
|
||||
event := NewEvent(testEvent)
|
||||
event.SetCtxValue("test", i)
|
||||
v.Call(ctx, event)
|
||||
}
|
||||
}
|
||||
|
||||
func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) {
|
||||
for i := 0; i < 100; i++ {
|
||||
event := v.Receive(ctx)
|
||||
res, ok := event.GetCtxValue("test")
|
||||
require.True(t, ok)
|
||||
fmt.Println(res)
|
||||
require.Equal(t, res, i)
|
||||
}
|
||||
}
|
||||
|
||||
// 响应等待用法
|
||||
func TestEvent_SpanWaitEvent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
driver := NewDriver(500)
|
||||
produce := NewProduce(driver)
|
||||
consumer := NewConsumer(driver)
|
||||
|
||||
go waitConsumer(t, ctx, consumer)
|
||||
|
||||
waitProduce(t, ctx, produce)
|
||||
}
|
||||
|
||||
func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) {
|
||||
for i := 0; i < 100; i++ {
|
||||
event := NewEvent(waitTestEvent)
|
||||
waitChan, err := event.UpdateWaitEvent(2 * time.Second) // 设置事件过期时间, 获取结果等待对象
|
||||
require.NoError(t, err)
|
||||
event.SetCtxValue("test", i)
|
||||
v.Call(ctx, event) // 推送给 consumer
|
||||
|
||||
timer, err := event.GetTtlTimer()
|
||||
require.NoError(t, err)
|
||||
select {
|
||||
case result := <-waitChan:
|
||||
require.Equal(t, result, fmt.Sprintf("test:%v", i))
|
||||
fmt.Println(result)
|
||||
case <-timer.C:
|
||||
panic("time out")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) {
|
||||
for i := 0; i < 100; i++ {
|
||||
event := v.Receive(ctx) // 接受 produce 的 event
|
||||
res, ok := event.GetCtxValue("test")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, res, i)
|
||||
|
||||
err := event.SetWaitResult(fmt.Sprintf("test:%v", res)) // 发送返回值给 produce
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue