利用celery实现异步ssh任务
以下内容适用于有一定flask和celery基础的同学,如果还不知道他们是什么,请戳这里 Flask中文文档 以及 celery 以及 flask-celery.
直奔主题,先看个最简单的例子,
通过以下代码就可以把你想执行的SSH任务丢进任务队列,让消费者去执行即可.
那么问题来了!!! 打印出来的结果,我该去哪里查看?
在你什么都没有做的情况下,你只能从:
1.你的celery日志里面可以看到, 以下是我打印出来的日志.
2.从你所使用的消息队列(我使用的是redis)中, 通过你的任务ID来查询。
疑问: 如何知道你的任务ID?
123 #: 酱紫获取你的任务IDjob = run_base_ssh_task.apply_async(args=[ip, cmd])print("我的任务ID是:%s" % job.id)
然后利用redis客户端,进入到你的redis,通过你刚刚的任务ID来查询相应的结果
好,相信大家也不会用这么蹩脚的方法去查看结果.
让我们来看一下其他的解决方法.
方法:异步记录数据库
一般情况下,我们会想办法把日志记录到数据库中,以便日后的查询.
让我们来看下面一段改进过的代码.我们新建了一个表TaskLog用来保存任务的日志记录.
我们在任务中,创建一个日志记录对象,用来保存任务执行的结果,并保存进数据库.
但是上面的代码要想运行成功,必须在celery消费者启动的代码中,推入flask应用上下文
因为数据库的连接信息保存在flask app的应用上下文中.
所以你才能在celery的程序中,使用db.session进行数据库操作.
目前这个方法的确能够实现我们记录日志的功能,利用任务ID作为唯一的标志.
任务在执行中,连接数据库.把相应的日志结果记录到数据库中去.
但是这样有个瑕疵,我们必须在每个异步任务中,加入对日志记录的代码
有什么方法能够让每个任务在创建的时候,自动记录日志对象.任务完成的时候,自动相应的结果吗?
接下来我们可能需要对我们的代码再进行一点改造.
重载CELERY回调函数
废话少说,我们直接看代码
代码中,我们自定义了一个任务模板,模板继承了原生的Task类, 并对相应的任务调用函数 以及相关回调函数进行了重载.
使其实现我们需要重复调用的功能.
相关TASK的回调函数,参考官方文档celery-handlers.
以上便是如果在异步任务中进行日志记录的功能.大家可以试一试.
上述代码仅供参考,直接复制下来可能无法正确运行,请大家自行根据环境改写.
不过在此我也提出几个问题.
- 任务消费者如果与flask app不在一个机器上,将要注意那些地方
- 利用消费者直接连接数据库,写入日志.这种行为是否合理
- 是否利用celery的任务链,或者利用rpc远程调用来实现日志记录功能会更加合理.
以上问题后续更新.