用celery、redis、mongodb在flask中实现异步任务和定时任务
Celery是Python开发的分布式任务调度模块,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:
- 异步发送验证邮件
- 定时每晚统计报表
Celery 在执行任务时需要通过一个消息中间件(RabbitMQ或者Redis)来接收和发送任务消息,以及存储任务结果(RabbitMQ、Redis、MySQL、MongoDB等)
Celery有以下优点:
- 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
- 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
- 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活: 几乎celery的各个组件都可以被扩展及自定制
Celery基本工作流程图
下面开始介绍基本的使用方式
一、安装
这里采用Redis作为消息中间件,MongoDB作为任务结果存储
$ sudo pip install -U "celery[redis]"
$ sudo pip install celery # celery版本4.1或以上
二、创建异步任务
当前目录如下
proj/
| flask_app.py
| celery_task.py
2.1 celery的配置及简单任务
$ vim celery_task.py
from celery import Celery
# configs是自己定义的一字典,包含数据库的密码等等信息
class CeleryConfig:
# 任务消息队列
BROKER_URL = "redis://:{0}@{1}:{2}/{3}".format(
configs["redis"]["mydb"]["PASSWORD"],
configs["redis"]["mydb"]["HOST"],
configs["redis"]["mydb"]["PORT"],
configs["redis"]["mydb"]["DB_CELERY"],
)
# 序列化方式
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# 结果保存, 这里也可以继续保存到redis,但是断电结果会消失
CELERY_RESULT_BACKEND = 'mongodb://{0}:{1}/{2}'.format(
configs["mongodb"]["mydb"]["HOST"],
configs["mongodb"]["mydb"]["PORT"],
configs["mongodb"]["mydb"]["DB"],
)
CELERY_MONGODB_BACKEND_SETTINGS = {
"taskmeta_collection": "celery" # 表
# "user": ,
# "password":
}
# 完成一定数量后重启该worker
CELERY_WORKER_MAX_TASKS_PER_CHILD = 2000
CELERY_TIMEZONE = "Asia/Shanghai"
ctask = Celery("lawplatform")
ctask.config_from_object(CeleryConfig)
@ctask.task(name="test")
def add(x, y):
return x + y
2.1 在flask使用celery
$ vim flask_app.py
from flask import Flask
from celery_task import add
app = Flask(__name__)
@app.route("/test")
def test():
add.delay(3, 4)
return ''
2.3 运行flask和celery
$ python flask_app.py
$ celery worker -l INFO -A celery_tasks
2.4 测试
接下来访问flask就会触发add的异步任务curl 127.0.0.1:5000/test
三、定时任务
定时任务只需要在异步任务的基础上增加一点就可以
3.1 修改celery_task
$ vim celery_task.py
# 这里是两种不同的定时方式,crontab是基于时间(所以最好要先指定好时区),timedelta是基于时间周期
from celery.schedules import crontab
from datetime import timedelta
...
CeleryConfig.CELERYBEAT_SCHEDULE = {
'every-3am': {
'task': 'test', # 这里就是task的name属性
"schedule": crontab(minute=0, hour=3), # 每天的凌晨3点执行
"args": (1, 3) # 如果任务不需要参数,这里可以省略
},
"test": {
"task": 'test',
"schedule": timedelta(seconds=5),
"args": (1, 3)
}
}
ctask = Celery("lawplatform")
ctask.config_from_object(CeleryConfig)
3.2 执行
$ celery worker -l DEBUG -A celery_tasks -B # 多了一个-B的参数
celery的使用方式就是这么简单,其他还有更高级的方法(bind、retry等等)请参考官网
四、附录
4.1 The syntax of these crontab expressions are very flexible. Some examples:
Example | Meaning |
---|---|
crontab() | Execute every minute. |
crontab(minute=0, hour=0) | Execute daily at midnight. |
crontab(minute=0, hour='*/3' ) |
Execute every three hours: 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
crontab(minute=0,hour=’0,3,6,9,12,15,18,21’) | Same as previous. |
crontab(minute='*/15' ) |
Execute every 15 minutes. |
crontab(day_of_week=’sunday’) | Execute every minute (!) at Sundays. |
crontab(minute='*' ,hour='*' , day_of_week=’sun’) |
Same as previous. |
crontab(minute='*/10' ,hour=’3,17,22’, day_of_week=’thu,fri’) |
Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays. |
crontab(minute=0, hour='*/2,*/3' ) |
Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour='*/5' ) |
Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5). |
crontab(minute=0, hour='*/3,8-17' ) |
Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
crontab(day_of_month=’2’) | Execute on the second day of every month. |
crontab(day_of_month=’2-30/3’) | Execute on every even numbered day. |
crontab(day_of_month=’1-7,15-21’) | Execute on the first and third weeks of the month. |
crontab(day_of_month=’11’,month_of_year=’5’) | Execute on 11th of May every year. |
crontab(month_of_year='*/3' ) |
Execute on the first month of every quarter. |
参考
评论详情
共10条评论