15

如何构建以事件驱动型实时信息系统

 3 years ago
source link: http://developer.51cto.com/art/202009/626771.htm
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【51CTO.com快译】在如今竞争激烈的商业竞争环境中,数据的处理往往需要具有实时性。如果竞争对手在数据管控上事先采取了行动,那么势必会在业务上取得一定的优势。这也正是我们需要构建能够实时处理信息(数据)的企业级系统的重要原因。在实时处理数据时,我们除了可以将系统设计为以异步的方式,对事件进行操作之外,还可以使用同步的请求响应消息,来构建实时的系统。同时,为了保持此类系统能够高效地使用资源,我们需要了解如何基于事件驱动的方法,来构建实时的信息系统。

jM7F7nv.png!mobile

图:实时信息系统解决方案架构

如上图所示,各类事件通常来源于包括移动和Web应用在内的不同渠道。其中:

  • 接收事件组件,负责在任何给定的时间内处理大量的数据,其速率从每秒上千个事件、到每秒数上百万个事件不等。在接收到事件之后,该组件会将其传递给对应的处理组件。
  • 事件处理组件,负责对事件中带有的数据进行操作,包括:过滤,清理,转换和汇总等。根据组件的行为,事件处理既可以独立于事件接收组件、被单独地执行,也可以作为相关的操作来完成。事件处理组件可以将原始事件存储在各种数据商店(data store)中,以便进行审核(如果未被接收组件处理的话),并且可以根据用例,将计算结果存储在那些单独的数据商店之中。因此,大多数事件都能够被实时处理,并发布到事件发布组件中,以传递给使用者(consumer)。
  • 事件发布组件,负责将处理后的数据实时地推送给使用者。这些使用者既可以是移动或Web应用,又可以是对已处理事件起作用的其他系统。除了这些实时事件的发布之外,在某些用例中,我们还需要通过HTTP通道,以同步、或请求-响应的方式,将处理后的摘要信息发布给移动和Web应用。

上述参考架构可用于那些需要处置现场正在发生的事件,并将其发布到后台应用等多种用例中。例如:在出现紧急情况时,现场人员可以实时地发送有关需求与状况的细节信息,而后台团队则能够毫不拖延地进行必要的物资调配与派送。此外,我们也可以使用此类架构来构建农业的供应链。例如:农民们通过运货车辆将农作物运送到连锁超市。农民可以在农作物准备就绪时,更新其详细信息。而超市后台团队则会实时地从各个位置获取更新,并安排车辆及时地收集农作物,以避免延迟。

使用WSO2和Kafka的参考架构

消息代理是将消息发送者与接收者相分离的组件。目前,市场上有很多消息代理类产品,它们各有优、缺点。其中最流行的当属Kafka、NATS和RabbitMQ。当然,Kafka也可以作为NATS和RabbitMQ的最佳功能性代理。

在此,我们选择Kafka作为事件消息的代理;选择功能丰富、简单且开源的WSO2Streaming Integrator作为事件处理器;使用既支持流媒体、又支持REST风格的WSO2 API Manager,作为事件发布者。当然,这些组件也可以被市场上的其他类似工具所替换。下图展示了构成实时事件驱动型信息系统的各个组件,及其相互连接。

nyMNvuu.png!mobile

图:具有代理和WSO2平台的实时事件驱动型信息系统

在该架构中,事件代理会接收来自移动和Web应用等源头的事件负载。WSO2 Streaming Integrator会处理这些事件,然后将各种结果事件通过WebSocket连接,发布到WSO2 API Manager上。WSO2 API Manager拥有一个公布给网关的WebSocket API,诸如移动和Web之类的consumer(消费者)应用会使用该API,实时地接收各种事件。同时,WSO2 Streaming Integrator可以将原始事件和汇总的结果,通过标准的REST API,从WSO2 API Manager处公布给相关的consumer。下图对上述架构进行了细化。

u6jeUb.png!mobile

图:具有Kafka和WSO2平台详细信息的实时事件驱动型信息系统

