27

Change Stream源码解读

 3 years ago
source link: https://mongoing.com/archives/75336
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

nMZnEn6.png!mobile

MongoDB从3.6开始推出了 Change Stream 功能,提供实时的增量数据流功能,为同步、分析、监控、推送等多种场景使用带来福音。4.0中引入的混合逻辑时钟,可以支持分片集群在不关闭balancer的情况下,吐出的增量数据在即使发生move chunk发生的情况下,还能够保证数据的因果一致性。不但如此,随着4.0.7开始推出的 High Water Mark 功能,使得返回的change stream cursor包括 Post Batch Resume Token ,更好的解决Change Stream中ResumeToken推进的问题。关于Change Stream的功能解读,网上可以找到比较多的资料,比如张友东的这篇解读介绍了Change Stream与oplog拉取的对比以及基本的使用。本文将主要侧重从 内核源码层面 进行解读,主要介绍分片集群版下Change Stream在mongos和mongod上都执行了哪些操作。此外,由于4.0开始MongoDB使用了混合逻辑时钟,从而保证了move chunk的因果一致性,所以本文还会先简单介绍一下MongoDB中混合逻辑时钟的原理。

文本主要内容将会覆盖以下几块:

  1. Change Stream基本功能
  2. 混合逻辑时钟
  3. Change Stream具体处理流程
  4. 总结

名词解释:

  • oplog:MongoDB增量记录,每次修改操作都会产生一条oplog。
  • event:Change Stream中返回的一条记录,表示一次变更。
  • mongos:分片集群中的proxy层。
  • shard/mongod:本文中2个概念一致,都表示分片集群的单个分片。
  • ResumeToken:当前oplog/event的位点,Change Stream根据这个进行断点续传。
  • PBRT:Post Batch Resume Token。MongoDB中最新的ResumeToken。
  • HLC:Hybrid Logical Clock。混合逻辑时钟。
  • Pipeline stage:表示aggregate操作过程中的各个阶段。
  • 时钟/时间戳/混合逻辑时钟/HLC:本文中这几个概念一致,都表示时间戳。

1. Change Stream基本功能

Change Stream的功能主要就一个:推送实时的增量变更数据流。也就是说,MongoDB上进行的所有DML操作(插入、删除、修改)以及部分DDL操作(删表、删库)都可以被推送出来。那么,基于这个功能,就可以实现很多功能,例如:

  • 分析。比如我们需要对MongoDB的数据进行分析,不断拉出用户的更新,推送到下游的分析平台。

  • 迁移/同步/备份。比如把A数据库热迁移到B数据库,数据库形态可以是副本集、集群版。

  • 推送。比如我们用手机地图查看我们需要等待的公交离我们还有几站,我们希望每次公交位置都自动告知用户,而不是我们自己每次去不断刷新主动拉取。

  • 监控。比如有些表是敏感表,我们希望这些表的变更都能告知使用方,防止攻击&误操作。

目前, MongoShake 从v2.4版本开始支持从Change Stream对接,从而,用户可以非常灵活的基于Change Stream来实现以上多种需求。

1.1 Change Stream event各个字段解释  

在使用 watch 开始监听整个数据库/db/表以后,一旦有符合条件的变更,Change Stream将会吐出一条event代表一次表更(插入/删除/修改/删除/删表等),下面是一条event的具体字段解析:

{
  _id : { // 存储元信息
       "_data" : <BinData|hex string> // resumeToken,用于断点续传
  },
   "operationType" : "<operation>", // 包括:insert, delete, replace, update, drop, rename, dropDatabase, invalidate
   "fullDocument" : { <document> }, // 修改后的数据,出现在insert, replace, delete, update. 相当于oplog中的o字段
   "ns" : { // db和collection的信息
       "db" : "<database>",
       "coll" : "<collection"
  },
   "to" : { // 只在operationType==rename的时候有效,表示改名以后的ns
       "db" : "<database>",
       "coll" : "<collection"
  },
   "documentKey" : { "_id" : <value> }, // 相当于o2字段。出现在insert, replace, delete, update。正常只包含_id,对于sharded collection,还包括shard key。
   "updateDescription" : { // 只在operationType==update的时候出现,相当于是增量的修改,出现在$set和$uset场景,区别于replace的整个文档替换。
       "updatedFields" : { <document> }, // 更新的field的值
       "removedFields" : [ "<field>", ... ] // 删除的field列表
  }
   "clusterTime" : <Timestamp>, // 逻辑时钟,相当于ts字段
   "txnNumber" : <NumberLong>, // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增
   "lsid" : { // logic session id,请求所在的session的id。相当于oplog的lsid字段。
       "id" : <UUID>,
       "uid" : <BinData>
  }
}

