V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
lolizeppelin
V2EX  ›  Python

taskflow 使用指南

  •  1
     
  •   lolizeppelin · 2017-09-06 19:02:04 +08:00 · 6972 次点击
    这是一个创建于 2643 天前的主题,其中的信息可能已经有所发展或是发生改变。

    taskflow 是 openstack 未确保冗长的系列操作能正确执行的工作流框架

    主要应用场景有如 Cinder 的 create volume 这般复杂、冗长、容易失败, 却又要求保持数据与环境一致的业务逻辑.
    

    由于适用场景非常多,例如游戏行业的常见操作

    停服 备份数据库 升级数据库  升级应用 启动服务器
    

    非常适合用工作流引擎来确保执行

    这个由 openstack 编写的 taskflow 国内相关介绍非常少,官方用户指南内容太多,导致学习这个框架花费需要花费非常多的时间

    这里将会较为详细的介绍 taskflow 的代码和使用

    百度有一篇中文的相关介绍,作者对 taskflow 还是很熟的,但是写得太简要,直接看感觉明白了实际还是一头雾水

    要看懂 taskflow,需要先熟悉两个最基本的概念

    1. 工作流 这玩意最直观的一个应用就是 oa 里审批,上面我们说的游戏业的常见操作也可以作为例子

      停服 备份数据库 升级数据库 升级应用 启动服务器

      上述操作也是一个工作流, 停服成功才能备份,备份成功才能升级 升级应用和备份可以同时进行,升级数据库和升级应用都成功了才能启动服务器 升级失败还要回滚

    2. 有限状态机 推举导读. 工作流就是用状态机来实现流程控制和执行任务的

    上面两条光看懂了还不行,还要写过一轮深入了解,taskflow 使用了 openstack 的通用状态机项目,所以要能看 taskflow 先要熟悉 openstack 的状态机项目 Automaton

    https://pypi.python.org/pypi/automaton/

    为了熟悉状态机项目,我用状态机写了一个tailfile项目,作用就是实现 tail -f 的效果,倒读 N 行和判断文件是否变化都用到了状态机,

    因为状态和事件都比较少,用状态机写这些功能有点杀鸡用牛刀,主要是为了熟悉状态机才专门用 automaton 来写

    熟悉玩 automaton 以后,可以开始来看 taskflow(基于最新版本 2.14)了,

    首先,因为 taskflow 作为通用项目而不是 openstack 内部项目,所以 taskflow 没有使用 oslo 中的一些通用工具

    而且里面同样为了兼容写了一些复杂的接口还用了 futurist 实现异步,我为了和自己的其他几个项目统一起来,基于 taskflow2.14 生成了一个简化过的项目

    https://github.com/lolizeppelin/simpleflow

    代码变化了几个如下部分

    1. 日志用回 oslo_log 的方式
    2. 删除了 sqlalchemy 以外的存储方式,table 的和所有 sql expression 都改为 orm 方式(只支持 mysql ),后来我熟悉 taskflow 以后发现删除其他 backends 只保留 mysql 不是一个正确做法,有缺陷,后续会说明为什么
    3. 因为删除了 sqlalchemy 以外的存储方式, 所以 persitence 中 backends 就么有存在的必要了,直接传 sqlalchemy 的 session 即可
    4. persitence 中的内容和 storage 合并
    5. jobs 和 conductors 删除,这个部分可以和主要的 taskflow 功能无关
    6. Executor 不使用 futurist,自己写了一个简化的 futurist,代码兼容原来的写法,只支持协程,删除多线程和多进程相关支持
    7. 读写锁业复制了过来,用协程实现
    8. 砍掉了 worker based 的 Engine

    因为极大的简化了 futurist 和删掉了 backends 层所以比较好读一些,读代码的时候建议看我简化后的项目

    我们先来看 taskflow 的几个主要单位

    1. Engine

      Engine 是 taskflow 是启动口, 主要工作 创建状态机, 循环状态机, 在指定状态下通过 Executor 执行任务

      Engine 分为好几种 work based 的 Engine 比较特殊我们不看,直接看 action engine

      几种 action engine 其实没有什么区别,通过 Executor 分为

      1. 序列化(阻塞,顺序执行)引擎
      2. 基于线程的并行引擎
      3. 基于协程的并行引擎
      4. 基于多进程的并行引擎

      并行引擎的优势是,当个任务没有顺序关联的情况下可以同时执行多个任务 当然,引擎不影响任务之间的顺序关系,除非你想强制一个一个任务执行,否则都应该使用并行引擎

      Executor 的实现可以参考我简化过的futurist 因为是用 eventlet 实现的,所以需要熟悉 eventlet

    2. Atom 明天继续

    8 条回复    2017-09-29 18:16:00 +08:00
    qq583708076
        1
    qq583708076  
       2017-09-06 20:15:52 +08:00
    lolizeppelin
        2
    lolizeppelin  
    OP
       2017-09-07 11:09:07 +08:00
    原来超过一定时间不能编辑的....

    2. Executor

    前面说了,Engine 会通过 Executor 执行任务,因为如果 Engine 直接执行任务的话,整个状态机的循环会受到正在执行的任务的影响,所以包了一层 Executor 来执行具体的任务(当然具体代码里对 Executor 的应用会更复杂一点,为了扩展和异常处理包了 3 层)

    在 taskflow 的源代码中 Executor 是通过 futurist 库来实现的,而 futurist 又是基于 futures 的,这个库内部实现还是比较复杂的,如果没用过对应库的,建议直接参考我简化的[futurist]( https://github.com/lolizeppelin/simpleutil/blob/master/simpleutil/utils/futurist.py),因为是用 eventlet 实现的,所以需要熟悉 eventlet.

    具体的任务代码(比如备份数据库什么的)在一般情况下可以不处理异常,因为执行任务的代码通过 except Exception 捕获了任务的所有异常.

    特殊异常就是 CancelledError,这个异常是调度到已经取消任务时由 futurist 抛出,在读代码的时候需注意下这个的特殊处理

    3. Scheduler

    这个没什么好说的,Executor 的封装的最上层,最后执行会落实到具体的 Executor 上

    4. storage

    这个是存储接口,后面说到 flow 的时候会详细讲到,storage 的初始化在 Engine 中,一个功能是数据存储的接口,一个功能是作为 flow 的外层封装

    4. Runtime 与 machine

    在看这个之前,如果你还不熟悉状态机,建议先拿前面说的 automaton 练练手,如果已经熟悉状态机但是还没看过 automaton 代码的,建议去看看 automaton 的代码

    machine 就是 Engine 中循环的(automaton)状态机了,一个 engine 只运行一个状态机,初始化代码在 builder.MachineBuilder,MachineBuilder 又是在 Runtime 中调用生成 machine 的,我们先别管 Runtime,先理解一下 taskflow 的状态机

    taskflow 状态机并不复杂,但还不熟悉 taskflow 的时候很容易被高懵.因为 taskflow 用到 networkx 这个图库,而状态机其实就是一个有向图,所以一开始看的时候,很容易以为 taskflow 的状态机会非常复杂要看懂图的相关代码才能搞明白,但实际情况是

    taskflow 的状态机和图无关!因为 taskflow 状态机的状态很少不需要用图来解决状态循环

    那么 taskflow 为什么要用到图库呢,在解决这个疑问前我们先抛开 taskflow,自己用状态机设计一个解决前面——"停服 备份数据库 升级数据库 升级应用 启动服务器" 的工作流

    1. 首先定义停服状态和对应停服状态执行的代码
    2. 定义停服成功的返回,失败的返回,定义进入停服状态的 event (这个是起始时间,event 就是 start )
    3. 定义备份数据库状态对应备份执行代码
    4. 定义进入备份状态的 event (前面的停服成功)
    5. 定义备份成功和备份失败的返回,到目前还简单,备份失败大不了多备份几次直到成功,失败了整个状态机终止都可以影响不大
    6. 定义升级数据库状态对应备份执行代码
    7. 定义进入升级状态的 event (前面的备份成功)
    8. 定义升级成功和备份失败的返回,这里开始坑了,升级失败要回滚了
    9. 发现少了回滚升级失败的状态定义.....增加升级失败回滚失败的定义
    ......
    回滚升级数据库失败....升级应用是失败...回滚升级应用是失败.....启动失败

    设计下来你发现没几个步骤。要定义的状态就越来越多...这也就是状态机复杂以后和图有关的原因了

    taskflow 非常巧妙的避免了复杂化状态机,taskflow 的设计的状态机可以简单的理解只处理 2 个状态就好

    开始....找到任务-执行任务-找到任务....执行任务...终止

    执行任务就是调用 Executor, 至于找下一个任务的工作,就是封装了图库的 flow 的工作了.这样设计状态机状态就很少,具体的状态可以看 MachineBuilder 的注释中有对应表格,对应状态目前粗看一下即可,知道哪个状态是找任务、哪个状态执行任务就可,有些状态涉要看了后面的 retry 相关才比较好理解,至于 flow,这个我们在后面说明

    回头来看 Runtime,MachineBuilder 是由 Runtime 生成的,状态机的有些 callback 最终执行的 Runtime 中的函数,里面会有一些嵌套和封装, Scheduler 的封装就在 Runtime 中,Runtime 可以简单理解为状态机调用其它注入 Scheduler、storage 接口调用的中间件,Runtime 在整体理解 taskflow 的的时候可以不用细看

    第一篇完...请看下一篇介绍 flow atom task retry
    lolizeppelin
        3
    lolizeppelin  
    OP
       2017-09-07 11:15:06 +08:00
    好别扭....有点文字问题都不能修正....不想发了....
    revol
        4
    revol  
       2017-09-07 14:19:29 +08:00
    赞,可以发到自己的博客
    zhujinhe
        6
    zhujinhe  
       2017-09-29 17:52:43 +08:00
    class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
    print("CallJoe args", args)
    print("CallJoe kwargs", kwargs)
    print("Calling joe %s." % joe_number)
    这种定义的类, 想实现类似:
    t = CallJoe()
    t.execute('13911111111', "foo", "bar", baz="value_baz", qux="value_qux")
    找了好久也没摸索出怎么把这个 非关键字变长参数传进去. 还请楼主指教.
    lolizeppelin
        7
    lolizeppelin  
    OP
       2017-09-29 18:14:14 +08:00 via Android
    不行

    为什么非要变长呢 打包到 list 或者 dict 里不好嘛
    lolizeppelin
        8
    lolizeppelin  
    OP
       2017-09-29 18:16:00 +08:00 via Android
    具体你看反射出参数的部分怎么实现的就知道为什么不行了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   950 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 22:38 · PVG 06:38 · LAX 14:38 · JFK 17:38
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.