如上图所示,事件源通过Kafka客户端,将事件发布到Kafka代理中那些可用的topic(主题)处。WSO2 Streaming Integrator不但可以订阅这些topic,还能通过已配置的Kafka源,实时地使用来自Kafka的各种事件。由Siddhi语言编写的各项操作将处理这些事件,并传递给诸如WebSocket之类的事件sink(接收器)。同时,WSO2 SI会按需通过各种数据商店(data stores),将事件存储到对应的数据库表中。

WSO2 API Manager通过WebSocket API来将WebSocket sink的详细信息配置到API的端点上。据此,那些使用WebSocket API的客户端应用将能实时地接收到已经处理的各种事件。

同时,那些已处理的信息和原始事件会被存储到一个通过WSO2 Enterprise Integrator公布了REST数据服务的数据库中。此处的数据服务是通过将WSO2 API Manager作为受保护的REST API予以公布,并通过客户端应用实现同步通信的服务。此外,作为一种能够支持大多数企业系统需求的成熟架构,我们可以通过扩展,来支持诸如:混合集成需求、API管理平台等多种企业用例。

只有WSO2平台,没有Kafka的参考架构

如果贵组织刚开始着手构建实时的事件驱动型信息系统,而且数据负载量并不大的话,那么就可以仅使用WSO2平台,来构建前文提到的精简版架构。下图展示了一种没有消息代理的实现方式。

ABzYzqe.png!mobile

图:具有WSO2平台的实时事件驱动型信息系统

该架构与前文提到的架构之间唯一的区别在于:虽然缺少事件代理,但是客户端应用能够通过HTTP的调用,将事件直接发送到WSO2 Streaming Integrator处。当然,由于该架构没有消息代理,因此WSO2 SI需要将原始事件存储在数据库中,以供各项审核。而它的其余功能则与前文的架构相同。下图展示了该架构的详细组成结构。

myaYbuZ.png!mobile

图:具有WSO2平台详细信息的实时事件驱动型信息系统

如上图所示,WSO2 SI被配置为通过HTTP接口来接收事件。而Siddhi应用中的HTTP源则被配置为通过不同的操作,来处理各种事件,然后发布到WebSocket sink中。同时,各种原始事件通过数据商店被存储在数据库中,并将各种聚合的结果通过不同的数据商店存储到另一个数据表里。除此之外,该系统的其余功能与前文提到的基于代理的实现方式基本一致。

从Kafka到Websocket Siddhi应用的示例代码

下面我们将给出一个Siddhi的应用示例。它能够从Kafka的topic中读取事件,并通过WebSocket服务器,将各种事件发布(或输出)到某个日志sink处。当然,在发布之前,它会对每个事件进行简单地检查(或筛选),以确保其数量小于500。具体代码请参见-- https://gist.github.com/chanakaudaya/efe8dfed2558811f0316a7839dbfef57 。其中,您可以找到有关如何使用Streaming Integrator,来设置Kafka的详细示例。同时,您也可以通过文档链接-- https://ei.docs.wso2.com/zh_CN/latest/streaming-integrator/examples/working-with-kafka/ ,来试运行该Siddhi应用。

如何创建连接到WebSocket端点的WebSocket API

如下图所示,您可以通过WSO2 API Manager的发布者(publisher)接口来创建WebSocket API,并使用WS服务器将这些事件发布到客户端。

NzyUBv.png!mobile

您可以在下图的上部菜单中选择“设计新的WebSocket API”(或“创建API”),然后在下一个窗口中提供详细的信息。

ZBNNRzq.png!mobile

接着,您可以选择“创建并发布”选项,将WebSocket API推送到开发人员的门户(portal)处,以便用户在其中使用有效的OAuth2令牌。

通过参考文档链接-- https://apim.docs.wso2.com/zh-CN/latest/learn/tutorials/create-and-publish-websocket-api/#create-and-publish-a-websocket-api ,您可以逐步了解到如何创建WebSocket API,并能够试运行其客户端的示例。

作为拓展,您还可以从如下链接处,获得有关WSO2的大量代码示例:

原标题:How To Build a Real-Time, Event-Driven Information System ,作者: Chanaka Fernando

【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】

【责任编辑:庞桂玉 TEL:(010)68476606】


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK