使用带线程的请求

版本 0.4.0 中的新增功能。

该工具带提供了一个简单的 API,用于将请求与线程一起使用。

已记录请求会话为线程安全,但仍有一些边缘情况并非完全线程安全。使用会话的最佳方式是每个线程使用一个会话。

工具带提供的实现是天真的。这意味着我们每个线程使用一个会话,并且我们不努力同步属性(例如身份验证、cookie 等)。这也意味着我们不会尝试将请求定向到已处理对同一域的请求的会话。换句话说,如果您向多个域发出请求,工具带的池不会尝试将请求发送到同一域的同一线程。

此模块提供三个类

在 98% 的情况下,您只需要使用 Pool,并且您会将 ThreadResponse 当作常规 requests.Response 来对待。

这是一个示例

# This example assumes Python 3
import queue
from requests_toolbelt.threaded import pool

jobs = queue.Queue()
urls = [
    # My list of URLs to get
]

for url in urls:
    jobs.put({'method': 'GET', 'url': url})

p = pool.Pool(job_queue=jobs)
p.join_all()

for response in p.responses():
    print('GET {}. Returned {}.'.format(response.request_kwargs['url'],
                                        response.status_code))

这显然有点令人失望。这就是为什么有一个快捷类方法来从 URL 列表中创建 Pool 的原因。

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

for response in p.responses():
    print('GET {}. Returned {}.'.format(response.request_kwargs['url'],
                                        response.status_code))

如果您的列表中的某个 URL 抛出异常,则可以从 exceptions() 生成器中访问该异常。

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

for exc in p.exceptions():
    print('GET {}. Raised {}.'.format(exc.request_kwargs['url'],
                                      exc.message))

如果您希望重试已引发的异常,则可以执行以下操作

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

new_pool = pool.Pool.from_exceptions(p.exceptions())
new_pool.join_all()

并非所有请求都建议在不检查是否应该重试的情况下重试。您通常会检查是否要重试。

Pool 对象采用 4 个其他关键字参数

  • 初始化程序

    这是一个回调,它将在创建的每个会话上初始化内容。回调必须返回会话。

  • auth_generator

    这是一个回调,在初始化程序回调修改会话之后调用。此回调还必须返回会话。

  • num_processes

    通过传递一个正整数来指示要使用多少个线程。默认情况下为 None,并将使用 multiproccessing.cpu_count() 的结果。

  • 会话

    您可以传递备用构造函数或任何返回类似 requests.Sesssion 对象的可调用对象。它不会传递任何参数,因为 requests.Session 不接受任何参数。

最后,如果您不想担心队列或池管理,可以尝试以下操作

from requests_toolbelt import threaded

requests = [{
    'method': 'GET',
    'url': 'https://httpbin.org/get',
    # ...
}, {
    # ...
}, {
    # ...
}]

responses_generator, exceptions_generator = threaded.map(requests)
for response in responses_generator:
   # Do something

API 和模块自动生成文档

此模块提供 requests_toolbelt.threaded 的 API。

该模块提供了一个简洁明了的 API,用于通过线程池发出请求。线程池将使用会话来提高性能。

一个简单的用例是

from requests_toolbelt import threaded

urls_to_get = [{
    'url': 'https://api.github.com/users/sigmavirus24',
    'method': 'GET',
}, {
    'url': 'https://api.github.com/repos/requests/toolbelt',
    'method': 'GET',
}, {
    'url': 'https://google.com',
    'method': 'GET',
}]
responses, errors = threaded.map(urls_to_get)

默认情况下,threaded 子模块将检测您的计算机有多少个 CPU,如果未选择其他进程数,则使用该数。要更改此设置,请始终使用关键字参数 num_processes。使用上述示例,我们将对其进行如下扩展

responses, errors = threaded.map(urls_to_get, num_processes=10)

您还可以通过创建回调函数来定制 requests.Session 的初始化方式

from requests_toolbelt import user_agent

def initialize_session(session):
    session.headers['User-Agent'] = user_agent('my-scraper', '0.1')
    session.headers['Accept'] = 'application/json'

responses, errors = threaded.map(urls_to_get,
                                 initializer=initialize_session)
requests_toolbelt.threaded.map(requests, **kwargs)

线程池对象的简单接口。

此函数采用表示使用线程中的会话进行请求的字典列表,并返回一个元组,其中第一项是成功响应的生成器,第二项是异常的生成器。

参数:
  • requests (list) – 表示使用池对象进行请求的字典的集合。

  • **kwargs – 传递给 Pool 对象的关键字参数。

返回:

池中的响应和异常的元组

返回类型:

(ThreadResponse, ThreadException)

灵感明显来自标准库的多处理库。请参阅以下参考

class requests_toolbelt.threaded.pool.Pool(job_queue, initializer=None, auth_generator=None, num_processes=None, session=<class 'requests.sessions.Session'>)

管理包含会话的线程的池。

参数:
  • job_queue (queue.Queue) – 预计您要使用并向其中添加项的队列。

  • initializer (collections.Callable) – 用于初始化 session 实例的函数。

  • auth_generator (collections.Callable) – 用于为会话生成新的身份验证凭据的函数。

  • num_processes (int) – 要创建的线程数。

  • session (requests.Session) –

exceptions()

遍历池中的所有异常。

返回:

ThreadException 的生成器

classmethod from_exceptions(exceptions, **kwargs)

ThreadException 创建 Pool

提供一个提供 ThreadException 对象的可迭代对象,此类方法将生成一个新池,以重试导致异常的请求。

参数:
  • exceptions (可迭代对象) – 返回 ThreadException 的可迭代对象

  • kwargs – 传递给 Pool 初始化器的关键字参数。

返回:

已初始化的 Pool 对象。

返回类型:

classmethod from_urls(urls, request_kwargs=None, **kwargs)

从 URL 的可迭代对象创建 Pool

参数:
返回:

已初始化的 Pool 对象。

返回类型:

get_exception()

从池中获取异常。

返回类型:

线程异常

get_response()

从池中获取响应。

返回类型:

线程响应

join_all()

将所有线程加入主线程。

responses()

遍历池中的所有响应。

返回:

生成器 ThreadResponse

class requests_toolbelt.threaded.pool.ThreadResponse(request_kwargs, response)

requests Response 对象的包装器。

这将代理大多数属性访问操作到 Response 对象。例如,如果您想要从响应中解析 JSON,您可能这样做

thread_response = pool.get_response()
json = thread_response.json()
request_kwargs

提供给队列的原始关键字参数

response

包装的响应

class requests_toolbelt.threaded.pool.ThreadException(request_kwargs, exception)

请求期间引发的异常的包装器。

这将代理大多数属性访问操作到异常对象。例如,如果您想要从异常中获取消息,您可能这样做

thread_exc = pool.get_exception()
msg = thread_exc.message
exception

捕获并包装的异常

request_kwargs

提供给队列的原始关键字参数