Celery/Kombu MongoDB 连接异常调查记录

 1 year ago
source link: https://zdyxry.github.io/2022/05/09/Celery-Kombu-MongoDB-%E8%BF%9E%E6%8E%A5%E5%BC%82%E5%B8%B8%E8%B0%83%E6%9F%A5%E8%AE%B0%E5%BD%95/
Celery/Kombu MongoDB 连接异常调查记录

发表于 2022-05-09


产品组件 JobCenter 使用 Celery 实现异步任务中心,同时会运行 job-center-worker (celery worker) 和 job-center-scheduler(celery beat) 两个进程,使用 MongoDB 作为 Backend 存储 message 等信息(Celery 官方已说明不再维护对 MongoDB 的支持)。其中 MongoDB 配置了 ReplicaSet 保证高可用。

近期 Celery/Kombu 中遇到了 No free channel ids 问题,经过排查在这个 PR 中解决了该问题,在考虑 cherry-pick 的工作量和可维护性考虑,最终将产品中的 celery 和 kombu 组件从 3.x 统一升级到了 4.x 版本。

测试同学反馈近期在进行可靠性测试时,发现将 MongoDB 节点的存储网络 ifdown 会导致 JobCenter hang. 针对该问题进行调查。


先尝试复现该问题,首先尝试 ifdown Primary 节点存储网络,现象复现;尝试 ifdown Secondary 节点存储网络,无法复现;
尝试 stop MongoDB service 替代 ifdown,Primary 或 Secondary 均无法复现。推测与 MongoDB 连接处理有关。

观察现象复现的日志,在存储网络异常时,日志无任何输出,在存储网络恢复正常后,可以看到 Celery 记录在尝试连接 Broker (MongoDB)时发生了异常,尝试重连。

[2022-05-07 10:13:01,362: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
File "/usr/lib/python2.7/site-packages/celery/worker/loops.py", line 121, in synloop
File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 315, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events
get(self._deliver, timeout=timeout)
File "/usr/lib/python2.7/site-packages/kombu/utils/scheduling.py", line 56, in get
return self.fun(resource, callback, **kwargs)
File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 1001, in _drain_channel
return channel.drain_events(callback=callback, timeout=timeout)
File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 745, in drain_events
return self._poll(self.cycle, callback, timeout=timeout)
File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 402, in _poll
return cycle.get(callback)
File "/usr/lib/python2.7/site-packages/kombu/utils/scheduling.py", line 56, in get
return self.fun(resource, callback, **kwargs)
File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 405, in _get_and_deliver
message = self._get(queue)
File "/usr/lib/python2.7/site-packages/kombu/transport/mongodb.py", line 141, in _get
File "/usr/lib64/python2.7/site-packages/pymongo/collection.py", line 2315, in find_and_modify
File "/usr/lib64/python2.7/site-packages/pymongo/collection.py", line 205, in _command
File "/usr/lib64/python2.7/site-packages/pymongo/pool.py", line 218, in command
File "/usr/lib64/python2.7/site-packages/pymongo/pool.py", line 346, in _raise_connection_failure
raise error
AutoReconnect: connection closed
[2022-05-07 10:13:01,363: WARNING/MainProcess] Restoring 1 unacknowledged message(s)

对应 Celery 代码在 worker/consumer/consumer.py,Blueprint 是 Celery 启动入口,可以看到在 blueprint.start(self) 阶段进行了异常处理,针对 self.connection_errors 来触发重连接。

consumer: Connection to broker lost. \
Trying to re-establish the connection...\
def start(self):
blueprint = self.blueprint
while blueprint.state != CLOSE:
if self.restart_count:
except RestartFreqExceeded as exc:
crit('Frequent restarts detected: %r', exc, exc_info=1)
self.restart_count += 1
except self.connection_errors as exc:
# If we're not retrying connections, no need to catch
# connection errors
if not self.app.conf.broker_connection_retry:
if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
raise # Too many open files
if blueprint.state != CLOSE:
if self.connection:

def on_connection_error_before_connected(self, exc):
error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
'Trying to reconnect...')

def on_connection_error_after_connected(self, exc):
warn(CONNECTION_RETRY, exc_info=True)
except Exception:

self.connection_errors 对应的其实是 Kombu 中 Transport 定义的 ,可以在 kombu/kombu/transport/mongodb.py 中查看,在当前版本中,定义为 pymongo.errors.ConnectionFailure ,pymongo 中常见的网络连接异常 AutoReconnectNetworkTimeout 均继承自 ConnectionFailure

class Transport(virtual.Transport):
Channel = Channel

can_parse_url = True
polling_interval = 1
default_port = DEFAULT_PORT
connection_errors = (
virtual.Transport.connection_errors + (errors.ConnectionFailure, )

从目前来看,Celery 可以正确处理 kombu 上报的异常,但是在存储网络异常时,Kombu 没有抛出异常,于是问题调查从 Celery 转到 Kombu。


主要看 MongoDB Transport 关于建立连接部分的处理,代码执行路径依次是: client -> _create_client -> _open -> _parse_uri ,其中 _open 是真正建立连接的处理,连接所采用的参数是在 _parse_uri 返回的,_parse_uri 最终调用的是 pymongo.uri_parser.parse_uri

def _open(self, scheme='mongodb://'):
hostname, dbname, options = self._parse_uri(scheme=scheme)

conf = self._prepare_client_options(options)
conf['host'] = hostname

env = _detect_environment()
if env == 'gevent':
from gevent import monkey
elif env == 'eventlet':
from eventlet import monkey_patch

mongoconn = MongoClient(**conf)
database = mongoconn[dbname]

version_str = mongoconn.server_info()['version']
version = tuple(map(int, version_str.split('.')))

if version < (1, 3):
raise VersionMismatch(E_SERVER_VERSION.format(version_str))
elif self.ttl and version < (2, 2):
raise VersionMismatch(E_NO_TTL_INDEXES.format(version_str))

return database

def _parse_uri(self, scheme='mongodb://'):
# See mongodb uri documentation:
# http://docs.mongodb.org/manual/reference/connection-string/
client = self.connection.client
hostname = client.hostname

if not hostname.startswith(scheme):
hostname = scheme + hostname

if not hostname[len(scheme):]:
hostname += self.default_hostname

if client.userid and '@' not in hostname:
head, tail = hostname.split('://')
credentials = client.userid
if client.password:
credentials += ':' + client.password
hostname = head + '://' + credentials + '@' + tail

port = client.port if client.port else self.default_port
parsed = uri_parser.parse_uri(hostname, port)
dbname = parsed['database'] or client.virtual_host
if dbname in ('/', None):
dbname = self.default_database
options = {
'auto_start_request': True,
'ssl': self.ssl,
'connectTimeoutMS': (int(self.connect_timeout * 1000)
if self.connect_timeout else None),

return hostname, dbname, options

假设我们连接参数是 mongodb://, ,那么 pymongo.uri_parser.parse_uri 解析的结果会是: {'username': None, 'nodelist': [('', 27017), ('', 27017)], 'database': 'yiran', 'collection': None, 'password': None, 'options': {}}

socketTimeoutMS在我们的环境中,MongoDB 的 URI 中并没有指定 options,所以 pymongo.uri_parser.parse_uri 结果的 options 为空。最终连接所使用的 options 就是在 _parse_uri 中定义的 options ,其中类变量 connect_timeout 在 Channel 定义为 None,所以最终 Kombu 建立 MongoDB 连接并没有设置 socketTimeoutMS,如果不设置 socketTimeoutMS ,默认是 None,永久等待。当网络出现异常时,直观看到的现象会是 hang 住。

class Channel(virtual.Channel):
"""MongoDB Channel."""

supports_fanout = True

# Mutable container. Shared by all class instances
_fanout_queues = {}

# Options
ssl = False
ttl = False
connect_timeout = None
capped_queue_size = 100000
calc_queue_size = True

Celery 与 Kombu 参数传递

现在观察到连接参数不符合预期,为什么之前的 3.x 版本没有问题?切换到 3.x 分支查看对应的代码,可以看到大体逻辑都是类似的,关于 options 的处理,3.x 存在一行 : options.update(client.transport_options) ,这里的 client 在函数最开始赋值,对应的是 self.connection.clientself.connection 是 Transport 构造传入的参数。

def _parse_uri(self, scheme='mongodb://'):
# See mongodb uri documentation:
# http://docs.mongodb.org/manual/reference/connection-string/
client = self.connection.client
hostname = client.hostname

if not hostname.startswith(scheme):
hostname = scheme + hostname

if not hostname[len(scheme):]:
hostname += DEFAULT_HOST

if client.userid and '@' not in hostname:
head, tail = hostname.split('://')

credentials = client.userid
if client.password:
credentials += ':' + client.password

hostname = head + '://' + credentials + '@' + tail

port = client.port if client.port is not None else DEFAULT_PORT

parsed = uri_parser.parse_uri(hostname, port)

dbname = parsed['database'] or client.virtual_host

if dbname in ('/', None):
dbname = 'kombu_default'

options = {
'auto_start_request': True,
'ssl': client.ssl,
'connectTimeoutMS': (int(client.connect_timeout * 1000)
if client.connect_timeout else None),

return hostname, dbname, options

connection 对应的就是 Kombu 中的 Connection ,Kombu 对外隐藏了 Transport ,Celery 在初始化阶段,会建立连接,调用路径是 celery/app/base.py:Celery._connection -> celery/app/amqp.py:AMQP.Connection -> kombu/connection.py:Connection 。传递参数 transport_options 就是在 Celery app 声明时配置的参数,具体可配置参数可以参考文档: https://docs.celeryq.dev/en/stable/userguide/configuration.html。


"connect": False,
"maxPoolSize": 5 if "worker" in process_cmdline else 2,
"socketTimeoutMS": 5000,
"connectTimeoutMS": 5000,
"serverSelectionTimeoutMS": 5000,
"w": 0,

在 Kombu 使用 MongoDB Transport 时,最终会带有这些参数创建 MongoDB 连接,所以不会出现此问题说描述的现象。

Celery 改动背景

@rmihael 上报了一个问题: Celery events are not removed from MongoDB broker #1047 ,表示在使用 Celery Flower(Celery 监控组件)后,messages 中的事件不会清除,导致占用了大量的 MongoDB 存储空间。该 Issue 中讨论最终决定使用 MongoDB TTL 来解决此问题。

在 Kombu 4.x 开发周期中,@daevaorn 提交了 MongoDB TTL support and refactorings #537 用来支持 MongoDB TTL,在该 PR 中包含了大量与 TTL 无关的 commit,并且包含了一定的重构,commit 如下:

  • Complete unit tests suit for MongoDB transport
  • Optional TTL support for MongoDB transport. AMQP TTL headers: x-messa…
  • Rearrange methods at MongoDB channel class
  • Another MongoDB transport clean up and refactor. Use of transport opt…
  • Opt-out for queue size calculation
  • Use natural sort for more FIFO semantic
  • Fix docstrings

其中 Optional TTL support for MongoDB transport. 是最关键的改动,忽略 TTL 的改动,主要看建立 MongoDB 连接的改动。在 Channel Class 中新增了一些类变量用于标识当前配置,在 _parse_uri 中,将 SSL,connectTImeoutMS 从 client 替换为了 self ,并删除了 options.update(client.transport_options) 。其中删除了 options.update(client.transport_options) 是导致这个问题的关键。

 class Channel(virtual.Channel):
_client = None
supports_fanout = True
+ # Mutable containers. Shared by all class instances
_fanout_queues = {}

+ # Options
+ connect_timeout = None
+ ssl = False
+ capped_queue_size = 100000
+ ttl = False
+ from_transport_options = (
+ virtual.Channel.from_transport_options
+ + ('connect_timeout', 'ssl', 'ttl', 'capped_queue_size'))
def __init__(self, *vargs, **kwargs):
super(Channel, self).__init__(*vargs, **kwargs)


def _parse_uri(self, scheme='mongodb://'):
options = {
'auto_start_request': True,
- 'ssl': client.ssl,
- 'connectTimeoutMS': (int(client.connect_timeout * 1000)
- if client.connect_timeout else None),
+ 'ssl': self.ssl,
+ 'connectTimeoutMS': (int(self.connect_timeout * 1000)
+ if self.connect_timeout else None),
- options.update(client.transport_options)

Celery Kombu 代码管理感觉有些不清晰,在多个分支上想要对比非常困难。必要组件升级大版本进行全集测试是必要的。