Change Stream的event本身是从oplog翻译过来的,所以其中的字段跟oplog字段比较类似。可以看到每一条change stream event都包括一个ResumeToken用于断点续传。

2. 混合逻辑时钟(Hybrid Logical Clock,简称HLC)

本小节会介绍MongoDB关于HLC的使用情况,HLC的具体概念大家感兴趣的话可以查看下面几篇论文:

混合逻辑时钟是解决分布式场景 事件定序 的问题。也就是2个具有 happened-before关系 的事件,他们的混合逻辑时钟将具有大小关系。MongoDB中的混合逻辑时钟为64bit,包括了高位的物理时钟32bit,和低位计数32bit。比如oplog中的ts字段,以及请求/回复消息中的ClusterTime字段等,都是混合逻辑时钟。下面给出了oplog含有ts字段的部分结构:

{
   "ts" : Timestamp(1571389994, 1), // 混合逻辑时钟  
   "t" : NumberLong(1),
   "v" : 2,
   "op" : "i",
  ...
}

HLC的比较是64位一起比较的:

  • Timestamp(1000, 2) > Timestamp(1000, 1)
  • Timestamp(1001, 1) > Timestamp(1000, 5)

在MongoDB中,写请求是会 推进 HLC的,所有请求和请求的返回都是会 跟踪 HLC的。写请求 推进 举例:mongos1的HLC为Timestamp(1002, 1),mongos1发送一条写请求给mongod1。那么mongod1收到该写请求后,查看本地物理时钟(假设是1005),那么对比收到的消息的高位跟本地物理时钟,发现是本地的大,就更新mongod1时钟为Timestamp(1005, 0)。mongod1写完毕后,会把本地的时钟携带到回复消息中发送回给mongos1,然后mongos1 跟踪 该HLC从Timestamp(1002, 1)变成Timestamp(1005, 0)。另外一种情况,最开始mongos1的HLC分别为Timestamp(1002, 1),mongod1收到mongos1的写请求后,本地HLC是1001小于携带过来的请求的1002,则高位为1002,并且递增低位时钟到Timestamp(1002, 2);同理mongod1回复给mongos1的消息会把mongos1的HLC更新为Timestamp(1002, 2)。读请求同样会 跟踪 mongos1的HLC为Timestamp(1002, 1),mongos1发送一条读请求给mongod1。那么mongod1收到该请求后,如果本地时钟小于mongos1的,则更新本地时钟为Timestamp(1002, 1);如果本地时钟大,则不会进行更新。MongoDB中的HLC应用比较多,比如解决change stream全局定序、session内的read own write、跨shard访问时钟对齐、分布式事务时钟对齐等问题。这里只介绍change stream如何解决move chunk的乱序问题:

2.1 move chunk乱序问题 

如果不关闭balancer,在move chunk的情况下,shard并发拉取会产生什么问题? Qva6RnZ.png!mobile 以上图为例,最开始 {_id:1} 位于shard1上面,用户执行了更新操作修改为 {_id:1, a:1} 。然后,发生了move chunk操作,对应 {_id:1} 的chunk从shard1上挪到了shard2,然后shard2上执行了更新操作: {_id:1, a:3} ,最后a的结果是3。但是通常情况下,对于同步工具来说,拉取不同shard是一个并发的过程,以MongoShake举例,假设此时线程1拉取shard1,线程2拉取shard2,由于shard1的cpu/带宽/内存/网络io等多种原因,导致shard2的拉取进度快于shard1了,先拉取ts=102进行回放产生的结果是 {_id:1, a:3} ,然后是ts=100,最后 {_id:1} 对应的结果就是 {_id:1, a:1} 。这显然是不符合预期,破坏了因果序,所以MongoShake中,用户如果采用oplog进行拉取,那么对于源端MongoDB是分片集群,必须 关闭balancer 以规避这种情况。而在Change Stream过程中,mongos本身会对拉取到的event进行排序,从而保证了因果一致性。下面章节会具体介绍内部处理细节。

