Python消息队列:django+celery+kombu。环境:django1.10,python3.5。安装文件:kombu-3.0.37.tar.gz,celery-3.1.25,billiard-3.3.0.23,按照python库的安装方式安装即可。(后文settings.py中用到的djcelery模块来自django-celery包,也需要一并安装。)
# Celery application bootstrap for the Django project.
# NOTE(review): presumably saved as celery.py inside the project package
# (the step heading for this snippet is missing) -- confirm.
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
# NOTE(review): 'myqueue' is the project package name used by this snippet;
# it must match your own project package -- confirm against your layout.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myqueue.settings')

# Imported only after DJANGO_SETTINGS_MODULE has a default value.
from django.conf import settings  # noqa

app = Celery('myqueue')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')

# The lambda defers reading INSTALLED_APPS until Celery actually needs it.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    """Print the request of the current task invocation.

    ``bind=True`` makes Celery pass the task instance as ``self``, which is
    what gives access to ``self.request``.
    """
    print('Request: {0!r}'.format(self.request))
2 在prj/prj/settings.py文件中添加代码:
import djcelery

# Install django-celery's loader before the Celery settings below are read.
djcelery.setup_loader()

TIME_ZONE = 'Asia/Shanghai'

# Broker URL using the "django" scheme (transport provided by the
# 'kombu.transport.django' app -- it must be listed in INSTALLED_APPS).
BROKER_URL = "django://"

# Scheduler class used by celery beat, provided by django-celery.
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

# Celery uses the same time zone as Django.
CELERY_TIMEZONE = TIME_ZONE
3 修改prj/prj/settings.py文件中的INSTALLED_APPS配置:
# Applications enabled for this project, one entry per line.
INSTALLED_APPS = [
    # Django contrib applications.
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # NOTE(review): presumably the project's own app -- confirm the name.
    'zzh',
    # Celery integration: django-celery and kombu's Django transport.
    'djcelery',
    'kombu.transport.django',
]
4 在prj/app中添加tasks.py文件:
# Task module for the app: builds a Celery application and defines one task.
import os

from celery import Celery
from django.conf import settings

# NOTE(review): 'myqueue' must match the actual project package -- confirm.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myqueue.settings')

# NOTE(review): the result backend is 'amqp' while the broker is 'django://';
# kept exactly as in the original -- confirm this combination is intended.
app = Celery('tasks', backend='amqp', broker='django://')
app.config_from_object('django.conf:settings')

# The lambda defers reading INSTALLED_APPS until Celery actually needs it.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task
def yunceh():
    """Sample task: print a fixed marker value."""
    print(1111111)
5 修改prj/app中的admin.py文件:
from django.contrib import admin
from kombu.transport.django import models as kombu_models

# Register your models here.
# Registers kombu's Message model with the Django admin site.
admin.site.register(kombu_models.Message)
6 在prj/中添加2个bat文件taskBeat.bat和taskWorker.bat,分别用来分发和运行消息队列中的任务:
taskBeat.bat:
# Original line to be replaced: reaches _default_manager through self.model
# itself. NOTE(review): the file this line lives in is not shown here --
# presumably django-celery's scheduler module; confirm before patching.
obj = self.model._default_manager.get(pk=self.model.pk)
改为:
# Resolve the class of self.model first, then reach the default manager
# through that class rather than through self.model itself.
Model = type(self.model)
obj = Model._default_manager.get(pk=self.model.pk)