celery笔记 jieba RPC 服务 http2 vs http1 python发送邮件 gitbook 笔记 docker运行 pyppeteer 百度/腾讯 ocr 试用 页面元素选择 python pickle 实践 k3s 安装加速 FFmpeg 使用总结 Systemd 教程 mysql 1366 错误解决 docker-compose 笔记 sqlite 使用总结 百度网盘命令行工具 bypy 阿里云 PAI-EAS 试用报告 gpt2中文预训练模型试用 文本生成资料汇总 使用 tracemalloc 分析 python 内存使用情况 spark 集群试用 openresty使用笔记 mac下 python 报错 CERTIFICATE_VERIFY_FAILED docker-compose 安装方法 系统代理 mac 下安装 adb scrapy项目作为工具库使用 charles over proxy 使用 markdown 制作 ppt docker挂载目录异常 flask 笔记 wsl2 使用体验 nginx 配置 mac 配置 发布自己的 python 包 selenium + chrome 全页面截图 mongo ORM 笔记 supervisor 使用总结 h5py性能测评 privoxy实现PAC代理上网 session请求示例 ssh笔记 python小技巧 docker学习笔记 tornado使用总结 再读《MongoDB权威指南》 tornado文件上传服务 mongo学习笔记 python异步服务器测试 No module named 'Crypto' on Mac mac中安装python3.5 py3.6环境下numpy C扩展出错 mtcnn读书笔记 shell 学习笔记 install ubuntu18.04 定时备份linux系统的history记录 asyncio异步请求示例 golang setting git使用笔记 Ubuntu16.04下配置python3环境 将Ubuntu16.04升级为Ubuntu18.04(development branch) Ubuntu16.04下源码安装python3.6 virtualenv中安装anaconda模块 基于sqlite3实现数据缓存 修复colaboratory中tensorflow的bug 安装docker-compose docker引起的空间不足 CNN可视化研究 ubuntu16.04中安装wine-qq 在ubuntu16.04中安装wine3.0+winetricks ssh over socks5 python删除文件或目录 shadowsocks+privoxy设置本地代理 python下载大文件的方法 解决python中遇到的乱码问题 修改 ubuntu & windows双系统中系统启动顺序与等待时间 python3安装mysql ubuntu环境变量设置 python 后台程序实现

scrapy项目作为工具库使用

2019年10月06日

scrapy项目作为工具库使用

1. 目录树

pysrc
    - main.py
    - items.py
    - ScrapyDemo
        - scrapy.cfg
        - ScrapyDemo
            - spiders
                - __init__.py
                - DemoCrawler.py
            - __init__.py
            - items.py
            - pipelines.py
            - settings.py
            - utils.py
            - demo_api.py

2. scrapy项目提供对外接口

ScrapyDemo 中 新建文件 demo_api.py, 提供对外访问接口

# coding:utf-8
import json
import os
import sys

root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if root_dir not in sys.path:
    sys.path.append(root_dir)

from scrapy.crawler import CrawlerProcess
from ScrapyDemo.spiders.DemoCrawler import DemoSpider, logger
from ScrapyDemo.utils import get_settings, reset_settings


def run_demo_spider(query: str, proxy: dict, output_file: str, lang='', max_page: int = None):
    """

    :param proxy:
    :param output_file:
    :param query:
    :param lang:
    :param max_page:
    :return:
    """
    # 记录旧的环境变量
    old_environ_dict = {}

    # proxy
    for key, value in proxy.items():
        old_environ_dict[key] = os.environ.get(key)
        os.environ[key] = value
    if proxy and ("https_proxy" not in proxy and "HTTPS_PROXY" not in proxy):
        logger.warning("https_proxy not found in proxy!")

    try:
        settings = get_settings()

        # 修改 settings
        settings.set("XXX", output_file, priority="project")

        process = CrawlerProcess(settings=settings)

        process.crawl(DemoSpider, query=query, lang=lang, max_page=max_page)
        process.start()
    finally:
        # 恢复环境变量
        for key, value in old_environ_dict.items():
            if value is None:
                if key in os.environ:
                    del os.environ[key]
            else:
                os.environ[key] = value

        reset_settings()


def run_scrapy(cmd_json: str):
    """ 执行命令 """

    # parse args
    cmd_kwargs = json.loads(cmd_json)

    # run spider
    run_demo_spider(**cmd_kwargs)


if __name__ == '__main__':
    run_scrapy(sys.argv[1])

3. 主项目通过subprocess调用 scrapy

在主项目main.py中, 通过subprocess调用 scrapy 项目

# coding:utf-8

import json
import logging
import os
import subprocess
import sys

import arrow
import shortuuid

from items import DemoItem

_cur_dir = os.path.dirname(__file__)


def _run_scrapy(**kwargs):
    """ 执行 scrapy 任务"""
    cur_dir = os.getcwd()
    try:
        os.chdir(os.path.join(_cur_dir, "ScrapyDemo"))
        subprocess.run([sys.executable, "ScrapyDemo/demo_api.py", json.dumps(kwargs)])
    finally:
        os.chdir(cur_dir)


def search(query: str, proxy: dict, lang='', max_page: int = None, logger: logging.Logger = None) -> [DemoItem]:
    """

    :param proxy:
    :param query:
    :param lang:
    :param max_page:
    :param logger:
    :return:
    """
    logger = logger or logging.getLogger("xxx")

    # 数据文件
    data_dir = "/tmp/data"
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    uid = arrow.get().format("YYYYMMDD") + "_" + shortuuid.ShortUUID().random(length=8)
    output_file = os.path.join(data_dir, "{}.json".format(uid))

    # 启动任务
    if proxy and ("https_proxy" not in proxy and "HTTPS_PROXY" not in proxy):
        logger.warning("https_proxy not found in proxy!")

    _run_scrapy(**dict(
        query=query, proxy=proxy, lang=lang, max_page=max_page, output_file=output_file,
    ))

    # 解析结果
    result_list = []
    if not os.path.exists(output_file):
        logger.error("{} not found!".format(output_file))
        return result_list

    result_dict = {}
    with open(output_file, "r", encoding="utf-8") as f:
        for line in f:
            if len(line) > 2:
                try:
                    raw_item_dict = json.loads(line.strip())
                    _index = raw_item_dict.pop("index")

                    if _index in result_dict:
                        logger.warning("item index {} exists before!".format(_index))

                    demo_item = DemoItem()
                    demo_item["id"] = raw_item_dict.get("ID")
                    result_dict[_index] = demo_item
                except Exception as e:
                    logger.error(e)

    # 按照 index 排序
    logger.info("index list {}".format([_index for _index in sorted(result_dict.keys())]))
    for _index in sorted(result_dict.keys()):
        result_list.append(result_dict[_index])

    # 删除临时文件

    return result_list