JZb2EzY.png!mobile

3. Change Stream具体处理流程

客户端发送的Change Stream命令是基于aggregate框架实现的,只不过添加了一个特殊的$changestream stage:

"pipeline" : [
{
   "$changeStream" : {
     "fullDocument" : "default",
     "startAtOperationTime" : Timestamp(1580867312, 3)
  }
}
],

change stream的断点续传是根据ResumeToken来进行的,下面会具体介绍其用途,这里介绍一下ResumeToken的结构:

struct ResumeTokenData {
  Timestamp clusterTime; //HLC
  int version = 0; // 版本号,4.0.7以前为0,4.0.7增加了PBRT,版本号为1
  size_t applyOpsIndex = 0; // applyOps内部的index
  Value documentKey; // 包含_id和shardKey
  boost::optional<UUID> uuid; //表的uuid
};

由于ResumeToken维持不同的版本字段version,所以在版本升级后,有可能出现不同版本 token 无法识别的问题,所以尽量要让 MongoDB Server 所有组件(各个mongod,config-server、mongos)都保持相同的内核版本。下面以分片集群为例,介绍客户端、mongos、mongod对Change Stream的处理流程,其中mongos又分为消息分发和聚合2个步骤。

3.1 客户端 

客户端的行为比较简单,用户可以指定一些配置项来创建change stream:

  • ResumeAfter。根据指定的token进行断点续传。

  • StartAfter。根据指定的token进行断点续传,与ResumeAfter不同的是,StartAfter支持从invalidate event中进行恢复。例如,监听的collection被删除了就会返回invalidate event。

  • StartAtOperationTime。根据指定的时间戳进行启动/断点续传。

  • FullDocument。正常情况下对于$set/$unset,只返回部分修改的字段,但如果FullDocument设置为updateLookup,则会返回整个更新后的文档。

  • 其他配置:batchSize、MaxAwaitTime等。

Change Stream创建后,driver将会发送封装第一个stage为 $changestream 的aggreate命令给MongoDB,服务端建立成功后返回cursor给客户端。客户端调用getNext获取下一条event事件,对应driver实现是查看当前缓存队列中是否还有event,有则拉出第一条直接返回给上层,没有则发送getMore命令,然后对于返回的一批event进行缓存用于下次返回。driver缓存数据的同时还会存储PBRT(resumeToken),用于处理网络抖动等连接断开情况下的自动断点续传;同时,上层应用也可以根据这个PBRT进行位点的推进以及断点续传。下面是MongoDB对于aggregate请求和getMore请求的返回的具体消息结构:

/**
* Response to a successful aggregate.
*/
{
  ok: 1,
  cursor: {
      ns: String,
      id: Int64,
      firstBatch: Array<ChangeStreamDocument>, // 返回的batch数据数组
      /**
      * postBatchResumeToken is returned in MongoDB 4.0.7 and later.
      */
      postBatchResumeToken: Document // PBRT
  },
  operationTime: Timestamp,
   $clusterTime: Document,
}

/**
* Response to a successful getMore.
*/
{
  ok: 1,
  cursor: {
      ns: String,
      id: Int64,
      nextBatch: Array<ChangeStreamDocument> // 返回的batch数据数组
      /**
      * postBatchResumeToken is returned in MongoDB 4.0.7 and later.
      */
      postBatchResumeToken: Document // PBRT
  },
  operationTime: Timestamp,
   $clusterTime: Document,
}

3.2 mongos分发 

mongos第一次收到Change Stream命令,调用dispatchShardPipeline构建 $changestream aggregate分发到 所有的 shard。目前mongos做法,无论shard上是否有对应客户端指定的db/collection,都会进行分发。这个是为了后续用户发起shardCollection,movePrimaryShard等操作的考虑。这个构建的mongod的$changestream aggregate命令跟mongos本身收到的基本一样,但是额外添加了几个选项方便mongod逻辑处理,例如: {fromMongos: true, needsMerge: true, mergeByPBRT: true} 等。这样,mongos上就构建了到各个shard的cursor。接着,根据这些mongod对应的cursor构建mergePipeline,处理各个shard返回change stream event的合并逻辑。构建mergePipeline的过程,主要就是串行添加各个stage的过程(3.4将会详细介绍各个stage的功能):

  • DocumentSourceMergeCursors 。合并各个shard返回的cursor。
  • DocumentSourceUpdateOnAddShard 。处理新增shard的情况。
  • DocumentSourceCloseCursor 。处理invalidate事件。
  • DocumentSourceLookupChangePostImage 。如果是event类型是update,且设置了 FullDocument=updateLoopup ,则会执行额外的query。

