scrapy treq http pipeline

Author: Pony Ma

1. Introduction

Scrapy is an excellent crawling framework. When we write an HTTP pipeline for it, we usually reach for the requests library to send the HTTP requests.

But requests is synchronous, so to keep a requests-based pipeline from blocking the event loop, we would most likely do something like this:

import requests
from scrapy.exceptions import NotConfigured
from twisted.internet import threads

class HttpPipeline:

	def __init__(self, crawler):
		self.crawler = crawler
		self.settings = crawler.settings
		self.url = self.settings.get('HTTP_PIPELINE_URL')
		if not self.url:
			raise NotConfigured

	@classmethod
	def from_crawler(cls, crawler):
		return cls(crawler)

	def process_item(self, item, spider):
		# Run the blocking requests call in the reactor's thread pool,
		# and return the Deferred so Scrapy waits for it and sees errors
		# instead of the call being fire-and-forget.
		d = threads.deferToThread(self._process_request, item)
		d.addCallback(lambda _: item)
		return d

	def _process_request(self, item):
		# Blocking call; must never run on the reactor thread itself.
		resp = requests.post(self.url, json=item)
		return resp.json()

This works, but every item ties up a thread in the reactor's thread pool just to wait on network I/O. There is a better option: treq.

treq is an asynchronous HTTP client built on Twisted. Its API is very similar to requests', but it is non-blocking, so we can use it to implement a fully asynchronous pipeline.
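To see how close the two APIs are, here is a minimal, standalone sketch of a treq request; the URL is only a placeholder:

import treq
from twisted.internet import defer, task

@defer.inlineCallbacks
def main(reactor):
	# Reads almost exactly like requests.get(...).json(),
	# except every step returns a Deferred.
	resp = yield treq.get('https://httpbin.org/get')
	body = yield resp.json()
	print(body)

task.react(main)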

2. Installation

pip install treq
pip install pytest-twisted
pip install pytest-mock

treq is the client itself; pytest-twisted and pytest-mock are only needed for the tests below.

3. Usage

First, write a simple pipeline:

import treq
from twisted.internet import defer
from scrapy.exceptions import NotConfigured

class HttpPipeline:

	def __init__(self, crawler):
		self.crawler = crawler
		self.settings = crawler.settings
		self.url = self.settings.get('HTTP_PIPELINE_URL')
		if not self.url:
			raise NotConfigured

	@classmethod
	def from_crawler(cls, crawler):
		return cls(crawler)

	@defer.inlineCallbacks
	def process_item(self, item, spider):
		# Scrapy waits on the Deferred that inlineCallbacks returns,
		# so the event loop is never blocked.
		yield self._process_request(item, spider)
		return item

	@defer.inlineCallbacks
	def _process_request(self, item, spider):
		# treq.post returns a Deferred, and so does resp.json().
		resp = yield treq.post(self.url, json=item)
		result = yield resp.json()
		return result
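To actually run it, the pipeline has to be enabled and the target URL configured in settings.py. A sketch, assuming the pipeline lives in myproject/pipelines.py (adjust the dotted path and URL to your project):

# settings.py
ITEM_PIPELINES = {
	'myproject.pipelines.HttpPipeline': 300,
}
HTTP_PIPELINE_URL = 'https://api.example.com/items'  # placeholder endpoint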

Then write the corresponding test:

from unittest.mock import Mock

import pytest
from scrapy.spiders import Spider
from scrapy.utils.test import get_crawler
from pytest_twisted import ensureDeferred

# Adjust this import to wherever the pipeline actually lives.
from myproject.pipelines import HttpPipeline

class TestHttpPipeline:

	@pytest.fixture
	def pipeline(self):
		# The pipeline raises NotConfigured unless the URL setting is present.
		crawler = get_crawler(Spider, {'HTTP_PIPELINE_URL': 'http://example.com/items'})
		return HttpPipeline.from_crawler(crawler)

	@pytest.fixture
	def item(self):
		return {
			'key': 'value'
		}

	@pytest.fixture
	def treq_mock(self, mocker):
		requests_mock = Mock()
		response_mock = Mock()
		# Patch the treq name as imported by the pipeline's module.
		mocker.patch('myproject.pipelines.treq', requests_mock)
		requests_mock.post.side_effect = [response_mock]
		# The pipeline calls resp.json(), so mock json, not text.
		response_mock.json.return_value = {"data": "uuid1"}
		response_mock.code = 200
		return requests_mock, response_mock

	@ensureDeferred
	async def test_process_item(self, pipeline, item, treq_mock):
		requests_mock, _ = treq_mock
		result = await pipeline.process_item(item, Spider('test'))
		# process_item always hands the item back to Scrapy.
		assert result == item
		requests_mock.post.assert_called_once_with(pipeline.url, json=item)
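Run the test with pytest; the pytest-twisted plugin takes care of driving the Twisted reactor (the file name here is just an example):

pytest test_http_pipeline.py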

Writing a Scrapy HTTP pipeline with treq is very simple, and since treq's API closely mirrors requests', anyone familiar with requests will pick it up quickly.
