如何详细解析Django中利用Celery实现定时任务的过程?

2026-05-27 02:592阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计427个文字,预计阅读时间需要2分钟。

如何详细解析Django中利用Celery实现定时任务的过程?

1. 在项目同名目录下创建一个`celery.py`文件,内容如下:

pythonfrom __future__ import absolute_importimport osfrom celery import Celeryfrom datetime import timedeltafrom kombu import Queue

app=Celery('project_name')

设置Celery配置app.conf.update( result_backend='redis://localhost:6379/0', task_serializer='json', accept_content=['json'], result_serializer='json', timezone='UTC', enable_utc=True, task_queues=( Queue('default', exchange='default', routing_key='default'), ), task_routes={ 'tasks.my_task': {'queue': 'my_queue'}, }, task_time_limit=timedelta(seconds=30))

1.首先在项目同名目录下建一个celery.py

from __future__ import absolute_import import os from celery import Celery from datetime import timedelta from kombu import Queue # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'OpsManage.settings') from django.conf import settings app = Celery('OpsManage') # Using a string here means the worker will not have to # pickle the object when using Windows. # 配置celery class Config: BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND = 'redis://localhost:6379' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True CELERY_ANNOTATIONS = {'*': {'rate_limit': '500/s'}} CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' app.config_from_object(Config) # 到各个APP里自动发现tasks.py文件 app.autodiscover_tasks() #crontab config app.conf.update( CELERYBEAT_SCHEDULE = { # 每隔30s执行一次函数 'every-30-min-add': { 'task': 'apps.tasks.celery_assets.push_host_by_salt_tasks', 'schedule': timedelta(seconds=30) # # 每天凌晨12点 # 'schedule': crontab(minute=0, hour=0) }, }, ) # kombu : Celery 自带的用来收发消息的库, 提供了符合 Python 语言习惯的, 使用 AMQP 协议的高级接口 Queue('transient', routing_key='transient',delivery_mode=1)

2.在settings.py里配置celery

INSTALLED_APPS = [ ...... 'django_celery_beat', 'django_celery_results', ]

3.在项目同名目录下的__init__.py文件里申明celery任务,记得要去检测呀

如何详细解析Django中利用Celery实现定时任务的过程?

# coding:utf-8 from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from celery import app as celery_app __all__ = ['celery_app'] import pymysql pymysql.install_as_MySQLdb()

4.在task.py里执行任务的函数上加@

from celery import task # 定时任务 @task def push_host_by_salt_tasks(): “”“balabala”“” return '这里是定时任务'

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持易盾网络。

本文共计427个文字,预计阅读时间需要2分钟。

如何详细解析Django中利用Celery实现定时任务的过程?

1. 在项目同名目录下创建一个`celery.py`文件,内容如下:

pythonfrom __future__ import absolute_importimport osfrom celery import Celeryfrom datetime import timedeltafrom kombu import Queue

app=Celery('project_name')

设置Celery配置app.conf.update( result_backend='redis://localhost:6379/0', task_serializer='json', accept_content=['json'], result_serializer='json', timezone='UTC', enable_utc=True, task_queues=( Queue('default', exchange='default', routing_key='default'), ), task_routes={ 'tasks.my_task': {'queue': 'my_queue'}, }, task_time_limit=timedelta(seconds=30))

1.首先在项目同名目录下建一个celery.py

from __future__ import absolute_import import os from celery import Celery from datetime import timedelta from kombu import Queue # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'OpsManage.settings') from django.conf import settings app = Celery('OpsManage') # Using a string here means the worker will not have to # pickle the object when using Windows. # 配置celery class Config: BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND = 'redis://localhost:6379' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True CELERY_ANNOTATIONS = {'*': {'rate_limit': '500/s'}} CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' app.config_from_object(Config) # 到各个APP里自动发现tasks.py文件 app.autodiscover_tasks() #crontab config app.conf.update( CELERYBEAT_SCHEDULE = { # 每隔30s执行一次函数 'every-30-min-add': { 'task': 'apps.tasks.celery_assets.push_host_by_salt_tasks', 'schedule': timedelta(seconds=30) # # 每天凌晨12点 # 'schedule': crontab(minute=0, hour=0) }, }, ) # kombu : Celery 自带的用来收发消息的库, 提供了符合 Python 语言习惯的, 使用 AMQP 协议的高级接口 Queue('transient', routing_key='transient',delivery_mode=1)

2.在settings.py里配置celery

INSTALLED_APPS = [ ...... 'django_celery_beat', 'django_celery_results', ]

3.在项目同名目录下的__init__.py文件里申明celery任务,记得要去检测呀

如何详细解析Django中利用Celery实现定时任务的过程?

# coding:utf-8 from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from celery import app as celery_app __all__ = ['celery_app'] import pymysql pymysql.install_as_MySQLdb()

4.在task.py里执行任务的函数上加@

from celery import task # 定时任务 @task def push_host_by_salt_tasks(): “”“balabala”“” return '这里是定时任务'

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持易盾网络。