scrapy treq http pipeline
Author: Pony Ma
1. Introduction
Scrapy is an excellent crawling framework. When writing an HTTP pipeline for it, we usually reach for the requests library to send the HTTP requests.
But requests is synchronous, so to keep a requests-based pipeline from blocking, we would most likely end up doing something like this:
import requests
from scrapy.exceptions import NotConfigured
from twisted.internet import threads


class HttpPipeline:
    def __init__(self, crawler):
        self.crawler = crawler
        self.settings = crawler.settings
        self.url = self.settings.get('HTTP_PIPELINE_URL')
        if not self.url:
            raise NotConfigured

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_item(self, item, spider):
        # Push the blocking requests call onto the Twisted thread pool.
        threads.deferToThread(self._process_request, item)
        return item

    def _process_request(self, item):
        resp = requests.post(self.url, json=item)
        return resp.json()
This works, but every request occupies a thread from the reactor's thread pool, and the Deferred returned by deferToThread is simply discarded, so the pipeline never waits for (or handles) the result. There is actually a better option: treq.
treq is an asynchronous HTTP client built on Twisted. Its API is very close to that of requests, but it is non-blocking, which makes it a natural fit for an asynchronous pipeline.
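To get a feel for how close the two APIs are, here is a minimal comparison sketch; the URL and function names are placeholders, and the only substantive difference is that every treq call returns a Deferred:

import requests
import treq
from twisted.internet import defer

URL = 'http://httpbin.org/post'  # placeholder endpoint


# Synchronous: blocks the calling thread until the response arrives.
def post_with_requests(payload):
    resp = requests.post(URL, json=payload)
    return resp.json()


# Asynchronous: treq.post() and resp.json() both return Deferreds.
@defer.inlineCallbacks
def post_with_treq(payload):
    resp = yield treq.post(URL, json=payload)
    result = yield resp.json()
    defer.returnValue(result)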
2. Installation
pip install treq
pip install pytest-twisted
3. Usage
Write a simple pipeline:
import treq
from twisted.internet import defer
from scrapy.exceptions import NotConfigured


class HttpPipeline:
    def __init__(self, crawler):
        self.crawler = crawler
        self.settings = crawler.settings
        self.url = self.settings.get('HTTP_PIPELINE_URL')
        if not self.url:
            raise NotConfigured

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    @defer.inlineCallbacks
    def process_item(self, item, spider):
        # Scrapy waits on the returned Deferred before passing the item on.
        yield self._process_request(item, spider)
        defer.returnValue(item)

    @defer.inlineCallbacks
    def _process_request(self, item, spider):
        resp = yield treq.post(self.url, json=item)
        result = yield resp.json()
        defer.returnValue(result)
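To actually use the pipeline, it has to be enabled in the Scrapy settings and given an endpoint. A minimal sketch, assuming the class lives in myproject/pipelines.py; both the module path and the URL are placeholders:

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.HttpPipeline': 300,
}
HTTP_PIPELINE_URL = 'http://localhost:8000/items'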
Write the corresponding test:
from unittest.mock import Mock

import pytest
from pytest_twisted import ensureDeferred
from scrapy.spiders import Spider
from scrapy.utils.test import get_crawler

# Assumes the pipeline above is saved as HttpPipeline.py
from HttpPipeline import HttpPipeline


class TestHttpPipeline:
    @pytest.fixture
    def pipeline(self):
        # Supply the required setting directly; the URL is just a placeholder.
        crawler = get_crawler(Spider, {'HTTP_PIPELINE_URL': 'http://example.com/items'})
        return HttpPipeline.from_crawler(crawler)

    @pytest.fixture
    def item(self):
        return {
            'key': 'value'
        }

    @pytest.fixture
    def treq_mock(self, mocker):  # the `mocker` fixture comes from pytest-mock
        requests_mock = Mock()
        response_mock = Mock()
        # Replace the treq module as seen by the pipeline module.
        mocker.patch('HttpPipeline.treq', requests_mock)
        requests_mock.post.side_effect = [response_mock]
        response_mock.json.return_value = {"data": "uuid1"}
        response_mock.code = 200
        return requests_mock, response_mock

    @ensureDeferred
    async def test_process_item(self, pipeline, item, treq_mock):
        requests_mock, _ = treq_mock
        result = await pipeline.process_item(item, Spider('test'))
        # process_item must hand the item on to the next pipeline unchanged.
        assert result == item
        requests_mock.post.assert_called_once_with(pipeline.url, json=item)
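The mocker fixture used above is provided by pytest-mock, so it needs to be installed alongside pytest-twisted; after that the test runs under a plain pytest invocation (the file name is whatever you saved the test as):

pip install pytest-mock
pytest test_http_pipeline.py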
Writing a Scrapy HTTP pipeline with treq is straightforward, and because treq's API closely mirrors requests, anyone already comfortable with requests will pick it up quickly.