假如有一个文件,里面有 10 万个 url,需要对每个 url 发送 http 请求,并打印请求结果的状态码,如何编写代码尽可能快的完成这些任务呢?
Python 并发编程有很多方法,多线程的标准库 threading,concurrency,协程 asyncio,当然还有 grequests 这种异步库,每一个都可以实现上述需求,下面一一用代码实现一下,本文的代码可以直接运行,给你以后的并发编程作为参考:

队列+多线程

定义一个大小为 400 的队列,然后开启 200 个线程,每个线程都是不断的从队列中获取 url 并访问。
主线程读取文件中的 url 放入队列中,然后等待队列中所有的元素都被接收和处理完毕。代码如下:
from
 threading 
import
 Thread

import
 sys

from
 queue 
import
 Queue

import
 requests


concurrent = 
200


defdoWork():
whileTrue
:

        url = q.get()

        status, url = getStatus(url)

        doSomethingWithResult(status, url)

        q.task_done()



defgetStatus(ourl):
try
:

        res = requests.get(ourl)

return
 res.status_code, ourl

except
:

return"error"
, ourl



defdoSomethingWithResult(status, url):
    print(status, url)



q = Queue(concurrent * 
2
)

for
 i 
in
 range(concurrent):

    t = Thread(target=doWork)

    t.daemon = 
True
    t.start()


try
:

for
 url 
in
 open(
"urllist.txt"
):

        q.put(url.strip())

    q.join()

except
 KeyboardInterrupt:

    sys.exit(
1
)

运行结果如下:
有没有 get 到新技能?

线程池

如果你使用线程池,推荐使用更高级的 concurrent.futures 库:
import
 concurrent.futures

import
 requests


out = []

CONNECTIONS = 
100
TIMEOUT = 
5

urls = []

with
 open(
"urllist.txt"
as
 reader:

for
 url 
in
 reader:

        urls.append(url.strip())


defload_url(url, timeout):
    ans = requests.get(url, timeout=timeout)

return
 ans.status_code


with
 concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) 
as
 executor:

    future_to_url = (executor.submit(load_url, url, TIMEOUT) 
for
 url 
in
 urls)

for
 future 
in
 concurrent.futures.as_completed(future_to_url):

try
:

            data = future.result()

except
 Exception 
as
 exc:

            data = str(type(exc))

finally
:

            out.append(data)

            print(data)

协程 + aiohttp

协程也是并发非常常用的工具了:
import
 asyncio

from
 aiohttp 
import
 ClientSession, ClientConnectorError


asyncdeffetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
try
:

        resp = 
await
 session.request(method=
"GET"
, url=url, **kwargs)

except
 ClientConnectorError:

return
 (url, 
404
)

return
 (url, resp.status)


asyncdefmake_requests(urls: set, **kwargs) -> None:
asyncwith
 ClientSession() 
as
 session:

        tasks = []

for
 url 
in
 urls:

            tasks.append(

                fetch_html(url=url, session=session, **kwargs)

            )

        results = 
await
 asyncio.gather(*tasks)


for
 result 
in
 results:

        print(
f'{result[1]} - {str(result[0])}'
)


if
 __name__ == 
"__main__"
:

import
 sys

assert
 sys.version_info >= (
3
7
), 
"Script requires Python 3.7+."
with
 open(
"urllist.txt"
as
 infile:

        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

grequests[1]

这是个第三方库,目前有 3.8K 个星,就是 Requests + Gevent[2],让异步 http 请求变得更加简单。Gevent 的本质还是协程。
使用前:
pip install grequests

使用起来那是相当的简单:
import
 grequests


urls = []

with
 open(
"urllist.txt"
as
 reader:

for
 url 
in
 reader:

        urls.append(url.strip())


rs = (grequests.get(u) 
for
 u 
in
 urls)


for
 result 
in
 grequests.map(rs):

    print(result.status_code, result.url)

注意 grequests.map(rs) 是并发执行的。运行结果如下:
也可以加入异常处理:
>>> defexception_handler(request, exception):
... 
   print(
"Request failed"
)


>>> 
reqs = [

... 
   grequests.get(
'http://httpbin.org/delay/1'
, timeout=
0.001
),

... 
   grequests.get(
'http://fakedomain/'
),

... 
   grequests.get(
'http://httpbin.org/status/500'
)]

>>> 
grequests.map(reqs, exception_handler=exception_handler)

Request failed

Request failed

[
None
None
, <Response [
500
]>]

最后的话

今天分享了并发 http 请求的几种实现方式,有人说异步(协程)性能比多线程好,其实要分场景看的,没有一种方法适用所有的场景,笔者就曾做过一个实验,也是请求 url,当并发数量超过 500 时,协程明显变慢。
参考资料
[1] grequests: https://github.com/spyoungtech/grequests
[2] Gevent: http://www.gevent.org/
- EOF -
推荐阅读点击标题可跳转
关注「程序员的那些事」加星标,不错过圈内事
点赞和在看就是最大的支持❤️
继续阅读
阅读原文