对于aggregate的pipeline来说,就是一个个串行stage,上游的输出作为下游的输入。所以最后,ClusterCursorManager缓存这个mergePipeline的client cluster cursor,下一次针对这个tailable的cursor一旦用户发起了getMore请求,就相当于 mergePipeline串行 跑一遍上述各个stage。

3.3 mongod 

shard收到mongos发送的$changestream aggregate请求后,构建 shardPipeline ,同样也包括多个stage:

DocumentSourceOplogMatch
DocumentSourceChangeStreamTransform
DocumentSourceCheckInvalidate
DocumentSourceShardCheckResumability

下面依次介绍各个stage做的工作:

3.3.1 DocumentSourceOplogMatch

构建oplog query cursor,其filter比较复杂,总结来说,就是过滤moveChunk,noop oplog等无关oplog,找到当前change stream所关注的oplog。下面就是具体匹配的filter:

$and:[
  {"ts": {$gte: startFrom}}, // 如果指定了resumeAfter则从这个startFrom开始,否则是$gtlastAppliedTs
  {$or: 
      [
          // opMatch
          {
               "ns": inputNamespace,
               $or: [ 
                  {"op": {$ne: "n"}}, // 不为noop, 表示是普通的CRUD操作
                  {"op": "n", "o2.type": "migrateChunkToNewShard"}, // chunk mirgrate到新的shard上
              ],
          } ,
          // commandMatch,DDL:
          {
               "op": "c",
               $or: [
                  // commandsOnTargetDb
                   $and: [
                      {"ns": inputDb.$cmd}, //如果不是指定全局则用这个,否则是正则匹配所有除local, admin, config之外的db
                       $or: [
                          {o.drop: kRegexAllCollections}, // kRegexAllCollections表示匹配所有的collection,
                          {o.dropDatabase: {$exist: true}}, //drop database操作
                          {o.renameCollection: inputNamespace},
                      ]
                  ]
                  // renameDropTarget
                  {o.to: inputNamespace},
              ]
          }
          // applyOps事务
          {
               "op": "c",
               "lsid": {$exists: true},
               "txnNumber": {$exists: true},
               "o.applyOps.ns": inputNamespace,
          }
      ]
  },
  {"fromMigrate": {$ne: true}}
]

可以看到,对于DDL来说,目前只支持dropCollection、dropDatabase、renameCollection3个操作,其他类似create/delete index,createColleciton,createDatabase,甚至更为复杂的比如convertToCapped、非事务applyOps等操作都没有支持。不过未来MongoDB有计划完善部分DDL。我们还可以看到,这里对于move chunk的oplog( {fromMigrate: true} )是直接过滤的,因为move chunk oplog本身只是表示挪动过程的目的端插入和源端删除操作,对于Change Stream本身来说没有实际意义,因为数据本身没有发生变更,只是位置上的变化。oplog的时钟是HLC的,从shard1挪到了shard2,则shard1和shard2的时钟都会推进,且具有因果序,所以shard1上move chunk之前的oplog时钟肯定是小于shard2上move chunk之后的oplog。从而直接对这些oplog排序即可保证因果一致性。

3.3.2 DocumentSourceChangeStreamTransform

内部通过调用 applyTransformation 将oplog翻译成对应的change stream event,这一步也是比较消耗资源,需要经历bson序列化/解序列化操作,而又因为整个pipeline本身就是单线程串行处理的过程,所以如果源端用户写入非常大,这一步可能会成为性能瓶颈。未来 MongoDB可能会对这个stage进行优化 ,优化的方向可能:

  1. 多线程保序解析

  2. 整个pipeline多线程化,每个线程处理部分数据

  3. 合并 DocumentSourceChangeStreamTransformDocumentSourceOplogMatch 以减少反复序列化/解序列化的开销。

