wheat-cache/doc/pkg/event/事件驱动使用文档.md

3.1 KiB
Raw Blame History

事件驱动使用文档

  • 包目录: pkg/event
  • 使用场景, 异步推送数据

事件驱动基本概念

  • event

    event 是事件驱动的事件部分, 主要负责一些信息的传递, 一共分为,有等待和无等待的事件, 无等待事件 consumer 消费事件后,不会给 produce 回复, 有等待事件, 支持事件消费后, 向 produce 发送一个回复。

  • driver

    driver 是事件的驱动, 主要复制连接 produce 以及 consumer 其中维护了一个 event 的队列。

  • produce

    produce 主要负责 推送 事件, 每个 produce 都需要维护一个 事件驱动通过 driver 来进行数据的推送。

  • consumer

    consumer 负责接收事件, 每个 consumer 都需要注册一个 事件驱动 来获取 event。

应用场景

事件驱动主要用于 异步消息的推送(目前的实现也支持 同步的消息实现), 一般来说 produce 发送 event 后 event 被消费过程会完全于 produce 无关。

无返回 event 使用例子

我们使用 借书为例子使用, 假设 A 要把 书 S 交还给 B 他们约定, A 把书 S 放到图书馆, B 在空闲时去图书馆取出书 S。

func TestNewConsumer(t *testing.T) {
	// 定义一个图书馆
	type Library struct {
		driver DriverInterface
	}
	library := &Library{
		driver: NewDriver(100),
	}
	ctx := context.Background()

	// 定义 A
	type A struct {
		produce ProduceInterface
	}

	a := &A{
		produce: NewProduce(library.driver),
	}

	// 定义 B
	type B struct {
		consumer ConsumerInterface
	}

	b := &B{
		consumer: NewConsumer(library.driver),
	}

	// 定义书 S 并且添加一些描述
	book := NewEvent("S")
	book.SetMsg("title", "hello world")
	book.SetCtxValue("pages", 120)

	// A 把书 S 放到图书馆
	go func() {
		a.produce.Call(ctx, book)
	}()

	// 模拟 B 去图书馆拿书
	book = b.consumer.Receive(ctx)
	fmt.Println(book.GetMsg("title"))
}

有返回event的使用例子

在上述流程的情景下, 我们添加一些条件, B 拿到书以后检查书是否损坏,如果损坏, 需要告诉 A 书损坏情况。

func TestNewConsumer(t *testing.T) {
// 定义一个图书馆
type Library struct {
driver DriverInterface
}
library := &Library{
driver: NewDriver(100),
}
ctx := context.Background()

// 定义 A
type A struct {
produce ProduceInterface
}

a := &A{
produce: NewProduce(library.driver),
}

// 定义 B
type B struct {
consumer ConsumerInterface
}

b := &B{
consumer: NewConsumer(library.driver),
}

// 定义书 S 并且添加一些描述
book := NewEvent("S")
book.SetMsg("title", "hello world")
book.SetValue("pages", 120)

// A 把书 S 放到图书馆
go func() {
book.InitWaitEvent()
a.produce.Call(ctx, book)

// A 等待 B 的回复, 但是他最多只会等待 B 2个小时
res, err := book.StartWaitEvent(2 * time.Hour)
require.NoError(t, err)
fmt.Println(res)
}()

// 模拟 B 去图书馆拿书
book = b.consumer.Receive(ctx)
fmt.Println(book.GetMsg("title"))

// 书完好
book.ExecWorkAndSendResult(func() (interface{}, error) {
// b 检查书
return "OK", nil
})
time.Sleep(50 * time.Millisecond)
}