这是一个被我修改过的是生产 /消费者模型,由于需要消费者返回数据,所以不得已用了两个 Queue,第二个用于把结果写回去,最后在 main 函数里取出返回,下面的代码在我的 python3.7 和 python3.8 环境都可以正常运行,先上代码,请大家过目。
import asyncio
import pandas as pd
import random
import re
async def crawler(u):
"""
模拟爬虫返回结果
:param u: fake url
:return:
"""
i = int(re.search(r"\d+", u).group(0))
await asyncio.sleep(random.random())
return {'url': u, 'result': i}
async def worker(qin, qout, w):
"""
消费者异步函数
:param qin: Queue1,用于生产者写入和消费者读出
:param qout: Queue2,用于让消费者回写结果
:param w: worker id
:return:
"""
while True:
if qin.empty():
break
u = await qin.get()
print(f"Worker-{w} crawling {u}")
resp = await crawler(u)
await qout.put(resp)
qout.task_done()
async def generate_url(url, qin):
"""
生产者异步函数
:param url:
:param qin: Queue1,写出生产者产出的(这里为传入的) url
:return:
"""
await qin.put(url)
print(f"Queue size = {qin.qsize()}")
# qin.task_done()
async def main(qmax=20):
q_in = asyncio.Queue(qmax)
q_out = asyncio.Queue()
urls = [f"url{i}" for i in range(100)]
producers = [asyncio.create_task(generate_url(u, q_in)) for u in urls]
# producers = [await q_in.put(u) for u in urls]
consumers = [asyncio.create_task(worker(q_in, q_out, i)) for i in range(1, qmax // 2 + 1)]
await asyncio.gather(*consumers)
await asyncio.gather(*producers)
# await q_in.join()
await q_out.join()
for c in consumers:
c.cancel()
return [await q_out.get() for _ in range(q_out.qsize())]
if __name__ == "__main__":
result = asyncio.run(main(30))
df = pd.DataFrame(result).set_index('url')
目前有三个问题,
1
princelai OP 没有大佬来指点下吗?
|
2
GoLand 2019-12-18 17:34:22 +08:00
queue 是多余的。直接:
urls = [f"url{i}" for i in range(100)] tasks = [crawler(url) for url in urls] results = await asyncio.gather(*tasks) 就可以。 |
3
princelai OP @GoLand #2 谢谢,不过你说这个我知道可以,我这么设计的目的我忘说了,因为 url 是本地生成的,所以会很快,如果一次性把 url 全部创建为 task,那么 gather 后会一次性创建非常多的链接链接目标网站,我怕网站受不了,也怕自己 IP 被封,所以才不得已使用生产 /消费者,用输入的 Queue 的最大容量限制爬取速度。
|
4
ClericPy 2019-12-18 17:53:42 +08:00
限频可以用 semphore 和 sleep, Queue 这么用有点怪
|
6
superrichman 2019-12-18 18:20:41 +08:00 via iPhone 2
@princelai 搜一下 aiohttp 和 async with semaphore 就知道怎么写了
|
7
ClericPy 2019-12-18 18:29:55 +08:00 1
@princelai #5 信号量可以粗暴的理解成并发锁, 类似 golang 里 channel 的那个数字, 也就是同时并发的个数, 只有申请到的人才有资格执行, 其他人等待, 楼上已经给你看了
你那个 producers 是想用 async for 么 |
8
xiaozizayang 2019-12-18 21:40:21 +08:00
|
9
gwy15 2019-12-18 22:48:59 +08:00 1
楼主你要的这个功能能抽象出来啊,没必要写这么复杂。
我写的库(不推荐,当时没发现成熟第三方库) https://github.com/gwy15/async_pool 调用方式: ``` with Pool(4) as pool: results = pool.map(fetch, urls) ``` 更好更全的第三方库: https://github.com/h2non/paco 调用方式: ``` responses = await paco.map(fetch, urls, limit=3) ``` |
10
princelai OP @gwy15 #9 感谢,这个库试了下,写出来很简洁,就是可能是我的 py 版本太高,在 pycharm 里有错误提示,但是稍微修改下可以正常运行。
```python import asyncio import random import re import paco async def crawler(u): i = int(re.search(r"\d+", u).group(0)) await asyncio.sleep(random.random() * 3) print(f"crawled {u}") return i async def main(): urls = [f"url{i}" for i in range(100)] gather = await paco.map(crawler, urls, limit=20) return gather if __name__ == "__main__": result = asyncio.run(main()) ``` |
11
princelai OP @ClericPy #7
@superrichman #6 感谢二位,用信号量的代码写出来了,比原来好很多 ```python import asyncio import random import re async def crawler(u, sem): async with sem: i = int(re.search(r"\d+", u).group(0)) await asyncio.sleep(random.random() * 5) print(f"crawled {u}") return i async def main(): sem = asyncio.Semaphore(20) urls = [f"url{i}" for i in range(100)] tasks = [crawler(u, sem) for u in urls] gather = await asyncio.gather(*tasks) return gather if __name__ == "__main__": result = asyncio.run(main()) ``` |
12
princelai OP 不支持 markdown 吗,格式全乱了
|
13
yedashuai 2019-12-19 12:59:15 +08:00
@princelai queue 的使用是不是也有一点好处,可以限制内存的使用量,如果数据量很大,所有的 tasks 都存在 list 里
|
15
sxd96 2019-12-21 10:01:40 +08:00 via iPhone
@princelai 想问下如果爬虫 async 的话,requests 支持嘛?好像是要换用 httpx 或者 aiohttp ?这俩哪个比较好用?
|
16
princelai OP @sxd96 如果所有都并发开始了在那就不是生成器,就已经在内存中运行了,我一般都用官方的 aiohttp,没用过另一个
|
17
sxd96 2019-12-21 21:17:59 +08:00
@princelai 哦哦是这样啊。那我的需求如果是从数据库里拿 url 出来给 crawler,也就是说那边 coroutine 在跑,然后生产者在产生新的 url,是不是还是得用 asyncio.Queue ?
|