V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
craftx
V2EX  ›  Python

有 N 个 asyncio task,要求并发处理,并发量不超过 k,有什么优雅的写法?

  •  
  •   craftx · 2023-05-30 16:57:01 +08:00 · 1544 次点击
    这是一个创建于 561 天前的主题,其中的信息可能已经有所发展或是发生改变。
    15 条回复    2023-06-05 13:36:20 +08:00
    wliansheng
        1
    wliansheng  
       2023-05-30 17:06:33 +08:00
    不知道,楼主可以“抛砖引玉”?
    NessajCN
        2
    NessajCN  
       2023-05-30 17:07:25 +08:00
    啥叫并发量不超过 k ?
    await asyncio.gather([小于 k 个 coro])
    你是说这样?
    seers
        3
    seers  
       2023-05-30 17:07:49 +08:00 via Android
    for 一下?
    alexsunxl
        4
    alexsunxl  
       2023-05-30 17:08:54 +08:00
    k 数量大小的线程池?
    完成一个 task 就回收到池子里。
    思路应该是比较简单的吧
    BBCCBB
        5
    BBCCBB  
       2023-05-30 17:09:49 +08:00
    asyncio 就是并发的?

    再加个 Semaphore. 来控制同时最大请求数.
    centralpark
        6
    centralpark  
       2023-05-30 17:10:16 +08:00
    asyncio.Semaphore 。不过这种问题适合问 ChatGPT 吧……
    Trim21
        7
    Trim21  
       2023-05-30 17:11:23 +08:00
    Semaphore
    jonathanchoo
        8
    jonathanchoo  
       2023-05-30 17:19:23 +08:00
    您可以使用 asyncio 的 asyncio.gather() 方法来实现这个功能。您可以将所有的 asyncio task 放在一个 list 中,然后在 asyncio.gather() 方法中指定 concurrency 参数为 k ,即可实现并发量不超过 k 的并发处理。

    以下是示例代码:

    ```python
    import asyncio

    async def task1():
    # Your code here

    async def task2():
    # Your code here

    # Put all tasks in a list
    tasks = [task1(), task2(), ...]

    async def main():
    # Use asyncio.gather to run tasks concurrently with a maximum concurrency of k
    await asyncio.gather(*tasks, return_exceptions=True, concurrency=k)

    # Run the main function
    asyncio.run(main())
    ```
    craftx
        9
    craftx  
    OP
       2023-05-30 18:05:04 +08:00
    @jonathanchoo
    你好。在官方最新文档中,asyncio.gather 没有 concurrency 这个参数。
    执行抛出异常:TypeError: gather() got an unexpected keyword argument 'concurrency'
    https://docs.python.org/3/library/asyncio-task.html
    iorilu
        10
    iorilu  
       2023-05-30 18:42:55 +08:00
    可以建立若干 worker ,N 个 worker 就控制并发 N 了把

    我记得有这个模式
    zzl22100048
        11
    zzl22100048  
       2023-05-31 09:02:43 +08:00
    创建一个
    sem = asyncio.Semaphore(k)

    async def task():
    async with sem:
    ....

    async def main():
    await asyncio.gather(*[ task() for _ in range(n) ])

    asyncio.run(main())



    如果更喜欢 pipeline 语法,可以用 aiostream
    from aiostream import pipe, stream
    async def main():
    await (
    stream.iterate(range(n))
    | pipe.map(task,task_limit=k)
    )
    ruanimal
        12
    ruanimal  
       2023-05-31 10:16:01 +08:00
    @craftx 他用 gpt 生成的答案吧,可能是 gpt 编的
    photon006
        13
    photon006  
       2023-05-31 11:13:32 +08:00
    node.js 有个 promise 库 bluebird ,map()方法可以传一个参数 concurrency 控制并发量。

    http://bluebirdjs.com/docs/api/promise.map.html
    zyxbcde
        14
    zyxbcde  
       2023-05-31 13:00:38 +08:00
    @craftx 我是习惯把任务列表切成若干个不大于 k 的子列表,然后顺序去 gather 这些子列表,每跑完一个子列表打印个进度,好歹知道自己跑到哪了吧。
    gather 里面从来就没有 concurrency 这个参数,这人用 gpt 生成个错误答案故意恶心人吧。
    craftx
        15
    craftx  
    OP
       2023-06-05 13:36:20 +08:00
    @zzl22100048 采用了 aiostream 的办法。谢谢
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1166 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 23:47 · PVG 07:47 · LAX 15:47 · JFK 18:47
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.