此外,事务的oplog跟普通oplog处理略有不同,比如要处理applyOps内部嵌套oplog的拆分,以保证事务内部的断点续传。

3.3.3 DocumentSourceCheckInvalidate 

判断是否返回invalidate,以下情况需要返回invalid:

  1. 如果是collection watch,则dropCollection,renameCollection,dropDatabase都会触发invalid。
  2. 如果是db watch,则dropDatabase会触发invalid。

将invalid change stream event返回,返回之前,会进行状态的本地存储,这样下次用户在当前cursor继续getNext将会继续返回该invalid change stream event。

3.3.4 DocumentSourceShardCheckResumability 

这一步主要判断给定的ResumeToken是否可以恢复/启动,以及跳过部分不需要的event,如果用户没有指定,mongos本身会生成一个ResumeToken发送到mongod。通过对比mongos输入的resumeToken和oplog query返回的ResumeToken,来决定当前拿到的event是否需要返回给客户端,这个比较会有以下3种结果:

  1. kCheckNextDoc 。表示mongod拉到的event比较老,需要skip一部分才能得到client token所需要的event。
  2. kFoundToken 。表示mongod拉到的event恰好是mongos/客户端需要的。所以直接返回即可。
  3. kSurpassedToken 。表示mongod拉到的event大于mongos/客户端需要的,也就是说,一部分数据已经丢了。此时,就需要返回错误了。

这里,可能有读者有疑问,为什么多此一举,根据客户端/mongos指定的ResumeToken返回的oplog难道不就是mongos/客户端所需要的吗?答案是否定的。第一个阶段 DocumentSourceOplogMatch 中,oplog中过滤指定位点信息的 只有时间戳 {"ts": {$gte: startFrom}} ,而没有其他更为具体的信息,比如事务applyOps中的txnOpIndex,表示事务内的偏移。举个例子,某事务包含3条语句:insert a = 1; insert a= 2; insert a = 3,返回第二条数据后cursor断开了,重连后根据时间戳恢复,如果是单机事务采用applyOps,那么时间戳都是一致的,断点续传后第一条返回是insert a=1,而客户端需要的是第三条a=3。所以需要skip前面2条。此外,还有一些情况需要处理,比如用户shardCollection,或者4.4中refine了shardKey,那么客户端携带的resumeToken跟event返回的token中documentKey是不一致的,需要一些特殊处理。

3.4 mongos聚合 

mergePipeline分拆各个stage处理聚合。

3.4.1 DocumentSourceMergeCursors 

这一步是用于合并各个shard返回的cursor。下面显示的mergeCursors的对于2个shard示例:

{
   $mergeCursors: {
      lsid: {
          id: BinData(4, "DFFB25F3B9444D0CA4F790FADC73E965"),
          uid: BinData(0,"E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855")
      },
       sort: {_id._data: 1}, // _id._data就是ResumeToken
      compareWholeSortKey: false,
      remotes: [
          {
              shardId: "shard1",
              hostAndPort: "xx.xx.xx.xx:33001",
              cursorResponse: {
                  cursor: {
                      id: 7258869757099183534,
                      ns: "zz.$cmd.aggregate",
                      firstBatch: [],
                      postBatchResumeToken: {
                          _data: "825F1BF44A000000032B0229296E04"
                      }
                  },
                   $_internalLatestOplogTimestamp: Timestamp(1595667530, 3),
                  ok: 1
              }
          },
          {
              shardId: "shard2",
              hostAndPort: "xx.xx.xx.xx:33004",
              cursorResponse: {
                  cursor: {
                      id: 6627263387104649979,
                      ns: "zz.$cmd.aggregate",
                      firstBatch: [],
                      postBatchResumeToken: {
                          _data: "825F1BF44A000000032B0229296E04"
                      }
                  },
                   $_internalLatestOplogTimestamp: Timestamp(1595667530, 3),
                  ok: 1
              }
          }
      ],
      tailableMode: "tailableAndAwaitData",
      nss: "zz.$cmd.aggregate",
      allowPartialResults: false
  }
}

