使用带线程的请求¶
版本 0.4.0 中的新增功能。
该工具带提供了一个简单的 API,用于将请求与线程一起使用。
已记录请求会话为线程安全,但仍有一些边缘情况并非完全线程安全。使用会话的最佳方式是每个线程使用一个会话。
工具带提供的实现是天真的。这意味着我们每个线程使用一个会话,并且我们不努力同步属性(例如身份验证、cookie 等)。这也意味着我们不会尝试将请求定向到已处理对同一域的请求的会话。换句话说,如果您向多个域发出请求,工具带的池不会尝试将请求发送到同一域的同一线程。
此模块提供三个类
在 98% 的情况下,您只需要使用 Pool
,并且您会将 ThreadResponse
当作常规 requests.Response
来对待。
这是一个示例
# This example assumes Python 3
import queue
from requests_toolbelt.threaded import pool
jobs = queue.Queue()
urls = [
# My list of URLs to get
]
for url in urls:
jobs.put({'method': 'GET', 'url': url})
p = pool.Pool(job_queue=jobs)
p.join_all()
for response in p.responses():
print('GET {}. Returned {}.'.format(response.request_kwargs['url'],
response.status_code))
这显然有点令人失望。这就是为什么有一个快捷类方法来从 URL 列表中创建 Pool
的原因。
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
for response in p.responses():
print('GET {}. Returned {}.'.format(response.request_kwargs['url'],
response.status_code))
如果您的列表中的某个 URL 抛出异常,则可以从 exceptions()
生成器中访问该异常。
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
for exc in p.exceptions():
print('GET {}. Raised {}.'.format(exc.request_kwargs['url'],
exc.message))
如果您希望重试已引发的异常,则可以执行以下操作
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
new_pool = pool.Pool.from_exceptions(p.exceptions())
new_pool.join_all()
并非所有请求都建议在不检查是否应该重试的情况下重试。您通常会检查是否要重试。
Pool
对象采用 4 个其他关键字参数
初始化程序
这是一个回调,它将在创建的每个会话上初始化内容。回调必须返回会话。
auth_generator
这是一个回调,在初始化程序回调修改会话之后调用。此回调还必须返回会话。
num_processes
通过传递一个正整数来指示要使用多少个线程。默认情况下为
None
,并将使用multiproccessing.cpu_count()
的结果。会话
您可以传递备用构造函数或任何返回类似
requests.Sesssion
对象的可调用对象。它不会传递任何参数,因为requests.Session
不接受任何参数。
最后,如果您不想担心队列或池管理,可以尝试以下操作
from requests_toolbelt import threaded
requests = [{
'method': 'GET',
'url': 'https://httpbin.org/get',
# ...
}, {
# ...
}, {
# ...
}]
responses_generator, exceptions_generator = threaded.map(requests)
for response in responses_generator:
# Do something
API 和模块自动生成文档¶
此模块提供 requests_toolbelt.threaded
的 API。
该模块提供了一个简洁明了的 API,用于通过线程池发出请求。线程池将使用会话来提高性能。
一个简单的用例是
from requests_toolbelt import threaded
urls_to_get = [{
'url': 'https://api.github.com/users/sigmavirus24',
'method': 'GET',
}, {
'url': 'https://api.github.com/repos/requests/toolbelt',
'method': 'GET',
}, {
'url': 'https://google.com',
'method': 'GET',
}]
responses, errors = threaded.map(urls_to_get)
默认情况下,threaded 子模块将检测您的计算机有多少个 CPU,如果未选择其他进程数,则使用该数。要更改此设置,请始终使用关键字参数 num_processes
。使用上述示例,我们将对其进行如下扩展
responses, errors = threaded.map(urls_to_get, num_processes=10)
您还可以通过创建回调函数来定制 requests.Session
的初始化方式
from requests_toolbelt import user_agent
def initialize_session(session):
session.headers['User-Agent'] = user_agent('my-scraper', '0.1')
session.headers['Accept'] = 'application/json'
responses, errors = threaded.map(urls_to_get,
initializer=initialize_session)
- requests_toolbelt.threaded.map(requests, **kwargs)¶
线程池对象的简单接口。
此函数采用表示使用线程中的会话进行请求的字典列表,并返回一个元组,其中第一项是成功响应的生成器,第二项是异常的生成器。
- 参数:
- 返回:
池中的响应和异常的元组
- 返回类型:
灵感明显来自标准库的多处理库。请参阅以下参考
- class requests_toolbelt.threaded.pool.Pool(job_queue, initializer=None, auth_generator=None, num_processes=None, session=<class 'requests.sessions.Session'>)¶
管理包含会话的线程的池。
- 参数:
job_queue (queue.Queue) – 预计您要使用并向其中添加项的队列。
initializer (collections.Callable) – 用于初始化
session
实例的函数。auth_generator (collections.Callable) – 用于为会话生成新的身份验证凭据的函数。
num_processes (int) – 要创建的线程数。
session (requests.Session) –
- exceptions()¶
遍历池中的所有异常。
- 返回:
ThreadException
的生成器
- classmethod from_exceptions(exceptions, **kwargs)¶
从
ThreadException
创建Pool
提供一个提供
ThreadException
对象的可迭代对象,此类方法将生成一个新池,以重试导致异常的请求。- 参数:
exceptions (可迭代对象) – 返回
ThreadException
的可迭代对象kwargs – 传递给
Pool
初始化器的关键字参数。
- 返回:
已初始化的
Pool
对象。- 返回类型:
- classmethod from_urls(urls, request_kwargs=None, **kwargs)¶
从 URL 的可迭代对象创建
Pool
- 参数:
urls (可迭代对象) – 返回 URL 的可迭代对象,我们用它来创建一个池。
request_kwargs (字典) – 要提供给请求方法的其他关键字参数的字典。
kwargs – 传递给
Pool
初始化器的关键字参数。
- 返回:
已初始化的
Pool
对象。- 返回类型:
- join_all()¶
将所有线程加入主线程。
- responses()¶
遍历池中的所有响应。
- 返回:
生成器
ThreadResponse