test(event2): add event2 test

This commit is contained in:
bandl 2021-11-11 22:24:28 +08:00
parent 8b8fd58c09
commit 3f3b208db1
1 changed files with 152 additions and 0 deletions

152
pkg/event2/driver_test.go Normal file
View File

@ -0,0 +1,152 @@
package event2
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
const testEvent = "1001"
const waitTestEvent = "1002"
// 简单的 单向 event 使用
func Test_EventDriver(t *testing.T) {
driver := NewDriver(2000)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
ctx := context.Background()
wait := sync.WaitGroup{}
wait.Add(30000)
go func() {
for i := 0; i < 30000; i++ {
event := produce.NewEvent(testEvent)
event.SetMsg("k", strconv.Itoa(i))
produce.Call(ctx, event)
}
}()
go func() {
for {
event := consumer.Receive(ctx)
fmt.Println(event.GetMsg("k"))
event.Recovery()
wait.Done()
}
}()
wait.Wait()
}
// 双向 event
func Test_EventDriver_Tow_way(t *testing.T) {
ctx := context.Background()
driver := NewDriver(2000)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
go func() {
for {
event := consumer.Receive(ctx)
work, ok := event.GetValue(WorkFuncEventKey)
if !ok {
panic("get work key err")
}
workFunc, ok := work.(EventWorkFunc)
if !ok {
panic("work func err")
}
_, err := event.ExecWorkAndSendResult(workFunc)
require.NoError(t, err)
}
}()
// 一般的 two-way 模式
for i := 0; i < 10000; i++ {
event := produce.NewEvent(waitTestEvent)
event.InitWaitEvent()
event.SetValue(WorkFuncEventKey, EventWorkFunc(func() (interface{}, error) {
return i + 1, nil
}))
produce.Call(ctx, event)
res, err := event.StartWaitEvent(2 * time.Second)
require.NoError(t, err)
require.Equal(t, res, i+1)
event.Recovery()
}
// 挂起模式2 秒左右的执行时间
group := sync.WaitGroup{}
group.Add(5)
for i := 0; i < 5; i++ {
go func(i int) {
event := produce.NewEvent(waitTestEvent)
event.InitWaitEvent()
event.SetValue(WorkFuncEventKey, EventWorkFunc(func() (interface{}, error) {
// 访问 await Work 来发起一个 异步请求操作
return EventAwaitFunc(func() (interface{}, error) {
time.Sleep(time.Second)
return i + 1, nil
}), nil
}))
produce.Call(ctx, event)
res, err := event.StartWaitEvent(2 * time.Second)
require.NoError(t, err)
require.Equal(t, res, i+1)
event.Recovery()
group.Done()
}(i)
}
// 挂起成功不发生超时
for i := 0; i < 10000; i++ {
event := produce.NewEvent(waitTestEvent)
event.InitWaitEvent()
event.SetValue(WorkFuncEventKey, EventWorkFunc(func() (interface{}, error) {
return i + 1, nil
}))
produce.Call(ctx, event)
res, err := event.StartWaitEvent(500 * time.Millisecond)
require.NoError(t, err)
require.Equal(t, res, i+1)
event.Recovery()
}
group.Wait()
// 挂起一个高延迟操作, 保证局部操作还在事件中
group = sync.WaitGroup{}
group.Add(5)
for i := 0; i < 5; i++ {
event := produce.NewEvent(waitTestEvent)
event.InitWaitEvent()
event.SetValue(WorkFuncEventKey, EventWorkFunc(func() (interface{}, error) {
return EventAwaitFunc(func() (interface{}, error) {
// 返回值为 EventWorkFunc 时, 会重新加入末端队列
return EventWorkFunc(func() (interface{}, error) {
return i + 1, nil
}), nil
}), nil
}))
produce.Call(ctx, event)
res, err := event.StartWaitEvent(2 * time.Second)
require.NoError(t, err)
require.Equal(t, res, i+1)
event.Recovery()
group.Done()
fmt.Println(i)
}
group.Wait()
}