可以看到这里是按照ResumeToken进行排序的( sort: {_id._data: 1} ),这里的 _id._data 就是ResumeToken。ResumeToken的排序就是根据内部字段ts+documentKey+uuid等多个维度进行排序。合并各个cursor的过程就是一个 队列多路归并+小顶堆排序 的流程:

r2y6fqe.png!mobile

vMbqayM.png!mobile mongos上针对每个cursor都维持了一个docBuffer queue存放拉取的change stream event,通过比较函数,每次将会找所有docBuffer queue中队列头部ResumeToken最小的event,比如上图3个队列,队列头部最小的是队列3。然后跟minPromisedSortKey进行比较,如果小于minPromisedSortKey则返回给客户端,否则将会循环等待minPromisedSortKey的推进。那么minPromisedSortKey又是什么?推进规则又是如何?这里需要先介绍一下 Post Batch Resume Token (简称PBRT)。其是4.0.7推出的, 标识当前MongoDB内部最大的oplog的时间戳 。也就是说,如果后面MongoDB一旦产生新的oplog,则时间戳肯定大于PBRT。 对于mongos来说,每个mongod的cursor getMore请求都会返回PBRT,所以这个相当于是mongod给mongos的一个承诺,承诺以后不会返回时间戳小于PBRT的event。 根据这个,Change Stream实现了排序等待的策略。此外,PBRT对于应用层还有另外一个作用: 可以不断推进同步的位点 ,举个例子,我启动一个Change Stream监听A表,但是A表自从10:00以后没有写入,全部都是其余表的写入,那么同步的位点一直停留在10:00。18:00的时候网络断开了,我根据10:00进行恢复,但是由于我的磁盘比较小,oplog只能保留5个小时,也就是说,此时最老的oplog时间戳是13:00,那么就无法恢复了。为了防止这种无法恢复,或者即使恢复也需要大面积扫描的操作(从10:00扫描oplog到18:00),PBRT的作用就体现出来了,客户端只需要不断缓存PBRT,即使A表一直没数据,位点也被不断推进到18:00,从而解决这种oplog丢失和大面积扫描的操作。回答之前的问题,minPromisedSortKey是什么? minPromisedSortKey就是所有shard中最小的PBRTmin{PBRT(mongd1), PBRT(mongod2), ...} 。关于mongos合并mongod数据的过程,下面给出了归纳的伪代码流程:

while (no_data() || data_from_heap_not_less_than_minPromisedSortKey()) { // 如果没有数据,或者堆顶元素大于等于minSortKey,则会一直循环
send_getMore_to_each_shard() // 发送getMore到每个shard
  advance_minPromisedSortKey() // 根据返回的PBRT推进minPromisedSortKey
}

return ready_batch() // 返回一批符合条件的batch

首先会判断是否有数据并且存在数据小于minPromisedSortKey,如果是则不用走内部循环,直接返回符合条件的batch event即可。否则进入循环,循环内部会发送getMore到每个shard,拉取到的数据同样都存入docBuffer队列中,并且根据返回的PBRT推进minPromisedSortKey(即使shard没有数据,超时返回的response也会携带PBRT),minSortKey的值等于所有shard返回的PBRT中最小的值。一旦while循环判断有堆顶元素满足要求:小于等于minSortKey,则退出循环,聚合batch返回给客户端了。这是因为所有的shard都不会产生时间戳比minSortKey更小的event了,这个event也就可以返回了。基于这个逻辑,即使某个shard一直没有产生对应database/collection的event,也会不断推进PBRT,从而使得mongos可以顺利返回数据给客户端,而不会一直block。此外,mongos的PBRT(返回的数据中的最后一条PBRT)也会携带在getMore的response中返回给客户端。上图的例子中,本次的minPromisedSortKey已经推进到了10,则10之前的都可以聚合batch返回了,10之后的还需要继续在各个docuBuffer队列中等待下一次minPromisedSortKey的推进才能返回。

IJ7FFv.png!mobilenARN3ez.png!mobile

