flask利用flask-celery实现异步任务

利用celery实现异步ssh任务

以下内容适用于有一定flask和celery基础的同学,如果还不知道他们是什么,请戳这里 Flask中文文档 以及 celery 以及 flask-celery.

直奔主题,先看个最简单的例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import paramiko
from app import celery
@celery.task() #: 通过celery.task来把某个函数修饰成异步任务
def run_base_ssh_task(ip, cmd, user='root', password='root'):
'''通过paramiko库远程SSH连接某台主机,执行命令,返回结果'''
try:
ssh = paramiko.SSHClient() #: 创建对象
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #: 允许连接不在know_hosts文件中的主机
ssh.connect(ipaddr, 22, user, password) #: 通过指定 IP,端口,用户名,密码 进行连接
stdin, stdout, stderr = ssh.exec_command(cmd) #: 执行SSH命令
result_status = stdout.channel.recv_exit_status() #: 获取$?的值
return {
'result': True if result_status == 0 else False, #: 如果 $? == 0 ,则表示执行成功
#: 返回标准输出和错误输出内容
'detail': "##### 标准输出 #####\n%s##### 错误输出 #####\n%s" % \
(stdout.read().decode(), stderr.read().decode())
}
except Exception as e:
#: 返回false,并返回异常信息
return {
'result': False,
'detail': str(e),
}

通过以下代码就可以把你想执行的SSH任务丢进任务队列,让消费者去执行即可.

1
2
3
4
ip = '192.168.1.100' #: 主机IP地址
cmd = 'hostname' #: 输出主机名
#: 利用apply_async函数异步执行任务
run_base_ssh_task.apply_async(args=[ip, cmd])


那么问题来了!!! 打印出来的结果,我该去哪里查看?

在你什么都没有做的情况下,你只能从:
1.你的celery日志里面可以看到, 以下是我打印出来的日志.

1
2
3
4
5
6
7
8
9
10
11
12
[2017-03-18 18:58:35,511: INFO/PoolWorker-2] Connected (version 2.0, client OpenSSH_5.3)
[2017-03-18 18:58:35,859: INFO/PoolWorker-2] Authentication (publickey) failed.
[2017-03-18 18:58:35,946: INFO/PoolWorker-2] Authentication (publickey) failed.
[2017-03-18 18:58:36,078: INFO/PoolWorker-2] Authentication (password) successful!
[2017-03-18 18:58:36,251: WARNING/PoolWorker-2] True
[2017-03-18 18:58:36,252: WARNING/PoolWorker-2] ##### 标准输出 #####
centos138
##### 错误输出 #####
[2017-03-18 18:58:36,266: INFO/PoolWorker-2] Task app.vmctl.models.run_base_ssh_task[2ce5bcde-437d-4adb-be5a-74bedd5bea06] succeeded in 0.8271931620001851s: {'detail': '##### 标准输出 #####
centos138
##### 错误输出 #####
', 'result': True}

2.从你所使用的消息队列(我使用的是redis)中, 通过你的任务ID来查询。

疑问: 如何知道你的任务ID?

1
2
3
#: 酱紫获取你的任务ID
job = run_base_ssh_task.apply_async(args=[ip, cmd])
print("我的任务ID是:%s" % job.id)

然后利用redis客户端,进入到你的redis,通过你刚刚的任务ID来查询相应的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ redis-cli
127.0.0.1:6379>
127.0.0.1:6379> keys *
1) "celery-task-meta-5f058533-228c-48ef-a2bc-9eb17e05fcde"
2) "celery-task-meta-bef55fdb-2687-447c-ba4c-9330fc9a867e"
3) "_kombu.binding.celery"
4) "celery-task-meta-16a6b563-83b7-4761-aa3d-7430a1e30a7d"
5) "_kombu.binding.celery.pidbox"
6) "celery-task-meta-2ce5bcde-437d-4adb-be5a-74bedd5bea06"
7) "_kombu.binding.celeryev"
8) "celery-task-meta-b127149b-fb10-4c9a-b709-793fd2f80a2b"
9) "celery-task-meta-dbecb59e-dd38-4ea3-869d-d2f47c3051db"
10) "unacked_mutex"
127.0.0.1:6379> get celery-task-meta-2ce5bcde-437d-4adb-be5a-74bedd5bea06
"{\"task_id\": \"2ce5bcde-437d-4adb-be5a-74bedd5bea06\", \"status\": \"SUCCESS\", \"children\": [], \"result\": {\"detail\": \"##### \\u6807\\u51c6\\u8f93\\u51fa #####\\ncentos138\\n##### \\u9519\\u8bef\\u8f93\\u51fa #####\\n\", \"result\": true}, \"traceback\": null}"

好,相信大家也不会用这么蹩脚的方法去查看结果.
让我们来看一下其他的解决方法.


方法:异步记录数据库

