Please enable Javascript to view the contents

Celery 处理大文件失败问题排查与解决

 ·  ☕ 3 分钟

1. 一个小需求

经常遇到一些小的需求,但是实现起来并不简单。这里就有一个文件上传的简单需求,分为下面几个步骤:

  1. 用户在页面上传一个大文件
  2. 大文件会被暂存在内网的 Ceph
  3. 后台任务,将 Ceph 中的大文件,下载到 Docker 内
  4. 后台任务,将 Docker 中的大文件,上传到外网的 COS

后台使用的是 Django,采用 Docker 多实例部署。多实例方便扩容,提高服务的并发能力,但是要求实例无状态,有状态的部分需要存储在第三方服务,Ceph 就是其中之一。

直接将文件从本地上传到 COS ,会导致正在上传的文件因为发布而被丢失。

2. 一个大文件引发的 Bug

在测试的过程中,发现小文件可以正常上传;但是上传大于 300MB 的文件时,总是失败。日志如下:

1
2
3
4
5
[2018-10-23 10:11:18] celery: [2018-10-23 10:11:18,114: ERROR/MainProcess] Process 'Worker-20' pid:45 exited with 'signal 9 (SIGKILL)' [time_stamp=2018-10-23 10:11:18,114, worker=MainProcess, levelname=ERROR]
[2018-10-23 10:11:18] celery: [2018-10-23 10:11:18,206: ERROR/MainProcess] Pool callback raised exception: OperationalError(2006, "MySQL server has gone away (error(32, 'Broken pipe'))") [time_stamp=2018-10-23 10:11:18,206, worker=MainProcess, levelname=ERROR]
[2018-10-23 10:11:18] celery: self._execute_command(COMMAND.COM_QUERY, sql)
[2018-10-23 10:11:18] celery: File "/app/.heroku/python/lib/python2.7/site-packages/pymysql/connections.py", line 970, in _execute_command
[2018-10-23 10:11:18] celery: Traceback (most recent call last):

使用的包版本:

1
2
3
Django==1.8.3
celery==3.1.18
django-celery==3.1.16

日志中有两个错误,一个是进程被杀掉,一个是数据库失去连接。最开始将问题定位在 MySQL server has gone away,但一直没有解决问题。

最后,在 Grafana 中查看 Celery Worker 的内存使用率时,发现每次上传大文件,内存使用率就会急剧增加,然后又急剧下降。原来是,内存使用超了,进程被强制 kill。

最后,通过优化内存使用,解决了问题。

3. 一段优化代码

优化之前:

1
2
3
4
5
r = requests.get(self.local_file.url,
                 allow_redirects=True,
                 stream=True,
                 timeout=300)
open(self.local_path, 'wb+').write(r.content)

优化之后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
if not os.path.exists(os.path.dirname(self.local_path)):
    try:
        os.makedirs(os.path.dirname(self.local_path))
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
    r = requests.get(self.local_file.url,
                     allow_redirects=True,
                     stream=True,
                     timeout=300)
with open(self.local_path, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024 * 512): 
            if chunk:
                f.write(chunk)

优化之前,Celery 将整个文件放在内存,内存使用率暴增。优化之后,先请求响应头,通过迭代器一点一点读取响应体中的文件内容。这大大节省了内存使用。

4. 一个数据库连接异常问题

找到问题,解决之后,并没有结束。这里有一个奇怪的问题,为什么会出现 MySQL server has gone away

同事之前也遇到一个类似的问题,Celery 多进程任务抛出各种数据库异常。原因分析如下:

Celery Worker 在启动时,djcelery 进行了 DB 操作,数据库连接被初始化。
子进程被 fork 出来后,由于完全复制了父进程的内存数据,导致所有 Worker 共享了同一个 MySQL 连接(同一个 socket file)。由于 persistent connections 特性,数据库连接没有被关闭。这个是 djcelery 库与多线程部署的坑。

解决方案是:

不关闭 persistent connections 功能,监听子进程初始化完成和任务开始的信号。在收到信号时,手动强制关闭当前进程中 Django ORM 的连接。

相关实现的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from django.db import connections

@signals.task_prerun.connect
def task_prerun(**kwargs):
    for conn in connections.all():
        conn.close()

@signals.worker_process_init.connect
def worker_init(**kwargs):
    for conn in connections.all():
        conn.close()

实际上,这里的 signal 9 (SIGKILL)MySQL server has gone away 并不是同一个 Celery Worker 抛出。由于继承自同一个父进程和连接池,当其中一个子进程被 kill 之后,另外一个正在处理任务的进程也会出问题。

5. 一段相关的 Django 代码

在高并发情况下,频繁新建/关闭数据库连接是低效的。Django 的 persistent connections(长连接) 就是为了解决这个问题。

Django 数据库长连接的原理是,在每次创建数据库连接之后,把连接实例放到一个 Theard.local 的实例中维护。每次进行数据库请求时,Django 会去 local 中查找可用的连接,有则复用。当连接,发生异常或者存在时间超过 CONN_MAX_AGE 时,才会被关闭。

CONN_MAX_AGE 参数,在 settings.py 文件中可以配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'mydb',  # 数据库名称
        'USER': 'root',  # 数据库用户名
        'PASSWORD': '',  # 数据库密码
        'HOST': 'localhost',  # 数据库主机,默认为 localhost
        'PORT': '3306',  # 数据库端口
        'CONN_MAX_AGE': 60,  # 0 表示使用完马上关闭,None 表示不关闭
    }
}

再来看看,Django 中如何管理长连接:

django/db/__init_.py

1
2
3
def close_old_connections(**kwargs):
    for conn in connections.all():
        conn.close_if_unusable_or_obsolete()

django/db/backends/base/base.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def close_if_unusable_or_obsolete(self):
    if self.connection is not None:
        # If the application didn't restore the original autocommit setting,
        # don't take chances, drop the connection.
        if self.get_autocommit() != self.settings_dict['AUTOCOMMIT']:
            self.close()
            return

        # If an exception other than DataError or IntegrityError occurred
        # since the last commit / rollback, check if the connection works.
        if self.errors_occurred:
            if self.is_usable():
                self.errors_occurred = False
            else:
                self.close()
                return

        if self.close_at is not None and time.time() >= self.close_at:
            self.close()
            return

django/db/backends/mysql/base.py

1
2
3
4
5
6
7
def is_usable(self):
    try:
        self.connection.ping()
    except Database.Error:
        return False
    else:
        return True

6. 参考


微信公众号
作者
微信公众号