4

技术干货 | 如何利用 MongoDB Change Streams 实现数据实时同步?

 1 year ago
source link: https://mongoing.com/archives/82756
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

如何利用 MongoDB Change Streams 实现数据实时同步?当前实时数据同步的应用场景较多,实现方式主要有两种,一是数据库厂家本身提供了实时数据捕获工具,如 Oracle 的 OGG 等;另外一种是实时解析数据库的事务日志,获取到实时变化的数据后进行同步,如 Flink CDC 等。对于 MongoDB 复制集来说,默认情况下,成员间通过 Oplog 实现的数据同步是有延迟的。因此,为了实现数据的实时同步,且能将数据同步到异构系统中,从3.6版本开始,MongoDB 提供了 Change Steams 功能,允许用户非常方便地将实时变更数据同步到下游系统进行处理。其实在3.6版本之前,如果要实现这种实时同步,开发人员也可以通过实时解析复制集 Oplog 里面的日志条目来完成,只不过这种方式需要额外开发代码,实现起来较复杂。实现原理

在应用程序里面,开启数据库或集合上的监听,一旦捕获到数据变更事件,就会产生变更流数据(类型为文档),变更流里面包含具体的动作(如 insert、delete、update 等)和变更的文档,应用程序可以将此变更流数据发送到下游系统,由下游系统进一步处理(如完成下游系统相应数据变更,实现数据实时同步)。

本质上,Change Streams 特性,可以完成与 Kafka 或 RabbitMQ 等消息组件类似的功能,这样当需要将 MongoDB 集群中的数据,向异构系统实时同步时,我们就不需要额外再部署一套类似 Kafka 等消息处理的集群了。

Change Streams 整体流程如下图所示:

Change Streams 整体流程

可以看到直接打开 MongoDB 的 Change Streams 变更流监听,就可以实现向异构下游系统实时同步数据。。实时流数据的格式

复制集与下游系统间的数据同步依赖于实时生成的变更流数据,实时流数据的格式为文档类型,包含如下字段:

{
   _id : { <BSON Object> },        //已打开的变更流标识,可以作为值赋给参数resumeAfter,用来后续恢复此变更流
   "operationType" : "<operation>",        //发生的变更操作类型,如:insert、delete、update等
   "fullDocument" : { <document> },      //变更操作所涉及的完整文档数据,删除操作里面没有这个字段
   "ns" : {   
      "db" : "<database>",         //变更操作发生在哪个数据库上
      "coll" : "<collection>"     //变更操作发生在哪个集合上
   },
   "to" : {   //当操作类型为rename时,才显示这几个字段
      "db" : "<database>",         //变更后的新数据库名
      "coll" : "<collection>"     //变更后的新集合名
   },
   "documentKey" : { "_id" : <value> },  //变更操作所涉文档的_id字段值
   "updateDescription" : {    //修改操作描述
      "updatedFields" : { <document> },  //修改操作修改了什么字段及值
      "removedFields" : [ "<field>", ... ]     //修改操作删除了什么字段及值
   }
   "clusterTime" : <Timestamp>,     //变更操作对应的Oplog日志条目上的时间
   "txnNumber" : <NumberLong>,    //如果变更操作在一个多文档事务里面执行,则显示此字段及值,表示事务的编号
   "lsid" : {          //表示事务所在的Session相关信息
      "id" : <UUID>,  
      "uid" : <BinData> 
   }
}

打开实时数据流

打开一个实时数据流,会返回一个 cursor,变更的数据可以通过循环遍历 cursor 获得,相当于打开一个水龙头,水会源源不断地流过来。

针对不同编程语言的驱动,MongoDB 都提供了相应的 API 来打开实时数据流,下面以 Python 为例子进行说明,如下客户端应用代码:

from pymongo import MongoClient
import pprint
client=MongoClient('mongodb://192.168.85.128:60001,192.168.85.128:60002, 192.168.85.128:60003/?replicaSet=rs0')
db = client.crm
cursor = db.inventory.watch() 
for doc in cursor:
        print(doc)

其中,语句db.inventory.watch()表示打开一个实时变更流,监听集合 inventory 上的任何数据变化。

for 循环语句对游标循环遍历,实时打印变更流里面的文档。

先运行上面的代码,再通过 mongo 连接到复制集,模拟向 inventory 集合插入、修改、删除数据,观察上面的代码是否能实时输出流数据。

插入数据语句如下:

rs0:PRIMARY> db.inventory.insert({ "_id" : 20, "model" : "SIM", "count" : 1000})

如果实时输出如下流数据,说明打开的实时数据流是正确的:

{'operationType': 'insert', 'clusterTime': Timestamp(1594645788, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 20.0}, 'fullDocument': {'model': 'SIM', '_id': 20.0, 'count': 1000.0}, '_id': {'_typeBits': b'@', '_data': '825F0C5D1C000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B280004'}}

同理,测试删除数据,如下语句:

rs0:PRIMARY> db.inventory.deleteOne({"_id":20})

也能实时输出如下信息:

{'operationType': 'delete', '_id': {'_typeBits': b'@', '_data': '825F0C5E3A000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B280004'}, 'clusterTime': Timestamp(1594646074, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 20.0}}

*注意:删除变更操作,输出流数据不包含字段'fullDocument'

最后,测试下修改数据,如下语句:

rs0:PRIMARY> db.inventory.update({"_id":19},{$set:{"count":2999}})

实时输出如下流数据:

{'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'count': 2999.0}}, 'clusterTime': Timestamp(1594646292, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 19.0}, '_id': {'_typeBits': b'@', '_data':'825F0C5F14000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B260004'}}

注意:默认情况下对于 update 操作,输出的实时流数据也不会包含字段'fullDocument';但是可以在打开变更流的方法里传入可选参数full_document= 'updateLookup'实现输出的实时流数据包含'fullDocument'字段及值,如带参数语句:cursor = db.inventory.watch(full_document='updateLookup')

控制实时流数据的输出

在有些场景下,需要控制实时流的输出,希望将不同的流数据传给不同的下游系统进行处理,类似快递公司的包裹分拣系统,将送往不同地方的包裹分开,如下图所示:

e843d779ce099aa.png

MongoDB提供了一种管道模式来处理这些数据流,当流数据经过预先配置好的管道时,数据会依次被管道中的每一个步骤进行处理。

这种数据处理模式与MongoDB自带的管道模式聚集框架类似。

如下代码示例:

from pymongo import MongoClient
import pprint
client= MongoClient('mongodb://192.168.85.128:60001,192.168.85.128:60002, 192.168.85.128:60003/?replicaSet=rs0')
db = client.crm
pipeline = [
    {'$match':{'fullDocument.model':'SIM'}},
    {'$addFields':{'newField':'this is an added field'}}
]
cursor = db.inventory.watch(pipeline=pipeline)
for doc in cursor:
        print(doc)

先构造一个管道,然后在打开实时数据流时传入管道参数。

通过管道参数,从数据流里过滤出满足'fullDocument.model':'SIM'条件的数据流,然后再向数据流添加一个额外的'newField'字段。经过管道处理后的数据流可以被下游系统作进一步处理。

针对 MongoDB 4.2 版本,其它还可被使用的管道操作符有:$project$replaceRoot$replaceWith$redact$set$unset

注意:上面代码对实时数据流的处理只是简单的循环打印,如果需将数据实时同步到其它系统中,如 MySQL、Hbase 等,需要应用开发人员进一步编写相应的逻辑代码进行处理。

关于作者:郭远威

MongoDB 中文社区长沙分会主席,资深大数据架构师,著有《大数据存储MongoDB实战指南》一书。通信行业业务架构与数据迁移专家,先后在华为、中兴工作十余年;曾负责实施了海外多个运营商的大数据迁移及 BI 等大数据系统的设计开发。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK