首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python 学习手册
Python Cookbook
Python 基础教程
Python Sites
PyPI - Python Package Index
http://www.simple-is-better.com/
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
宝塔
V2EX  ›  Python

请教一个异步 asyncio 的问题

  •  
  •   meowoo · 349 天前 · 1738 次点击
    这是一个创建于 349 天前的主题,其中的信息可能已经有所发展或是发生改变。

    想写一个分析日志的工具,用 asyncio 处理的时候,想大家帮忙看一下哪里有问题,实际处理日志的时候,readlines 的时候,一个 650m 的文件需要 3 秒多,然后往字典里面加的时候需要 8 秒多,所以一个文件要 12 秒多,

    现在的效果是,多个文件消耗的时间就是 12*n,就是说并没有提升,不是说 await 的时候,会把当前执行的内容挂起,然后执行下一个任务么,可能是我哪里有问题,麻烦大家给看看,代码如下。

    import asyncio

    import re

    import time

    from pathlib import Path

    ip_find = re.compile(r'((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d))).){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))')

    ip_database = dict()

    async def ip_address(log):

        """分析 ip"""
        with open(log) as f:
            log_data = f.readlines()
            try:
                for log_ips in log_data:
                    ip_search = ip_find.search(log_ips)
                    if ip_search:
                        ip_database[ip_search.group(0)] = ip_database.get(ip_search.group(0), 0) + 1
            except Exception as e:
                print(e)
    

    async def generator(log):

        await ip_address(log)
    

    if __name__ == '__main__':

        path = Path(r"D:\anlysis_log")
        start = time()
    
        loop = asyncio.get_event_loop()
        tasks = [generator(x) for x in path.iterdir()]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
    
    21 回复  |  直到 2018-12-05 13:46:58 +08:00
        1
    111111111111   349 天前 via Android
    asyncio 挂起的是 io 等待,可不包括正则啊
        2
    meowoo   349 天前
    @111111111111 log_data = f.readlines() 这个不是 io 等待么,读取文件啊,还是说要函数整体
        3
    sunwei0325   349 天前
    用 aiofiles 或者将读取文件的操作放到线程池 /进程池里面
        4
    wwwjfy   349 天前
    - asyncio 的任务是显式的,有 await 才会切回到 event loop,不是在 function 前面加 async 就行。可以试试 https://github.com/Tinche/aiofiles
    - 这里的瓶颈在磁盘 IO,asyncio 作用应该不会太明显;作用更大的地方是多个 IO 并行执行的时候,这里在读磁盘的时候就去那里执行 CPU 操作,另一个任务可能在等网络请求返回
        5
    AlisaDestiny   349 天前
    你没理解 1 楼的意思,他是说你的程序消耗的时间主要在正则查找这里,也就是这条语句:ip_search = ip_find.search(log_ips)。
        6
    meowoo   349 天前
    @sunwei0325 好的谢谢,我看下这个库
        7
    meowoo   349 天前
    @wwwjfy 我原来理解的是如果 await 的函数在坐 io 等待,就会切换 event loop,就是我在 ip_address 中等待读取文件,这个时间应该切换到下一个 event,等待读取后再继续操作,现在看来理解是有问题的,我去看看 aiofiles 库,多谢。
        8
    meowoo   349 天前
    @AlisaDestiny 但是上面文件读取也是在耗时啊,多少会有点儿提升的把
        9
    clearT   349 天前
    asyncio 好像不支持异步的读取文件,即使设置为非阻塞读取模式,所以是不会引起切换的。
    [Asyncio Wiki]( https://github.com/python/asyncio/wiki/ThirdParty#filesystem)
        10
    meowoo   349 天前
    @clearT 看到了 感觉不适合用 python 写了
        11
    congeec   348 天前
    @meowoo 问题不在异 asyncio 不支持异步文件读取。run_in_executor 之类的很容易把同步代码封装成异步代码。
    瓶颈在#4 @wwwjfy 说的磁盘 IO,你就一个磁盘。正则表达式那块儿可以用 concurrent.futures.ProcessPoolExecutor + asyncio.get_event_loop().run_in_executor() 解决,前提是你有多核处理器

    正则那儿可以用 pcre with jit 加速。换个语言也快不到哪儿去
        12
    no1xsyzy   348 天前
    正则写错了一处, "." 表示匹配任何字符,而不是 ip 地址中的分隔点。
        13
    meowoo   348 天前
    @congeec 现在用 aiofiles 是异步读取的文件,用 async for 去异步执行正则匹配的时候,print 的代码也看到是异步的,但是超级慢,比去掉 async 快了 15 倍 以上 ,现在我晕了已经,不知道咋回事。
        14
    meowoo   348 天前
    @no1xsyzy 没错的 前面有 r
        15
    congeec   348 天前
    @meowoo 贴代码啊
        16
    no1xsyzy   347 天前
    @meowoo 那么你连 r 是什么意思都没明白,`r`是拒绝 Python 字符串转义啊,并不会影响正则的意思。
    你要说 `r` 让 "." 不生效(具有正则表达式含义),那么 "\d" "(" ")" "{" "}" "[0-5]" 也不会生效啊。
    >>> import re
    >>> re.compile(r'((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d))).){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))').search("3|3|3|
    3")
    <re.Match object; span=(0, 7), match='3|3|3|3'>
        17
    no1xsyzy   347 天前
    另外,不要做干净正则,做个脏正则就行了,
    ((?:\d{1,3}\.){3}\d{1,3})
    在反复出现没搜索到的情况(就是说 [不是] 一行单一个 IP 结束的情况)
        18
    meowoo   347 天前
    @no1xsyzy 受教了,是我想偏了,而日志中又没出现这种情况,让我以为没问题
        19
    meowoo   347 天前
    @congeec 就是把读取文件改成 aiofiles, for 改成 async for
    我看了 debug 之后发现,每次进入 async for 的时候,都会 wait,重新回到 await ip_address(log) 中,然后到 event loop 去看有没有其他等待的 event,然后导致每一行在循环的时候都去 await 一次,结果特别慢,我个人理解,不知道对不对,请指教,async for 不是这样用的么?

    代码如下

    `async def ip_address(log):`

    """分析 ip"""
    async with open(log) as f:
    try:
    async for log_ips in f:
    ip_search = ip_find.search(log_ips)
    if ip_search:
    ip_database[ip_search.group(0)] = ip_database.get(ip_search.group(0), 0) + 1
    except Exception as e:
    print(e)
        20
    congeec   347 天前
    @meowoo 加 async 并不会让你的同步代码变成异步
    好好了解一下异步风格里的”让出执行”权吧
    看看下面的文章。划重点: asyncio.gather(), futures, ProcessPoolExecutor。理解 futures 你就知道怎么用同步风格写异步代码
    https://blog.konpat.me/python-turn-sync-functions-to-async/
    https://docs.python.org/3/library/asyncio-eventloop.html#id14
        21
    meowoo   346 天前
    @congeec 好的我仔细看下 第一次用 有点儿懵
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   842 人在线   最高记录 5043   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 23ms · UTC 22:12 · PVG 06:12 · LAX 14:12 · JFK 17:12
    ♥ Do have faith in what you're doing.