一个是 asyncio run_forever (我希望它一直运行,维护一个队列)的协程函数, 用 celery 装饰成 task,这样做有效吗,代码像这样的:
import asyncio
improt celery
async def do_some_work(x):
print("Waiting " + str(x))
await asyncio.sleep(x)
def worker():
loop = asyncio.get_event_loop()
for _ in range(_concurrency):
loop.create_task(do_some_work(x))
loop.run_forever()
@app.task
def run(msg):
worker()
1
dingyaguang117 2019-12-09 09:35:57 +08:00
- - 这样做的意义是啥?
|
2
Harlaus OP @dingyaguang117 asyncio 的代码写好了,不想改
|
3
superrichman 2019-12-09 10:05:17 +08:00
没有试过 celery, 我用 apscheduler 的 AsyncIOScheduler 可以和 asyncio 一起使用
|
4
hustlibraco 2019-12-09 10:10:36 +08:00
run_forever 肯定不行,run_until_complete 应该可以,但是这样做很不好,还是不要偷懒了。python 代码改起来也不是很麻烦。
|
5
rogwan 2019-12-09 10:17:10 +08:00 via iPhone
程序是拿来解决问题的,不是拿来玩死自己的。
|
6
forrestshuang 2019-12-09 10:19:13 +08:00
celery 本身就是异步的
|
7
ClericPy 2019-12-09 10:19:47 +08:00
协程可以挂在任何线程上跑, 非主线程上跑事件循环没什么经验的话有一定可能出问题, 还是那句, 何必呢
把多个任务用 gather 合起来, run_forever 换成 run_until_complete 放里面跑吧, 有时候 running 的 Loop 还不让, 就得 new 一个 Loop |
8
Harlaus OP @hustlibraco 也不完全是偷懒的原因,我用 asyncio 和 concurrent.futures 写了一个 work with 队列的代码( https://gitee.com/Harlaus/pipenode/blob/master/pipenode/server.py)见笑了非常粗糙,我想用它集成我写的一些任务,同时我又不想放弃 celery,就酱
|
9
hustlibraco 2019-12-09 10:28:44 +08:00
@Harlaus 你可以把队列的部分去掉,替换成 celery,loop 只能有一个,然后去调度 celery 的 task。
|
10
Harlaus OP @ClericPy
‘把多个任务用 gather 合起来, run_forever 换成 run_until_complete 放里面跑吧, 有时候 running 的 Loop 还不让, 就得 new 一个 Loop' 现在已经是了 |
11
lolizeppelin 2019-12-09 15:22:52 +08:00
至少之前我看 kombu 代码的时候还不支持 asyncio
asyncio 就在大量库支持之前根本没法用,还不如老老实实 eventlet |
12
18620610600 2020-04-23 17:19:03 +08:00
celery 官方要 5.0 才支持 asyncio
我的是这么实现在 celery 中跑 async def 的 ``` import asyncio def run_async(coro): return asyncio.run(coro) @app.task def celery_task(*args, **kwargs): return run_async(async_func(*args, **kwargs)) async def async_func(*args, **kwargs): rv = await sub_func() # do sth return rv async def sub_func(): return 1 # Usage: def view(request): task = celery_task.delay(request) return Response({'task_id': task.id}) ``` |