diff --git a/doc/pkg/event/事件驱动使用文档.md b/doc/pkg/event/事件驱动使用文档.md new file mode 100644 index 0000000..d490667 --- /dev/null +++ b/doc/pkg/event/事件驱动使用文档.md @@ -0,0 +1,154 @@ +### 事件驱动使用文档 + + + +- 包目录: pkg/event +- 使用场景, 异步推送数据 + + + +#### 事件驱动基本概念 + +- event + + event 是事件驱动的事件部分, 主要负责一些信息的传递, 一共分为,有等待和无等待的事件, 无等待事件 consumer 消费事件后,不会给 produce 回复, 有等待事件, 支持事件消费后, 向 produce 发送一个回复。 + +- driver + + driver 是事件的驱动, 主要复制连接 produce, 以及 consumer, 其中维护了一个 event 的队列。 + +- produce + + produce 主要负责 推送 事件, 每个 produce 都需要维护一个 事件驱动通过 driver 来进行数据的推送。 + +- consumer + + consumer 负责接收事件, 每个 consumer 都需要注册一个 事件驱动 来获取 event。 + +#### + +#### 应用场景 + +​ 事件驱动主要用于 异步消息的推送(目前的实现也支持 同步的消息实现), 一般来说 produce 发送 event 后 event 被消费过程会完全于 produce 无关。 + + + +#### 无返回 event 使用例子 + +​ 我们使用 借书为例子使用, 假设 A 要把 书 S 交还给 B, 他们约定, A 把书 S 放到图书馆, B 在空闲时去图书馆取出书 S。 + +```go +func TestNewConsumer(t *testing.T) { + // 定义一个图书馆 + type Library struct { + driver DriverInterface + } + library := &Library{ + driver: NewDriver(100), + } + ctx := context.Background() + + // 定义 A + type A struct { + produce ProduceInterface + } + + a := &A{ + produce: NewProduce(library.driver), + } + + // 定义 B + type B struct { + consumer ConsumerInterface + } + + b := &B{ + consumer: NewConsumer(library.driver), + } + + // 定义书 S 并且添加一些描述 + book := NewEvent("S") + book.SetMsg("title", "hello world") + book.SetCtxValue("pages", 120) + + // A 把书 S 放到图书馆 + go func() { + a.produce.Call(ctx, book) + }() + + // 模拟 B 去图书馆拿书 + book = b.consumer.Receive(ctx) + fmt.Println(book.GetMsg("title")) +} +``` + + + +#### 有返回event的使用例子 + +​ 在上述流程的情景下, 我们添加一些条件, B 拿到书以后检查书是否损坏,如果损坏, 需要告诉 A 书损坏情况。 + +```go +func TestNewConsumer(t *testing.T) { + // 定义一个图书馆 + type Library struct { + driver DriverInterface + } + library := &Library{ + driver: NewDriver(100), + } + ctx := context.Background() + + // 定义 A + type A struct { + produce ProduceInterface + } + + a := &A{ + produce: NewProduce(library.driver), + } + + // 定义 B + type B struct { + consumer ConsumerInterface + } + + b := &B{ + consumer: NewConsumer(library.driver), + } + + // 定义书 S 并且添加一些描述 + book := NewEvent("S") + book.SetMsg("title", "hello world") + book.SetCtxValue("pages", 120) + + // A 把书 S 放到图书馆 + go func() { + waitMsg, err := book.UpdateWaitEvent(2 * time.Hour) + require.NoError(t, err) + a.produce.Call(ctx, book) + + // A 等待 B 的回复, 但是他最多只会等待 B 2个小时 + ttlTime, err := book.GetTtlTimer() + require.NoError(t, err) + select { + case msg := <-waitMsg: + fmt.Println(msg) + case <-ttlTime.C: + fmt.Println("过期不等了") + book.Close() + } + }() + + // 模拟 B 去图书馆拿书 + book = b.consumer.Receive(ctx) + fmt.Println(book.GetMsg("title")) + + // 书完好 + book.SetWaitResult("书完好") + time.Sleep(50 * time.Millisecond) +} +``` + +​ +