到这里,可能会有读者提问,2个shard是否会产生的时钟一致的oplog/event,是的话如何排序?答案是不同的shard是可能会产生时钟一致的oplog/event,这是因为在没有消息交互的情况下,不同shard的HLC是独立推进的。如果产生的event一致,Change Stream只需要按照resumeToken进行排序即可,将会综合ts+documentKey+uuid等多个维度进行排序。换句话说,如果2个shard产生的oplog/event一致,证明这2个oplog是完全并发的,谁先谁后并不重要,只要规则固定即可。细心的读者可能还会发现,这里由于需要等待minPromisedSortKey的推进才能返回数据,所以有一个“等待”的策略,会影响实时性。的确如此,不过通常来说这个延迟是秒级别的,所以对于同步场景来说,基本上是可以接受的。

3.4.2 DocumentSourceUpdateOnAddShard 

这个stage用于有新增shard的时候,构建新的cursor到这个shard。判断的方式是发现返回的event的 operationType==kNewShardDetected 。这个operationType是在 DocumentSourceChangeStreamTransform 里面转换的:

{
   "ns": inputNamespace,
   $or: [ 
      {"op": {$ne: "n"}}, // 不为noop, 表示是普通的CRUD操作
      {"op": "n", "o2.type": "migrateChunkToNewShard"}, // chunk mirgrate到新的shard上
  ],
}

在DocumentSourceOplogMatch的时候会返回 op=n ,但是 o2.type=migrateChunkToNewShard 以表示产生了新的shard并且发生了move chunk的操作。 DocumentSourceChangeStreamTransform 发现有op=n的时候,就返回 operationType=kNewShardDetected

3.4.3 DocumentSourceCloseCursor 

当发送invalidate事件后,则标记 _shouldCloseCursor ,下次再调用getNext会抛出异常,mongos上层捕获后close掉cursor。

3.4.4 DocumentSourceLookupChangeImage 

用户如果设置了 {``fullDocument: updateLookup} ,则对于event的operation类型是update还会进行一次find查找(普通update操作对应的operation是replace,只有$set/$unsert对应的才是update)。如果是位于mongos,则会设置readConcern=majority,并且配置 afterClusterTime 时间戳为返回的event中的resumeToken中的clusterTime,为什么副本集上就没有 afterClusterTime 参数?因为这一步对于副本集,读到的肯定是更新以后的,因为change stream本身吐出来的节点跟当前query的节点一致。最后,调用 lookupSingleDocument 发送find命令进行查找。注意,这里只能够保证最终一致性。如果用户频繁更新可能会发生丢失,例如:用户更新 {_id:1, a:1}{_id:1, a:2} ,然后又更新为 {_id:1, a:3} 。则可能2个change event返回的更新后的值都是 {_id:1, a:3} 。这个跟DynamoDB的Stremas中吐出的event包含真实的修改后数据NewImage和修改前数据OldImage不同,MongoDB只能做到吐出修改后的数据NewImage,而这个NewImage是通过一次额外的query实现的,所以只能保证最终一致性。这个机制产生的另外一个问题就是性能问题,由于update需要进行额外的find,那么返回event的延迟就会增大。

4. 总结

dropCollection
dropDatabase
renameCollection
DocumentSourceChangeStreamTransform

参考:

https://lamport.azurewebsites.net/pubs/time-clocks.pdf

h ttps://cse.buffalo.edu/tech-reports/2014-04.pdf

https://dl.acm.org/doi/pdf/10.1145/3299869.3314049

https://github.com/alibaba/MongoShake

作者介绍

陈星(花名烛昭),阿里云数据库NoSQL团队技术专家,MongoShake/NimoShake/RedisShake作者,当前主要参与阿里云MongoDB的内核以及相关MongoDB产品开发和维护工作。致力于为用户提供更好的云服务以及开源生态产品。

UZ7Jf2N.png!mobile

UBJVzaE.png!mobile

J7NjUbb.png!mobile

eu6fYnr.png!mobile

6zQ3UrE.png!mobile

QFNZreN.png!mobile

RbAriuR.png!mobile

yaiqeea.png!mobileA3iqEnJ.png!mobile

MongoDB中文手册翻译正在进行中,欢迎更多朋友在自己的空闲时间学习并进行文档翻译,您的翻译将由社区专家进行审阅,并拥有署名权更新到中文用户手册和发布到社区微信内容平台。 点击下方图片即可领取翻译任务——

raIbyuv.jpg!mobileNNreQv.png!mobile

更多问题可以添加社区助理小芒果微信(mongoingcom)咨询,进入社区微信交流群请备注“mongo”。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK