目录

    1. 一个小需求

    经常遇到一些小的需求,但是实现起来并不简单。这里就有一个文件上传的简单需求,分为下面几个步骤:

    1. 用户在页面上传一个大文件
    2. 大文件会被暂存在内网的 Ceph
    3. 后台任务,将 Ceph 中的大文件,下载到 Docker 内
    4. 后台任务,将 Docker 中的大文件,上传到外网的 COS

    后台使用的是 Django,采用 Docker 多实例部署。多实例方便扩容,提高服务的并发能力,但是要求实例无状态,有状态的部分需要存储在第三方服务,Ceph 就是其中之一。

    直接将文件从本地上传到 COS ,会导致正在上传的文件因为发布而被丢失。

    2. 一个大文件引发的 Bug

    在测试的过程中,发现小文件可以正常上传;但是上传大于 300MB 的文件时,总是失败。日志如下:

    [2018-10-23 10:11:18] celery: [2018-10-23 10:11:18,114: ERROR/MainProcess] Process 'Worker-20' pid:45 exited with 'signal 9 (SIGKILL)' [time_stamp=2018-10-23 10:11:18,114, worker=MainProcess, levelname=ERROR]
    [2018-10-23 10:11:18] celery: [2018-10-23 10:11:18,206: ERROR/MainProcess] Pool callback raised exception: OperationalError(2006, "MySQL server has gone away (error(32, 'Broken pipe'))") [time_stamp=2018-10-23 10:11:18,206, worker=MainProcess, levelname=ERROR]
    [2018-10-23 10:11:18] celery: self._execute_command(COMMAND.COM_QUERY, sql)
    [2018-10-23 10:11:18] celery: File "/app/.heroku/python/lib/python2.7/site-packages/pymysql/connections.py", line 970, in _execute_command
    [2018-10-23 10:11:18] celery: Traceback (most recent call last):
    

    使用的包版本:

    Django==1.8.3
    celery==3.1.18
    django-celery==3.1.16
    

    日志中有两个错误,一个是进程被杀掉,一个是数据库失去连接。最开始将问题定位在 MySQL server has gone away,但一直没有解决问题。

    最后,在 Grafana 中查看 Celery Worker 的内存使用率时,发现每次上传大文件,内存使用率就会急剧增加,然后又急剧下降。原来是,内存使用超了,进程被强制 kill。

    最后,通过优化内存使用,解决了问题。

    3. 一段优化代码

    优化之前:

    r = requests.get(self.local_file.url,
                     allow_redirects=True,
                     stream=True,
                     timeout=300)
    open(self.local_path, 'wb+').write(r.content)
    

    优化之后:

    if not os.path.exists(os.path.dirname(self.local_path)):
        try:
            os.makedirs(os.path.dirname(self.local_path))
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise
        r = requests.get(self.local_file.url,
                         allow_redirects=True,
                         stream=True,
                         timeout=300)
    with open(self.local_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024 * 512): 
                if chunk:
                    f.write(chunk)
    

    优化之前,Celery 将整个文件放在内存,内存使用率暴增。优化之后,先请求响应头,通过迭代器一点一点读取响应体中的文件内容。这大大节省了内存使用。

    4. 一个数据库连接异常问题

    找到问题,解决之后,并没有结束。这里有一个奇怪的问题,为什么会出现 MySQL server has gone away

    同事之前也遇到一个类似的问题,Celery 多进程任务抛出各种数据库异常。原因分析如下:

    Celery Worker 在启动时,djcelery 进行了 DB 操作,数据库连接被初始化。
    子进程被 fork 出来后,由于完全复制了父进程的内存数据,导致所有 Worker 共享了同一个 MySQL 连接(同一个 socket file)。由于 persistent connections 特性,数据库连接没有被关闭。这个是 djcelery 库与多线程部署的坑。
    

    解决方案是:

    不关闭 persistent connections 功能,监听子进程初始化完成和任务开始的信号。在收到信号时,手动强制关闭当前进程中 Django ORM 的连接。
    

    相关实现的代码如下:

    from django.db import connections
    
    @signals.task_prerun.connect
    def task_prerun(**kwargs):
        for conn in connections.all():
            conn.close()
    
    @signals.worker_process_init.connect
    def worker_init(**kwargs):
        for conn in connections.all():
            conn.close()
    

    实际上,这里的 signal 9 (SIGKILL)MySQL server has gone away 并不是同一个 Celery Worker 抛出。由于继承自同一个父进程和连接池,当其中一个子进程被 kill 之后,另外一个正在处理任务的进程也会出问题。

    5. 一段相关的 Django 代码

    在高并发情况下,频繁新建/关闭数据库连接是低效的。Django 的 persistent connections(长连接) 就是为了解决这个问题。

    Django 数据库长连接的原理是,在每次创建数据库连接之后,把连接实例放到一个 Theard.local 的实例中维护。每次进行数据库请求时,Django 会去 local 中查找可用的连接,有则复用。当连接,发生异常或者存在时间超过 CONN_MAX_AGE 时,才会被关闭。

    CONN_MAX_AGE 参数,在 settings.py 文件中可以配置:

    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.mysql',
            'NAME': 'mydb',  # 数据库名称
            'USER': 'root',  # 数据库用户名
            'PASSWORD': '',  # 数据库密码
            'HOST': 'localhost',  # 数据库主机,默认为 localhost
            'PORT': '3306',  # 数据库端口
            'CONN_MAX_AGE': 60,  # 0 表示使用完马上关闭,None 表示不关闭
        }
    }
    

    再来看看,Django 中如何管理长连接:

    django/db/__init_.py

    def close_old_connections(**kwargs):
        for conn in connections.all():
            conn.close_if_unusable_or_obsolete()
    

    django/db/backends/base/base.py

    def close_if_unusable_or_obsolete(self):
        if self.connection is not None:
            # If the application didn't restore the original autocommit setting,
            # don't take chances, drop the connection.
            if self.get_autocommit() != self.settings_dict['AUTOCOMMIT']:
                self.close()
                return
    
            # If an exception other than DataError or IntegrityError occurred
            # since the last commit / rollback, check if the connection works.
            if self.errors_occurred:
                if self.is_usable():
                    self.errors_occurred = False
                else:
                    self.close()
                    return
    
            if self.close_at is not None and time.time() >= self.close_at:
                self.close()
                return
    

    django/db/backends/mysql/base.py

    def is_usable(self):
        try:
            self.connection.ping()
        except Database.Error:
            return False
        else:
            return True
    

    6. 参考