V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Nitroethane
V2EX  ›  Python

asyncio 中 loop.run_forever() 方法导致 100% CPU usage

  •  
  •   Nitroethane · 2022-04-07 16:51:46 +08:00 · 2692 次点击
    这是一个创建于 956 天前的主题,其中的信息可能已经有所发展或是发生改变。

    python 版本:3.10.1 或 3.10.2
    代码:

    def main():
        log_listener = setup_logging(log_filename)
    
        e = asyncio.Event()
        consumer = Consumer(e)
    
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for sig in signals:
            loop.add_signal_handler(
                sig, lambda s=sig: asyncio.create_task(shutdown(s, loop, consumer))
            )
    
        tasks = consumer.run()
        try:
            for name, task in tasks.items():
                loop.create_task(task, name=name)
            loop.run_forever()
        finally:
            loop.close()
            log_listener.stop()
    

    consumer.run() 方法会返回一个 Dict[str, Coroutine] 类型的字典。最初以为是自己的 coroutine 实现有问题导致 high CPU 。然后将 tasks 中的 coroutine 一个个移除,最后 tasks 返回空的情况下也是 100% CPU 。
    用 cProfile 看了下:

       Ordered by: cumulative time
       List reduced from 3038 to 10 due to restriction <10>
    
       ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        423/1    0.005    0.000   44.862   44.862 {built-in method builtins.exec}
            1    0.000    0.000   44.862   44.862 myscript.py:1(<module>)
            1    0.000    0.000   44.219   44.219 myscript.py:54(main)
            1    0.000    0.000   44.017   44.017 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:582(run_forever)
            3    0.000    0.000   44.016   14.672 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:1806(_run_once)
            3    0.000    0.000   43.983   14.661 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/selectors.py:452(select)
            3   43.983   14.661   43.983   14.661 {method 'poll' of 'select.epoll' objects}
        484/4    0.006    0.000    0.642    0.161 <frozen importlib._bootstrap>:1022(_find_and_load)
        483/4    0.003    0.000    0.642    0.161 <frozen importlib._bootstrap>:987(_find_and_load_unlocked)
        447/5    0.003    0.000    0.641    0.128 <frozen importlib._bootstrap>:664(_load_unlocked)
    

    有 v 友遇到类似情况的吗?还是说我的用法有问题。

    第 1 条附言  ·  2022-04-07 17:31:25 +08:00
        def _start_kafka_client(self) -> None:
            logging.debug(f"[_start_kafka_client] id(event)={id(self._event)}")
            i = 0
            try:
                while not self._event.is_set():
                    msg_packs = self._kafka_client.poll(
                        timeout_ms=1000,
                        max_records=5000,
                    )
                    if not msg_packs:
                        continue
                    # msgs is of type list containerd with ConsumerRecords
                    tp: kafka.TopicPartition
                    msgs: List[ConsumerRecord]
                    for tp, msgs in msg_packs.items():
                        self._data_queue.put_nowait(msgs)
                        i += len(msgs)
                        if i % 1000 == 0:
                            logging.info(f"count of msgs: {i}")
            except asyncio.QueueFull:
                logging.debug("data queue is full")
                time.sleep(1)
            except Exception as e:
                logging.error(f"error: {e}")
            finally:
                logging.debug("_start_kafka_client terminates")
    
    
    第 2 条附言  ·  2022-04-07 17:31:42 +08:00
    ``` python
    async def import_data(self):
    logging.debug(f"id(event)={id(self._event)}")
    while not self._event.is_set():
    try:
    if not self._template_created:
    await asyncio.sleep(1)

    entry: List[ConsumerRecord] = self._data_queue.get_nowait()
    entries = [
    each_entry
    for each in entry
    for each_entry in self.reconstruct_entries_list(each.value)
    ]

    await self._es_client.bulk(
    index=self.get_index_name(), operations=entries
    )
    except asyncio.QueueEmpty:
    await asyncio.sleep(8)
    continue
    except Exception as e:
    logging.error(f"[*] error: {e}")
    except asyncio.CancelledError:
    logging.debug("import_data is cancelled")
    break
    logging.debug("import_data terminates")

    def run(self) -> Dict[str, Coroutine]:
    return {
    "template_task": self._check_template(),
    "kafka_task": asyncio.to_thread(self._start_kafka_client),
    "es_task": self.import_data(),
    "monitor_task": self.monitor(),
    }

    async def monitor(self):
    try:
    while not self._event.is_set():
    tasks = asyncio.all_tasks()
    for task in tasks:
    print(f"[*] task {task.get_name()} done? {task.done()}")
    await asyncio.sleep(8)
    finally:
    logging.debug("monitor finished")
    ```
    8 条回复    2022-04-12 23:35:44 +08:00
    netcan
        1
    netcan  
       2022-04-07 17:20:05 +08:00
    tasks 为空的话,`loop.run_forever()`直接就返回了,建议上完整代码,例如 Consumer
    Nitroethane
        2
    Nitroethane  
    OP
       2022-04-07 17:32:18 +08:00
    @netcan #1 已添加。就是从 kafka 接收数据,解析之后写到 es 里
    makerbi
        3
    makerbi  
       2022-04-07 18:24:09 +08:00
    应该是 while 里没有 sleep 的原因吧
    time.sleep(0.1)也好过完全没有 sleep
    jenlors
        4
    jenlors  
       2022-04-07 18:32:26 +08:00
    从 kafka 读取消息的时候 block 设置为 True ,直接阻塞循环
    Nitroethane
        5
    Nitroethane  
    OP
       2022-04-07 19:41:39 +08:00
    @makerbi #3 是读 kafka 数据的那个 while 循环吗?我以为用 poll() 方法的时候设置个 1s 的 timeout 也算 sleep 。我试试加个 sleep 有没有效果。

    @jenlors #4 之前不直接阻塞的原因是不知道怎样在终止脚本运行的时候优雅地退出。今天发现在阻塞的情况下应该是可以通过捕获 CanceledError 来实现优雅退出 coroutine 。多谢回复,我试试
    Richard14
        6
    Richard14  
       2022-04-10 07:35:22 +08:00
    问题太长,且缺乏最小实现,1L 代码里很多不明实现的东西,实在是不想看。而且问题给人感觉很像 AB 问题
    Nitroethane
        7
    Nitroethane  
    OP
       2022-04-10 10:28:50 +08:00
    @Richard14 #6 我寻思也没强制你看呀😁,不想看就别回复呀 :)
    lolizeppelin
        8
    lolizeppelin  
       2022-04-12 23:35:44 +08:00
    不建议 sleep 0.1 ,sleep 0.001 都不适合。
    一般来说都是通过监听事件 fd 来实现 sleep 的同时能及时响应
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3294 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 00:39 · PVG 08:39 · LAX 16:39 · JFK 19:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.