test(event): add poll test

This commit is contained in:
bandl 2021-10-26 14:39:50 +08:00
parent ac4fdd7309
commit 46b029b339
1 changed files with 66 additions and 76 deletions

View File

@ -3,7 +3,8 @@ package event
import ( import (
"context" "context"
"fmt" "fmt"
"gitee.com/timedb/wheatCache/pkg/errorx" "strconv"
"sync"
"testing" "testing"
"time" "time"
@ -13,87 +14,76 @@ import (
const testEvent = "1001" const testEvent = "1001"
const waitTestEvent = "1002" const waitTestEvent = "1002"
// 简单 非等待响应模式, 使用 event driver // 简单的 单向 event 使用
func TestEvent_DriverEventTest(t *testing.T) { func Test_EventDriver(t *testing.T) {
ctx := context.Background() driver := NewDriver(2000)
driver := NewDriver(500)
produce := NewProduce(driver) produce := NewProduce(driver)
consumer := NewConsumer(driver) consumer := NewConsumer(driver)
go produceEvent(t, ctx, produce)
consumerEvent(t, ctx, consumer)
}
func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) {
for i := 0; i < 100; i++ {
event := NewEvent(testEvent)
event.SetValue("test", i)
v.Call(ctx, event)
}
}
func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) {
for i := 0; i < 100; i++ {
event := v.Receive(ctx)
res, ok := event.GetValue("test")
require.True(t, ok)
fmt.Println(res)
require.Equal(t, res, i)
}
}
// 响应等待用法
func TestEvent_SpanWaitEvent(t *testing.T) {
ctx := context.Background() ctx := context.Background()
driver := NewDriver(500)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
go waitConsumer(t, ctx, consumer) wait := sync.WaitGroup{}
wait.Add(30000)
waitProduce(t, ctx, produce)
}
func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) {
for i := 0; i < 100; i++ {
event := NewEvent(waitTestEvent)
event.InitWaitEvent()
event.SetValue("test", i)
v.Call(ctx, event) // 推送给 consumer
res, err := event.StartWaitEvent(2 * time.Second) // 最多等待 consumer 回复 2s
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("test:%v", i), res)
}
}
func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) {
for i := 0; i < 100; i++ {
event := v.Receive(ctx) // 接受 produce 的 event
res, ok := event.GetValue("test")
require.True(t, ok)
require.Equal(t, res, i)
// 发送返回值给 produce
event.ExecWorkAndSendResult(func() (interface{}, error) {
return fmt.Sprintf("test:%v", res), nil
})
}
}
func TestEvent_SetResultErr(t *testing.T) {
ctx := context.Background()
event := NewEvent("dddd")
driver := NewDriver(100)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
go func() { go func() {
event := consumer.Receive(ctx) for i := 0; i < 30000; i++ {
event.SetResultErr(errorx.New("err")) event := produce.NewEvent(testEvent)
event.SetMsg("k", strconv.Itoa(i))
produce.Call(ctx, event)
}
}() }()
event.InitWaitEvent()
produce.Call(ctx, event) go func() {
_, err := event.StartWaitEvent(2 * time.Second) for {
fmt.Println(err) event := consumer.Receive(ctx)
require.Error(t, err) fmt.Println(event.GetMsg("k"))
consumer.Recovery(event)
wait.Done()
}
}()
wait.Wait()
fmt.Println(*driver.(*Driver).poll.nowSize)
}
// 双向 event
func Test_WaitEventDriver(t *testing.T) {
driver := NewDriver(200)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
ctx := context.Background()
wait := sync.WaitGroup{}
wait.Add(300000)
go func() {
for i := 0; i < 300000; i++ {
event := produce.NewEvent(testEvent)
event.SetMsg("k", strconv.Itoa(i))
event.InitWaitEvent()
produce.Call(ctx, event)
val, err := event.StartWaitEvent(2 * time.Second)
require.NoError(t, err)
fmt.Println(val)
produce.Recovery(event)
wait.Done()
}
}()
go func() {
for {
event := consumer.Receive(ctx)
event.ExecWorkAndSendResult(func() (interface{}, error) {
msg := event.GetMsg("k")
return "hello: " + msg, nil
})
}
}()
wait.Wait()
fmt.Println(*driver.(*Driver).poll.nowSize)
} }