V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
xhatt510
V2EX  ›  程序员

求助!关于流程引擎框架和执行自定义用户代码

  •  
  •   xhatt510 · 31 天前 · 1193 次点击

    公司有个项目,主要需求就是 能够执行用户上传的插件(就是 pyton2 代码),并且按照一定的流程执行,然后要支持集群模式。

    先说问题:

    1. 写插件的人调试困难,基本上属于黑盒了
    2. 改插件代码之后再重新执行,特别的麻烦
    3. 我还需要写检查代码语法的函数(虽然利用 ast 模块已经实现了),比如:必须定义 main 函数,并且仅有一个入参。但是使用中还是有其他的一些问题,比如一些我没考虑到的情况出现。就会导致后面执行的时候报错或者根本无法载入进去。
    4. 插件可能会出现内存泄露的情况,导致服务区器存吃满。别人每次都会直接找我们,虽然是因为插件导致的。可能需要能自定监测插件是否出现内存泄露的办法。

    具体是:

    有个 web 界面上传插件,然后在界面上面配置几个插件执行。这个任务会按照设置好的周期执行。

    插件的流程是具体的 为:查询数据--处理数据--发送数据

    查询到的数据每次大概有 1000 条,占用 10 兆左右的内存,每天大概有 7000 万条数据
    现在我的做法:
    1. 用户上传插件之后,各个机器收到 kafka 消息,然后用importlib将插件动态导入进内存中 以便后面调用
    2. 单独有个进程去执行查询数据的脚本
    3. 将查询到的数据通过 kafka 发送至处理数据的进程
    4. 处理数据的进程收到 kafka 消息 根据消息内要执行的处理数据插件发送数据插件 来分别按照流程调用插件
    5. 最后需要将最后一步的数据写进 ES ,并且通过 kafka 进行通知回调
    上面为什么要用 importlib 载入到内存中,是因为一开始尝试用命令行直接 python xxx.py 执行插件。但是这个调用行为属于高频操作,每天大概要调用几百万次。每次重新开启 python 虚拟机,速度根本就跟不上。所以后来改成直接载入到内存,通过函数调用。

    求助大佬们,有没有开源的框架能够胜任这种情况的?已经尽可能精简了,希望大佬们留下一点点意见。

    11 条回复    2024-03-27 20:21:59 +08:00
    musi
        1
    musi  
       31 天前
    打开一下思路
    引入 faas ,想稳定就云厂商,想自建就找开源的
    然后就变成了两个服务之间的交互 rpc/http 看喜好
    xhatt510
        2
    xhatt510  
    OP
       31 天前
    @musi 多谢大佬留言,我先去看看 faas 。
    然后我们这个要给客户线下物理机部署。不能上云
    Dongxiaohao
        3
    Dongxiaohao  
       31 天前
    没写过你这种流程引擎,但是有类似的户上传 Python 算法,给服务端去执行;
    Python 那边起了一个 Flask 的 webserver ,提供 deploy 和 uninstall 和调用算法 predict 方法的接口。
    大致流程就是 web 服务器部署算法之后,Java 把算法文件传给 Flask ,Flask 动态加载这个 Python 算法,存在内存中。
    然后等待 Java 调用 predict 的请求就行了。
    要和算法编写人员约定好算法文件的部分格式,比如主类名称,和方法名,不能乱写。
    foolishcrab
        4
    foolishcrab  
       31 天前 via iPhone
    你看一下 perfect hq
    SmiteChow
        5
    SmiteChow  
       31 天前
    核心问题:动态代码应该动态编译执行,而不是存为文件。以下是实战中的代码片段,供参考
    ```
    code = compile(python_code, run_file_path, 'exec')
    space = globals()
    space['__builtins__'].update({
    'asql_runtime': self.runtime,
    'asql_types': data_types,
    'asql_stdlib': stdlib,
    })
    func = FunctionType(code.co_consts[0], space)
    ```

    其他的问题:
    1. 调试,可以提供命令行 sdk
    2. 动态编译执行已解决
    3. 具体问题具体分析,语法检查目前实现方式没问题
    4. 单独开进程去跑才能控制开销,一下是实战中的代码片段,供参考
    ```
    recv_end, send_end = multiprocessing.Pipe(False)

    process = multiprocessing.Process(target=self.execute_python_method, args=(send_end, func_name, *args))
    process.start()

    bag = {'process': process, 'timeout': False}
    timer = threading.Timer(self.configure.hook_method_max_execute_time, self.terminate_python_method, args=(bag,))
    timer.start()

    process.join()

    # 超时结束
    if bag['timeout']:
    raise HookMethodExecuteTimeOut(func_name)

    # 在规定时间内结束
    timer.cancel()
    result, ok = recv_end.recv()
    ```
    xhatt510
        6
    xhatt510  
    OP
       31 天前
    @SmiteChow 感谢大佬留言
    xhatt510
        7
    xhatt510  
    OP
       31 天前
    @foolishcrab 好的,我去看看
    xhatt510
        8
    xhatt510  
    OP
       31 天前
    @Dongxiaohao 估计和我这个差不多的实现方式
    imaple
        9
    imaple  
       31 天前
    听着好像 xxljob 就能解决, 定时调度自定义 python 代码
    xhatt510
        10
    xhatt510  
    OP
       31 天前
    @imaple 多谢大佬,我去看看
    ryulxy
        11
    ryulxy  
       31 天前
    我做过的事情和这个有点像,是 C++开发的引擎里嵌入 Python 虚拟机载入用户编写的 Python 脚本执行,可以动态载入 Python 脚本,脚本修改后可以热重载不用重启,然后捕获 Python 脚本编译阶段的报错不运行,和游戏引擎脚本差不多
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2512 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 15:47 · PVG 23:47 · LAX 08:47 · JFK 11:47
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.