forked from p53841790/wheat-cache
commit
f4327aa5b6
|
@ -0,0 +1,154 @@
|
|||
### 事件驱动使用文档
|
||||
|
||||
|
||||
|
||||
- 包目录: pkg/event
|
||||
- 使用场景, 异步推送数据
|
||||
|
||||
|
||||
|
||||
#### 事件驱动基本概念
|
||||
|
||||
- event
|
||||
|
||||
event 是事件驱动的事件部分, 主要负责一些信息的传递, 一共分为,有等待和无等待的事件, 无等待事件 consumer 消费事件后,不会给 produce 回复, 有等待事件, 支持事件消费后, 向 produce 发送一个回复。
|
||||
|
||||
- driver
|
||||
|
||||
driver 是事件的驱动, 主要复制连接 produce, 以及 consumer, 其中维护了一个 event 的队列。
|
||||
|
||||
- produce
|
||||
|
||||
produce 主要负责 推送 事件, 每个 produce 都需要维护一个 事件驱动通过 driver 来进行数据的推送。
|
||||
|
||||
- consumer
|
||||
|
||||
consumer 负责接收事件, 每个 consumer 都需要注册一个 事件驱动 来获取 event。
|
||||
|
||||
####
|
||||
|
||||
#### 应用场景
|
||||
|
||||
事件驱动主要用于 异步消息的推送(目前的实现也支持 同步的消息实现), 一般来说 produce 发送 event 后 event 被消费过程会完全于 produce 无关。
|
||||
|
||||
|
||||
|
||||
#### 无返回 event 使用例子
|
||||
|
||||
我们使用 借书为例子使用, 假设 A 要把 书 S 交还给 B, 他们约定, A 把书 S 放到图书馆, B 在空闲时去图书馆取出书 S。
|
||||
|
||||
```go
|
||||
func TestNewConsumer(t *testing.T) {
|
||||
// 定义一个图书馆
|
||||
type Library struct {
|
||||
driver DriverInterface
|
||||
}
|
||||
library := &Library{
|
||||
driver: NewDriver(100),
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
// 定义 A
|
||||
type A struct {
|
||||
produce ProduceInterface
|
||||
}
|
||||
|
||||
a := &A{
|
||||
produce: NewProduce(library.driver),
|
||||
}
|
||||
|
||||
// 定义 B
|
||||
type B struct {
|
||||
consumer ConsumerInterface
|
||||
}
|
||||
|
||||
b := &B{
|
||||
consumer: NewConsumer(library.driver),
|
||||
}
|
||||
|
||||
// 定义书 S 并且添加一些描述
|
||||
book := NewEvent("S")
|
||||
book.SetMsg("title", "hello world")
|
||||
book.SetCtxValue("pages", 120)
|
||||
|
||||
// A 把书 S 放到图书馆
|
||||
go func() {
|
||||
a.produce.Call(ctx, book)
|
||||
}()
|
||||
|
||||
// 模拟 B 去图书馆拿书
|
||||
book = b.consumer.Receive(ctx)
|
||||
fmt.Println(book.GetMsg("title"))
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 有返回event的使用例子
|
||||
|
||||
在上述流程的情景下, 我们添加一些条件, B 拿到书以后检查书是否损坏,如果损坏, 需要告诉 A 书损坏情况。
|
||||
|
||||
```go
|
||||
func TestNewConsumer(t *testing.T) {
|
||||
// 定义一个图书馆
|
||||
type Library struct {
|
||||
driver DriverInterface
|
||||
}
|
||||
library := &Library{
|
||||
driver: NewDriver(100),
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
// 定义 A
|
||||
type A struct {
|
||||
produce ProduceInterface
|
||||
}
|
||||
|
||||
a := &A{
|
||||
produce: NewProduce(library.driver),
|
||||
}
|
||||
|
||||
// 定义 B
|
||||
type B struct {
|
||||
consumer ConsumerInterface
|
||||
}
|
||||
|
||||
b := &B{
|
||||
consumer: NewConsumer(library.driver),
|
||||
}
|
||||
|
||||
// 定义书 S 并且添加一些描述
|
||||
book := NewEvent("S")
|
||||
book.SetMsg("title", "hello world")
|
||||
book.SetCtxValue("pages", 120)
|
||||
|
||||
// A 把书 S 放到图书馆
|
||||
go func() {
|
||||
waitMsg, err := book.UpdateWaitEvent(2 * time.Hour)
|
||||
require.NoError(t, err)
|
||||
a.produce.Call(ctx, book)
|
||||
|
||||
// A 等待 B 的回复, 但是他最多只会等待 B 2个小时
|
||||
ttlTime, err := book.GetTtlTimer()
|
||||
require.NoError(t, err)
|
||||
select {
|
||||
case msg := <-waitMsg:
|
||||
fmt.Println(msg)
|
||||
case <-ttlTime.C:
|
||||
fmt.Println("过期不等了")
|
||||
book.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// 模拟 B 去图书馆拿书
|
||||
book = b.consumer.Receive(ctx)
|
||||
fmt.Println(book.GetMsg("title"))
|
||||
|
||||
// 书完好
|
||||
book.SetWaitResult("书完好")
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue