标签分类
技术文章
当前位置:主页 > 计算机编程 > python > Django配置celery(非djcelery)执行异步任务和定时任务

Django配置celery执行异步任务和定时任务详解

  • 发布时间:
  • 作者:码农之家原创
  • 点击:103

Django配置celery(非djcelery)执行异步任务和定时任务

这篇文章主要知识点是关于Django,celery,Django,异步任务,Django,定时任务,Django配置celery(非djcelery)执行异步任务和定时任务,Django中使用Celery的方法示例 的内容,如果大家想对相关知识点有系统深入的学习,可以参阅以下电子书

跟老齐学Python:Django实战
  • 类型:Python大小:151 MB格式:PDF出版:电子工业出版社作者:齐伟
立即下载

更多相关的学习资源可以参阅 网络与数据通信电子书PHP电子书、等栏目。

所有演示均基于Django2.0

celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。采用典型的生产者-消费者模型,主要由三部分组成:

  • 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis、rabbitmq等作为broker
  • 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程
  • 存储结果的backend:执行结果存储在backend,默认也会存储在broker使用的MQ队列服务中,也可以单独配置用何种服务做backend

Django配置celery(非djcelery)执行异步任务和定时任务

异步任务

我的异步使用场景为项目上线:前端web上有个上线按钮,点击按钮后发请求给后端,后端执行上线过程要5分钟,后端在接收到请求后把任务放入队列异步执行,同时马上返回给前端一个任务执行中的结果。若果没有异步执行会怎么样呢?同步的情况就是执行过程中前端一直在等后端返回结果,页面转呀转的就转超时了。

异步任务配置

1.安装rabbitmq,这里我们使用rabbitmq作为broker,安装完成后默认启动了,也不需要其他任何配置

# apt-get install rabbitmq-server

2.安装celery

# pip3 install celery

3.celery用在django项目中,django项目目录结构(简化)如下

website/
|-- deploy
|  |-- admin.py
|  |-- apps.py
|  |-- __init__.py
|  |-- models.py
|  |-- tasks.py
|  |-- tests.py
|  |-- urls.py
|  `-- views.py
|-- manage.py
|-- README
`-- website
  |-- celery.py
  |-- __init__.py
  |-- settings.py
  |-- urls.py
  `-- wsgi.py

4.创建 website/celery.py 主文件

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

app = Celery('website')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# 允许root 用户运行celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))

5.在 website/__init__.py 文件中增加如下内容,确保django启动的时候这个app能够被加载到

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

6.各应用创建tasks.py文件,这里为 deploy/tasks.py

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
  return x + y

注意tasks.py必须建在各app的根目录下,且只能叫tasks.py,不能随意命名

7.views.py中引用使用这个tasks异步处理

from deploy.tasks import add

def post(request):
  result = add.delay(2, 3)
result.ready()
result.get(timeout=1)
result.traceback

8.启动celery

# celery -A website worker -l info

9.这样在调用post这个方法时,里边的add就可以异步处理了

定时任务

定时任务的使用场景就很普遍了,比如我需要定时发送报告给老板~

定时任务配置

1. website/celery.py 文件添加如下配置以支持定时任务crontab

from celery.schedules import crontab

app.conf.update(
  CELERYBEAT_SCHEDULE = {
    'sum-task': {
      'task': 'deploy.tasks.add',
      'schedule': timedelta(seconds=20),
      'args': (5, 6)
    }
    'send-report': {
      'task': 'deploy.tasks.report',
      'schedule': crontab(hour=4, minute=30, day_of_week=1),
    }
  }
)

定义了两个task:

  • 名字为'sum-task'的task,每20秒执行一次add函数,并传了两个参数5和6
  • 名字为'send-report'的task,每周一早上4:30执行report函数

timedelta是datetime中的一个对象,需要 from datetime import timedelta 引入,有如下几个参数

  • days
  • seconds
  • microseconds
  • milliseconds
  • minutes
  • hours

crontab的参数有:

month_of_year
day_of_month
day_of_week
hour
minute

2. deploy/tasks.py 文件添加report方法:

@shared_task
def report():
  return 5

3.启动celery beat,celery启动了一个beat进程一直在不断的判断是否有任务需要执行

# celery -A website beat -l info

Tips

1.如果你同时使用了异步任务和计划任务,有一种更简单的启动方式 celery -A website worker -b -l info ,可同时启动worker和beat

2.如果使用的不是rabbitmq做队列那么需要在主配置文件中 website/celery.py 配置broker和backend,如下:

# redis做MQ配置
app = Celery('website', backend='redis', broker='redis://localhost')
# rabbitmq做MQ配置
app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')

3.celery不能用root用户启动的话需要在主配置文件中添加 platforms.C_FORCE_ROOT = True

4.celery在长时间运行后可能出现内存泄漏,需要添加配置 CELERYD_MAX_TASKS_PER_CHILD = 10 ,表示每个worker执行了多少个任务就死掉

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持码农之家。

Django中使用Celery的方法示例

起步

在 《分布式任务队列Celery使用说明》 中介绍了在 Python 中使用 Celery 来实验异步任务和定时任务功能。本文介绍如何在 Django 中使用 Celery。

安装

pip install django-celery

这个命令使用的依赖是 Celery 3.x 的版本,所以会把我之前安装的 4.x 卸载,不过对功能上并没有什么影响。我们也完全可以仅用Celery在django中使用,但使用 django-celery 模块能更好的管理 celery。

使用

可以把有关 Celery 的配置放到 settings.py 里去,但我比较习惯单独一个文件来放,然后在 settings.py 引入进来:

# celery_config.py
import djcelery
import os

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
djcelery.setup_loader()

BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_IMPORTS = (
 'app.tasks',
)

# 有些情况可以防止死锁
CELERY_FORCE_EXECV = True

# 设置并发的worker数量
CELERYD_CONCURRENCY = 4

# 任务发送完成是否需要确认,这一项对性能有一点影响
CELERY_ACKS_LATE = True

# 每个worker执行了多少任务就会销毁,防止内存泄露,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40

# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 15 * 60 # 在15分钟内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程

# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default"

# 设置详细的队列
CELERY_QUEUES = {
 "default": { # 这是上面指定的默认队列
  "exchange": "default",
  "exchange_type": "direct",
  "routing_key": "default"
 },
 "beat_queue": {
  "exchange": "beat_queue",
  "exchange_type": "direct",
  "routing_key": "beat_queue"
 }

}

配置文件中设置了 CELERY_IMPORTS 导入的任务,所以在django app中创建相应的任务文件:

# app/tasks.py
from celery.task import Task
import time

class TestTask(Task):
 name = 'test-task' # 给任务设置个自定义名称

 def run(self, *args, **kwargs):
  print('start test task')
  time.sleep(4)
  print('args={}, kwargs={}'.format(args, kwargs))
  print('end test task')

在 settings.py 添加:

INSTALLED_APPS = [
 # ...
 'djcelery',
]

# Celery
from learn_django.celery_config import *

触发任务或提交任务可以在view中来调用:

# views.py
from django.http import HttpResponse
from app.tasks import TestTask

def test_task(request):
 # 执行异步任务
 print('start do request')
 t = TestTask()
 t.delay()
 print('end do request')
 return HttpResponse('ok')

启动 woker 的命令是:

python manage.py celery worker -l info

再启动django,访问该view,可以看到任务在worker中被消费了。

定时任务

在celery的配置文件 celery_config.py 文件中添加:

CELERYBEAT_SCHEDULE = {
 'task1-every-1-min': { # 自定义名称
  'task': 'test-task', # 与任务中name名称一致
  'schedule': datetime.timedelta(seconds=5),
  'args': (2, 15),
  'options': {
   'queue': 'beat_queue', # 指定要使用的队列
  }
 },

}

通过 options 的 queque 来指定要使用的队列,这里需要单独的队列是因为,如果所有任务都使用同一队列,对于定时任务来说,任务提交后会位于队列尾部,任务的执行时间会靠后,所以对于定时任务来说,使用单独的队列。

启动 beat:

python manage.py celery beat -l info

监控工具 flower

如果celery中的任务执行失败了,有些场景是需要对这些任务进行监控, flower 是基于 Tornado 开发的web应用。安装用 pip install flower ;启动它可以是:

python manage.py celery flower

# python manage.py celery flower --basic_auth=admin:admin

用浏览器访问 http://localhost:5555 即可查看:

Django中使用Celery的方法示例

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持码农之家。

以上就是本次给大家分享的全部知识点内容总结,大家还可以在下方相关文章里找到vue项目中使用md5加密以及、 儿童python编程入门书籍推、 解决axios.interceptors.respon、 等python文章进一步学习,感谢大家的阅读和支持。

上一篇:Python不使用int()函数把字符串转换为数字实例讲解

下一篇:Python实现基于KNN算法的笔迹识别功能实例代码

展开 +

收起 -

学习笔记
网友NO.581311

详解django+django-celery+celery的整合实战

本篇文章主要是由于计划使用django写一个计划任务出来,可以定时的轮换值班人员名称或者定时执行脚本等功能,百度无数坑之后,终于可以凑合把这套东西部署上。本人英文不好,英文好或者希望深入学习或使用的人,建议去参考官方文档,而且本篇的记录不一定正确,仅仅实现crontab 的功能而已。 希望深入学习的人可以参考 http://docs.jinkan.org/docs/celery/ 。 首先简单介绍一下,Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图 可以看到,Celery 主要包含以下几个模块: 任务模块 Task 包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。 消息中间件 Broker Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。 任务执行单元 Worker Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。 任务结果存储 Backend Backend 用于存储任务的执行结果,以供查询。同消息中间件……

网友NO.116149

Django中使用Celery的教程详解

Django教程 Python下有许多款不同的 Web 框架。Django是重量级选手中最有代表性的一位。许多成功的网站和APP都基于Django。 Django是一个开放源代码的Web应用框架,由Python写成。 Django遵守BSD版权,初次发布于2005年7月, 并于2008年9月发布了第一个正式版本1.0 。 Django采用了MVC的软件设计模式,即模型M,视图V和控制器C。 一、前言 Celery是一个基于python开发的分布式任务队列,如果不了解请阅读笔者上一篇博文Celery入门与进阶,而做python WEB开发最为流行的框架莫属Django,但是Django的请求处理过程都是同步的无法实现异步任务,若要实现异步任务处理需要通过其他方式(前端的一般解决方案是ajax操作),而后台Celery就是不错的选择。倘若一个用户在执行某些操作需要等待很久才返回,这大大降低了网站的吞吐量。下面将描述Django的请求处理大致流程(图片来源于网络): 请求过程简单说明:浏览器发起请求--请求处理--请求经过中间件--路由映射--视图处理业务逻辑--响应请求(template或response) 二、配置使用 celery很容易集成到Django框架中,当然如果想要实现定时任务的话还需要安装django-celery-beta插件,后面会说明。需要注意的是Celery4.0只支持Django版本=1.8的,如果是小于1.8版本需要使用Celery3.1。 配置 新建立项目taskproj,目录结构(每个……

网友NO.416955

django+xadmin+djcelery实现后台管理定时任务

继上一篇中间表的数据是动态的,图表展示的数据才比较准确。这里用到一个新的模块Djcelery,安装配置步骤如下: 1.安装 redis==2.10.6 celery==3.1.23 django-celery==3.1.17 flower==0.9.2 supervisor==3.3.4 flower用于监控定时任务,supervisor管理进程,可选 2.配置 settings.py中添加以下几行: #最顶头加上from __future__ import absolute_import# celery settingsimport djcelerydjcelery.setup_loader()BROKER_URL = 'redis://localhost:6379'# BROKER_URL = 'redis://:密码@主机地址:端口号/数据库号'CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'CELERY_RESULT_BACKEND = 'redis://localhost:6379'CELERY_ACCEPT_CONTENT = ['application/json']CELERY_TASK_SERIALIZER = 'json'CELERY_RESULT_SERIALIZER = 'json'CELERYD_MAX_TASKS_PER_CHILD = 40CELERY_TIMEZONE = 'Asia/Shanghai'INSTALLED_APPS = [ 'djcelery',# 添加djcelery] 3.注册定时任务的几个表 from __future__ import absolute_import, unicode_literalsfrom djcelery.models import ( TaskState, WorkerState, PeriodicTask, IntervalSchedule, CrontabSchedule,)from xadmin.sites import sitesite.register(IntervalSchedule) # 存储循环任务设置的时间site.register(CrontabSchedule) # 存储定时任务设置的时间site.register(PeriodicTask) # 存储任务site.register(TaskState) # 存储任务执行状态site.register(WorkerState) # 存储执行任务的worker 4.主应用下添……

<
1
>

Copyright 2018-2019 xz577.com 码农之家

版权责任说明