在实际项目中,我们经常需要异步处理事件与数据。比如MVC模型中处理请求的Filter链,又如在nginx中或是linux的iptables中,都会有一个处理链条,来一步步的顺序处理一个请求。此外基于集中存储与分发的模式,实现事件与数据的异步处理,对于提升系统响应程度,实现业务处理的解耦至关重要。本文以eosc(一个高性能中间件开发框架)中的代码为例子,看看如何在我们的实际项目中,实现这样的功能。
代码eosc提供了关于dispatcher的关键实现的两个文件,分别是dispatch.go和data-dispatch.go,具体的代码地址是:https://github.com/eolinker/eosc/tree/main/common/dispatcher。
这两个文件中实现的结构体与接口的关系如图所示:
(资料图片)
dispatcher关键接口与结构体的关系
1、dispatch.go文件在dispatch.go文件中,esco提供了IEvent、CallBackHandler、IListener三个重要的接口。
同时通过CallBackFunc来实现接口CallBackHandler, tListener来实现IListener。
//2个接口type CallBackHandler interface { DataEvent(e IEvent) error}type IListener interface { Leave() Event() <-chan IEvent}
/*CallBackFunc实现了CallBackHandler,同时CallBackFunc又是一个接受IEvent为参数,返回error的函数*/type CallBackFunc func(e IEvent) errorfunc (f CallBackFunc) DataEvent(e IEvent) error { return f(e)}
//实现了IListener接口func (t *tListener) Leave() { t.Once.Do(func() { atomic.StoreUint32(&t.closed, 1) close(t.c) })}func (t *tListener) Event() <-chan IEvent { return t.c}
注意:tListener还提供了一个Handler方法,这个方法的参数与返回结果与CallBackFunc一样,也就是说它实现的Handler方法是一种CallBackFunc,这个在后面的分发处理逻辑的注册中会用到。
func (t *tListener) Handler(e IEvent) error { if atomic.LoadUint32(&t.closed) == 0 { t.c <- e return nil } return ErrorIsClosed}2、data-dispatch.go文件
该文件提供了两种dispatcher创建方法,分别是NewDataDispatchCenter、NewEventDispatchCenter。这两个方法都是创建了DataDispatchCenter结构体(这个结构体后面会讲到),但是启动的处理协程不同,NewDataDispatchCenter启动的是doDataLoop,NewEventDispatchCenter启动的是doEventLoop。
//两种DispatchCenter创建方法func NewDataDispatchCenter() IDispatchCenter { ctx, cancelFunc := context.WithCancel(context.Background()) center := &DataDispatchCenter{ ctx: ctx, cancelFunc: cancelFunc, addChannel: make(chan *_CallbackBox, 10), eventChannel: make(chan IEvent), } go center.doDataLoop() return center}func NewEventDispatchCenter() IDispatchCenter { ctx, cancelFunc := context.WithCancel(context.Background()) center := &DataDispatchCenter{ ctx: ctx, cancelFunc: cancelFunc, addChannel: make(chan *_CallbackBox, 10), eventChannel: make(chan IEvent), } go center.doEventLoop() return center}//DataDispatchCenter 数据广播中心type DataDispatchCenter struct { addChannel chan *_CallbackBox eventChannel chan IEvent ctx context.Context cancelFunc context.CancelFunc}
DataDispatchCenter这个结构体中有两个chan,一个是addChannel,一个是eventChannel。
addChannel | 接受_CallbackBox,这个_CallbackBox提供了逻辑处理Handler |
eventChannel | 接受IEvent,触发 |
NewEventDispatchCenter方法中启动的doEventLoop,逻辑相对简单,创建的channels用于存储addChannel发送过来的_CallbackBox,即事件处理Handler.当eventChannel收到事件后,遍历channels中的每一个_CallbackBox,并调用相应的Handler处理。
doEventLoop状态图
具体代码可以查看:https://github.com/eolinker/eosc/blob/main/common/dispatcher/data-dispatch.go#L48。
doDataLoop逻辑:NewDataDispatchCenter方法中启动的doDataLoop,这个逻辑稍微复杂点。其实它的大致流程和doEventLoop,不同的是每个新增加的_CallbackBox,需要对当前接收并缓存的所有Event键值对进行处理。而doEventLoop是不会的,新增加的_CallbackBox,只会对在它之后接收的Event生效。下面的代码InitEvent(data.GET())很有意思。
首先InitEvent实现了IEvent接口,是一种IEvent。type InitEvent map[string]map[string][]byte (代码链接:https://github.com/eolinker/eosc/blob/main/common/dispatcher/data.go#L88)InitEvent是一个map,可以通过InitEvent(data.GET())初始化。func (d *DataDispatchCenter) doDataLoop() { data := NewMyData(nil) channels := make([]*_CallbackBox, 0, 10) isInit := false for { select { case event, ok := <-d.eventChannel: if ok { isInit = true data.DoEvent(event) next := channels[:0] for _, c := range channels { if err := c.handler(event); err != nil { close(c.closeChan) continue } next = append(next, c) } channels = next } case hbox, ok := <-d.addChannel: { if ok { if !isInit { channels = append(channels, hbox) } else { if err := hbox.handler(InitEvent(data.GET())); err == nil { channels = append(channels, hbox) } } } } } }}应用
创建EventServer。
type EventServer struct { IDispatchCenter}func NewEventServer() *EventServer { es := &EventServer{ IDispatchCenter: NewDataDispatchCenter(), } return es}
定义事件。
type MyEvent struct { namespace string key string event string data []byte}func (m *MyEvent) Namespace() string { return m.namespace}func (m *MyEvent) Event() string { return m.event}func (m *MyEvent) Key() string { return m.key}func (m *MyEvent) Data() []byte { return m.data}
定义Handler并注册。
func Handler(e IEvent) error { //根据自己的业务要求}es.Register(Handler)
发送事件。
es.Send(&MyEvent{ namespace: "a", key: "b", event: "set", data: []byte(fmt.Sprint(index)),})
转载本文可以通过以下二维码关注联系。