一般情况下,我们会想办法把日志记录到数据库中,以便日后的查询.
让我们来看下面一段改进过的代码.我们新建了一个表TaskLog用来保存任务的日志记录.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import paramiko
from app import celery, db
class TaskLog(db.Model):
'''用于记录任务日志'''
__tablename__ = 'task_log'
# 主键
id = db.Column(db.Integer, primary_key=True)
# task id
task_id = db.Column(db.String(50), unique=True, nullable=False)
# 正常输出日志
stdout_log_context = db.Column(db.String(1000))
# 错误输出日志
stderr_log_context = db.Column(db.String(1000))
def __str__(self):
return self.task_id
@celery.task(bind=True) #: 通过开启bind=True, 可以传递一个self变量到你的函数中
def run_base_ssh_task(self, ip, cmd, user='root', password='root'):
'''通过paramiko库远程SSH连接某台主机,执行命令,返回结果'''
'''添加一段代码, Tasklog类是用来存储task异步任务的日志记录'''
tasklog = TaskLog()
tasklog.task_id = self.id
try:
ssh = paramiko.SSHClient() #: 创建对象
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #: 允许连接不在know_hosts文件中的主机
ssh.connect(ipaddr, 22, user, password) #: 通过指定 IP,端口,用户名,密码 进行连接
stdin, stdout, stderr = ssh.exec_command(cmd) #: 执行SSH命令
result_status = stdout.channel.recv_exit_status() #: 获取$?的值
'''记录任务日志'''
stdout_output = stdout.read().decode()
stderr_output = stderr.read().decode()
tasklog.stdout_log_context = stdout_output
tasklog.stderr_log_context = stderr_output
db.session.add(tasklog)
db.session.commit()
return {
'result': True if result_status == 0 else False, #: 如果 $? == 0 ,则表示执行成功
#: 返回标准输出和错误输出内容
'detail': "##### 标准输出 #####\n%s##### 错误输出 #####\n%s" % \
(stdout_output, stderr_output)
}
except Exception as e:
db.session.rollback()
#: 返回false,并返回异常信息
return {
'result': False,
'detail': str(e),
}

我们在任务中,创建一个日志记录对象,用来保存任务执行的结果,并保存进数据库.
但是上面的代码要想运行成功,必须在celery消费者启动的代码中,推入flask应用上下文
因为数据库的连接信息保存在flask app的应用上下文中.
所以你才能在celery的程序中,使用db.session进行数据库操作.

1
2
3
4
5
6
# celery.py
from app import create_app, celery
app = create_app()
# 推入一个 application context,celery后续所有操作都会在这个环境里执行,直到进程退出。
app.app_context().push()

目前这个方法的确能够实现我们记录日志的功能,利用任务ID作为唯一的标志.
任务在执行中,连接数据库.把相应的日志结果记录到数据库中去.

但是这样有个瑕疵,我们必须在每个异步任务中,加入对日志记录的代码
有什么方法能够让每个任务在创建的时候,自动记录日志对象.任务完成的时候,自动相应的结果吗?
接下来我们可能需要对我们的代码再进行一点改造.


重载CELERY回调函数

废话少说,我们直接看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
'''我们自定义一个task类型,继承了原生的Task'''
class Base_ssh_task(celery.Task):
'''首先我们重载了apply_async函数,这个函数在每次需要把任务丢进消息队里中去执行的时候,都会需要调用的函数.'''
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
#: 我们可以在每次执行任务的时候指定一个task id,如果没有指定,则自己随机生成一个唯一的ID.
task_id = task_id or uuid()
try:
#: 创建一个日志记录对象.
task = TaskLog(task_id=task_id)
db.session.add(task)
db.session.commit()
except Exception as e:
raise Exception("start Base_ssh_task failed, cause by:%s" % e)
return super().apply_async(args=args, kwargs=kwargs, task_id=task_id, producer=producer,
link=link, link_error=link_error, shadow=shadow, **options)
'''然后我们重载了on_success这个回调函数,这个函数会在每次任务执行完成的时候,进行回调.'''
def on_success(self, retval, task_id, args, kwargs):
try:
#: 通过task id找到这个日志记录对象,并把相应的结果记录下来.
task = TaskLog.query.filter_by(task_id=task_id).first()
if task:
task.result = retval.get('result', False)
task.detail = retval.get('detail', '没有detail字段')
db.session.add(task)
db.session.commit()
else:
raise Exception('can not update task result.. Not found task_id %s' % task_id)
except Exception as e:
# logging the error
print(e)
return super().on_success(retval, task_id, args, kwargs)
'''同样我们可以重载on_failure和on_retry回调函数,实现我们的相关功能'''
def on_failure(self, exc, task_id, args, kwargs, einfo):
......
return super().on_failure(exc, task_id, args, kwargs, einfo)
def on_retry(self, exc, task_id, args, kwargs, einfo):
......
return super().on_retry(exc, task_id, args, kwargs, einfo)
'''在自定义任务的时候,只需要base=Base_ssh_task,以自定义的任务为模板就可以了.不需要在每个任务中都加上日志记
录的逻辑了.该任务模板会自定创建日志记录对象,并且在任务完成或者失败等状态自动写入相应的日志.
'''
@celery.task(bind=True, base=Base_ssh_task)
def run_base_ssh_task(self, ip, cmd, user='root', password='root'):
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip, 22, user, password)
stdin, stdout, stderr = ssh.exec_command(cmd)
result_status = stdout.channel.recv_exit_status()
return {
'result': True if result_status == 0 else False,
'detail': "##### 标准输出 #####\n%s##### 错误输出 #####\n%s" % (stdout.read().decode(), stderr.read().decode()),
}
except Exception as e:
return {
'result': False,
'detail': str(e),
}
task = run_base_ssh_task.apply_async(args=['192.168.1.100', 'hostname'])

代码中,我们自定义了一个任务模板,模板继承了原生的Task类, 并对相应的任务调用函数 以及相关回调函数进行了重载.
使其实现我们需要重复调用的功能.

相关TASK的回调函数,参考官方文档celery-handlers.

以上便是如果在异步任务中进行日志记录的功能.大家可以试一试.
上述代码仅供参考,直接复制下来可能无法正确运行,请大家自行根据环境改写.

不过在此我也提出几个问题.

  1. 任务消费者如果与flask app不在一个机器上,将要注意那些地方
  2. 利用消费者直接连接数据库,写入日志.这种行为是否合理
  3. 是否利用celery的任务链,或者利用rpc远程调用来实现日志记录功能会更加合理.

以上问题后续更新.