wheat-cache/pkg/event/event_test.go

90 lines
1.5 KiB
Go
Raw Normal View History

2021-09-23 11:06:20 +08:00
package event
import (
"context"
"fmt"
2021-10-26 14:39:50 +08:00
"strconv"
"sync"
2021-09-23 11:06:20 +08:00
"testing"
"time"
2021-09-28 20:55:26 +08:00
"github.com/stretchr/testify/require"
)
2021-09-23 11:06:20 +08:00
// Event names used by the tests in this file.
const (
	// testEvent is the name given to the events created by the tests below.
	testEvent = "1001"
	// waitTestEvent is a second event name; none of the tests visible in
	// this file reference it.
	waitTestEvent = "1002"
)
2021-10-26 14:39:50 +08:00
// 简单的 单向 event 使用
func Test_EventDriver(t *testing.T) {
driver := NewDriver(2000)
2021-09-23 11:06:20 +08:00
produce := NewProduce(driver)
consumer := NewConsumer(driver)
2021-10-26 14:39:50 +08:00
ctx := context.Background()
2021-09-23 11:06:20 +08:00
2021-10-26 14:39:50 +08:00
wait := sync.WaitGroup{}
wait.Add(30000)
2021-09-23 11:06:20 +08:00
2021-10-26 14:39:50 +08:00
go func() {
for i := 0; i < 30000; i++ {
event := produce.NewEvent(testEvent)
event.SetMsg("k", strconv.Itoa(i))
produce.Call(ctx, event)
2021-09-23 11:06:20 +08:00
2021-10-26 14:39:50 +08:00
}
}()
2021-09-23 11:06:20 +08:00
2021-10-26 14:39:50 +08:00
go func() {
for {
event := consumer.Receive(ctx)
fmt.Println(event.GetMsg("k"))
consumer.Recovery(event)
wait.Done()
}
}()
2021-09-23 11:06:20 +08:00
2021-10-26 14:39:50 +08:00
wait.Wait()
2021-09-23 11:06:20 +08:00
2021-10-26 14:39:50 +08:00
fmt.Println(*driver.(*Driver).poll.nowSize)
2021-09-23 11:06:20 +08:00
}
2021-10-07 16:30:23 +08:00
2021-10-26 14:39:50 +08:00
// 双向 event
func Test_WaitEventDriver(t *testing.T) {
driver := NewDriver(200)
2021-10-07 16:30:23 +08:00
produce := NewProduce(driver)
consumer := NewConsumer(driver)
2021-10-26 14:39:50 +08:00
ctx := context.Background()
wait := sync.WaitGroup{}
wait.Add(300000)
go func() {
for i := 0; i < 300000; i++ {
event := produce.NewEvent(testEvent)
event.SetMsg("k", strconv.Itoa(i))
event.InitWaitEvent()
produce.Call(ctx, event)
val, err := event.StartWaitEvent(2 * time.Second)
require.NoError(t, err)
fmt.Println(val)
produce.Recovery(event)
wait.Done()
}
}()
2021-10-07 16:30:23 +08:00
go func() {
2021-10-26 14:39:50 +08:00
for {
event := consumer.Receive(ctx)
event.ExecWorkAndSendResult(func() (interface{}, error) {
msg := event.GetMsg("k")
return "hello: " + msg, nil
})
}
2021-10-07 16:30:23 +08:00
}()
2021-10-26 14:39:50 +08:00
wait.Wait()
fmt.Println(*driver.(*Driver).poll.nowSize)
2021-10-07 16:30:23 +08:00
}