1. scrapy项目作为工具库使用

    scrapy项目作为工具库使用 1. 目录树 pysrc - main.py - items.py - ScrapyDemo - scrapy.cfg - ScrapyDemo - spiders - __init__.py - DemoCrawler.py - __init__.py - items.py - pipelines.py - settings.py - utils.py - demo_api.py 2. scrapy项目提供对外接口 在 ScrapyDemo 中 新建文件 demo_api.py, 提供对外访问接口 # coding:utf-8 import json import os import sys root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if root_dir not in sys.path: sys.path.append(root_dir) from scrapy.crawler import CrawlerProcess from ScrapyDemo.spiders.DemoCrawler import DemoSpider, logger from ScrapyDemo.utils import get_settings, reset_settings def run_demo_spider(query: str, proxy: dict, output_file: str, lang='', max_page: int = None): """ :param proxy: :param output_file: :param query: :param lang: :param max_page: :return: """ # 记录旧的环境变量 old_environ_dict = {} # proxy for key, value in proxy.items(): old_environ_dict[key] = os.environ.get(key) os.environ[key] = value if proxy and ("https_proxy" not in proxy and "HTTPS_PROXY" not in proxy): logger.warning("https_proxy not found in proxy!") try: settings = get_settings() # 修改 settings settings.set("XXX", output_file, priority="project") process = CrawlerProcess(settings=settings) process.crawl(DemoSpider, query=query, lang=lang, max_page=max_page) process.start() finally: # 恢复环境变量 for key, value in old_environ_dict.items(): if value is None: if key in os.environ: del os.environ[key] else: os.environ[key] = value reset_settings() def run_scrapy(cmd_json: str): """ 执行命令 """ # parse args cmd_kwargs = json.loads(cmd_json) # run spider run_demo_spider(**cmd_kwargs) if __name__ == '__main__': run_scrapy(sys.argv[1]) 3. 
主项目通过subprocess调用 scrapy 在主项目main.py中, 通过subprocess调用 scrapy 项目 # coding:utf-8 import json import logging import os import subprocess import sys import arrow import shortuuid from items import DemoItem _cur_dir = os.path.dirname(__file__) def _run_scrapy(**kwargs): """ 执行 scrapy 任务""" cur_dir = os.getcwd() try: os.chdir(os.path.join(_cur_dir, "ScrapyDemo")) subprocess.run([sys.executable, "ScrapyDemo/demo_api.py", json.dumps(kwargs)]) finally: os.chdir(cur_dir) def search(query: str, proxy: dict, lang='', max_page: int = None, logger: logging.Logger = None) -> [DemoItem]: """ :param proxy: :param query: :param lang: :param max_page: :param logger: :return: """ logger = logger or logging.getLogger("xxx") # 数据文件 data_dir = "/tmp/data" if not os.path.exists(data_dir): os.makedirs(data_dir) uid = arrow.get().format("YYYYMMDD") + "_" + shortuuid.ShortUUID().random(length=8) output_file = os.path.join(data_dir, "{}.json".format(uid)) # 启动任务 if proxy and ("https_proxy" not in proxy and "HTTPS_PROXY" not in proxy): logger.warning("https_proxy not found in proxy!") _run_scrapy(**dict( query=query, proxy=proxy, lang=lang, max_page=max_page, output_file=output_file, )) # 解析结果 result_list = [] if not os.path.exists(output_file): logger.error("{} not found!".format(output_file)) return result_list result_dict = {} with open(output_file, "r", encoding="utf-8") as f: for line in f: if len(line) > 2: try: raw_item_dict = json.loads(line.strip()) _index = raw_item_dict.pop("index") if _index in result_dict: logger.warning("item index {} exists before!".format(_index)) demo_item = DemoItem() demo_item["id"] = raw_item_dict.get("ID") result_dict[_index] = demo_item except Exception as e: logger.error(e) # 按照 index 排序 logger.info("index list {}".format([_index for _index in sorted(result_dict.keys())])) for _index in sorted(result_dict.keys()): result_list.append(result_dict[_index]) # 删除临时文件 return result_list

    2019/10/06 技术

  2. charles over proxy

    charles over proxy 使用 charles 对安卓应用进行抓包时,会遇到部分应用必须使用代理才能上网的问题。 解决思路是, charles外接代理。原理如下所示: Android App --> charles --> other proxy --> internet 具体操作如下: 安卓中设置 charles 提供的代理,以便抓包 charles 中设置外部代理,设置方法为依次展开Proxy --> External Proxy Settings..., 填写外部代理即可。

    2019/10/05 技术

  3. 使用 markdown 制作 ppt

    使用 markdown 制作 ppt reveal.js提供了一种利用 markdown 生成 ppt 的方法。 可以使用vscode及vscode-reveal插件,搭建书写环境。 1. 环境配置 安装过程: 安装 vscode 打开vscode, 安装插件 vscode-reveal 2. 示例 步骤一、在 vscode 中新建 sample.md, 并写入如下内容: --- theme : "night" transition: "slide" highlightTheme: "monokai" logoImg: "logo.png" slideNumber: false title: "XXX调研报告" --- <style type="text/css"> .reveal p { text-align: left; } .reveal ul { display: block; } .reveal ol { display: block; } </style> # XXX调研报告 --- ## 1. 概述 --- ## 2. 行业现状 示例要点: ppt 基本配置 ppt 样式写在 style 中 步骤二、通过快捷键Command + Shift + P 选择 Revealjs: Open presentation in browser, 即可在浏览器中预览ppt。也可以通过选择 Revealjs: Export in PDF, 导出 pdf 文件。

    2019/10/04 技术

  4. docker挂载目录异常

    docker挂载目录异常 场景如下: docker 以数据卷的方式挂载 目录/文件, 如 -v /opt/code:/code。 当宿主的目录/opt/code以如下方式执行数据更新后,容器中的目录 /code 数据全部消失: rm -rf /opt/code/ mkdir -p /opt/code echo -n > /opt/code/new.data 原因是,容器挂载,只认文件inode。当宿主机的目录被删除再重建后,目录inode变化。容器挂载的inode所对应的宿主文件已经消失,故而数据为空。

    2019/10/03 技术

  5. flask 笔记

    flask学习笔记 1. 响应 1.1 展示文本 from flask import make_response @app.route('/') def hello(): response = make_response("文本内容!") response.headers["Content-Type"] = "text/plain;charset=UTF-8" return response 1.2 下载 txt from flask import make_response @app.route('/') def hello(): response = make_response("文本内容!") # response.headers['Content-Type'] = "text/plain" response.headers['Content-Disposition'] = "attachment; filename=download.txt" return response 1.3 图片处理 from io import BytesIO import numpy as np import requests from PIL import Image from flask import Flask, request, make_response @app.route("/", methods=["POST", 'GET']) def search_image(): url = request.args.get("url") or request.form.get("url") if not url: response = make_response("OK", 200) else: try: # load image raw_image_response = requests.get(url=url, timeout=10) # do something image = Image.open(BytesIO(raw_image_response.content)) new_image_obj = image # response new_image_bytes = BytesIO() new_image_obj.save(new_image_bytes, 'JPEG') response = make_response(new_image_bytes.getvalue()) response.headers['Content-Type'] = "image/jpeg" except Exception as e: print(e) response = make_response("error {}".format(e), 200) return response

    2019/07/02 技术

  6. wsl2 使用体验

    wsl2 使用体验 优点1. docker 正常工作 wsl2其实是一个完整的虚拟机, docker 在里面工作正常。 坑1. wsl ip 不固定 wsl 重启后, ip 变动。 这个坑也有人遇到:[WSL 2] NIC Bridge mode 🖧 (Has Workaround🔨). 研究微软的说明,才发现这个是feature: User Experience Changes Between WSL 1 and WSL 2. 解决思路,win10通过域名wsl.wsl访问wsl; wsl内部负责将ip写入win10的hosts文件中。使用python实现这个逻辑: import subprocess import re def get_address_info(): s1 = subprocess.Popen(["ifconfig"], stdout=subprocess.PIPE) out_string = s1.stdout.read().decode("utf-8") # address name address_name_list = [] for line in out_string.split("\n"): if line and line.find("flags=") > -1: address_name_list.append(line.split(":")[0]) # ip re_address = re.compile(r'(?<=inet )[\d\.]{3,20}?(?= netmask)') all_address = re_address.findall(out_string) # assert len(address_name_list) == len(all_address) return {address_name_list[i]: ip for i, ip in enumerate(all_address)} def get_wsl_ip() -> str: add_info = get_address_info() return add_info["eth0"] def update_wsl_ip(new_ip: str): """ update ip """ host_file = "/mnt/c/Windows/System32/drivers/etc/hosts" with open(host_file, "r") as f: lines = f.readlines() change = False found = False for i, line in enumerate(lines): if len(line) > 5 and line.find("wsl.wsl") > -1: found = True if line.find(new_ip) > -1: print("not change: ip is same!") else: lines[i] = "{}\twsl.wsl\n".format(new_ip) print("change: ip is different!") change = True break if not found: lines.append("{}\twsl.wsl\n".format(new_ip)) print("change: ip not exists!") change = True if lines and change: with open(host_file, "w") as f: f.write("".join(lines)) if __name__ == '__main__': update_wsl_ip(new_ip=get_wsl_ip()) wsl2 中 设置定时任务: */15 * * * * cat ~/.ssh/sss.dat | sudo -S python3 ~/refresh_hosts.py

    2019/06/23 技术

  7. nginx 配置

    nginx 配置 1. http2开启 环境 ubuntu18.04 + nginx1.14(apt 自带) /etc/nginx/nginx.conf 配置,关键是SSL Settings下配置ssl证书信息 user www-data; worker_processes auto; pid /run/nginx.pid; include /etc/nginx/modules-enabled/*.conf; events { worker_connections 768; # multi_accept on; } http { ## # Basic Settings ## sendfile on; tcp_nopush on; tcp_nodelay on; keepalive_timeout 65; types_hash_max_size 2048; # server_tokens off; # server_names_hash_bucket_size 64; # server_name_in_redirect off; include /etc/nginx/mime.types; default_type application/octet-stream; ## # SSL Settings ## ssl_certificate /etc/nginx/cert/b.com.pem; ssl_certificate_key /etc/nginx/cert/b.com.key; ssl_session_timeout 5m; ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4; ssl_protocols TLSv1 TLSv1.1 TLSv1.2; ssl_prefer_server_ciphers on; ## # Logging Settings ## access_log /var/log/nginx/access.log; error_log /var/log/nginx/error.log; ## # Gzip Settings ## gzip on; # gzip_vary on; # gzip_proxied any; # gzip_comp_level 6; # gzip_buffers 16 8k; # gzip_http_version 1.1; # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript; ## # Virtual Host Configs ## include /etc/nginx/conf.d/*.conf; include /etc/nginx/sites-enabled/*; } 服务配置,新建/etc/nginx/sites-available/x.conf文件,写入如下信息: server { listen 443 ssl http2; server_name www.b.com; ssl on; root /var/www/b.com; index index.html index.htm; ssl_certificate /etc/nginx/cert/b.com.pem; ssl_certificate_key /etc/nginx/cert/b.com.key; ssl_session_timeout 5m; ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4; ssl_protocols TLSv1 TLSv1.1 TLSv1.2; ssl_prefer_server_ciphers on; location / { index index.html index.htm; } } server { listen 80; server_name www.b.com; rewrite ^(.*)$ https://$host$1 permanent; } 2. 
重定向 强制使用https server { listen 80; server_name www.b.com; rewrite ^(.*)$ https://$host$1 permanent; } path 转 子域名 rewrite ^/blog/(.*)$ https://blog.b.com/$1 permanent; 修改网址并使用新网址进行其他操作 # 反向代理的例子 location /blog/ { rewrite ^/blog/(.*)$ /$1 break; # 去除blog proxy_pass http://127.0.0.1:6000; } 3. 反向代理 server { listen 443 ssl http2; server_name www.b.com; ssl on; ssl_certificate /etc/nginx/cert/b.com.pem; ssl_certificate_key /etc/nginx/cert/b.com.key; ssl_session_timeout 5m; ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4; ssl_protocols TLSv1 TLSv1.1 TLSv1.2; ssl_prefer_server_ciphers on; client_max_body_size 20M; location /static/ { alias /var/www/b.com/static/; } location / { proxy_pass http://127.0.0.1:6000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_send_timeout 600; proxy_connect_timeout 600; proxy_read_timeout 600; } } 4. 将/admin/*路径所有请求分流到另一台服务器上 将 django 服务挂载到 www.b.com/admin/ 下,www.b.com同时由多个服务器提供独立的服务。 为使 nginx 能正确区分来自django的请求(静态、动态),django服务强制客户端在请求的cookies上标识{"svr": "django"}。 具体配置如下: server { listen 443 ssl http2; server_name www.b.com; # other setting ... location / { set $dj '1'; if ($cookie_svr ~* ^.django.*$ ){ set $dj 1$dj ; } if ($request_uri ~* ^/admin/.*$ ){ set $dj '1' ; } if ($dj = '11' ){ rewrite ^/(.*)$ /admin/$1 permanent; } index index.html index.htm; } # admin location /admin/ { rewrite ^/admin/(.*)$ /$1 break; proxy_pass http://127.0.0.1:8000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_send_timeout 600; proxy_connect_timeout 600; proxy_read_timeout 600; } } server { listen 80; server_name www.b.com; rewrite ^(.*)$ https://$host$1 permanent; } 注意: nginx下if不能嵌套,没有else 通过$cookie_svr可以获取到svr的值 要正确使用rewrite的停止标志(last, break, permanent) 5. 
负载均衡 通过 nginx 的 stream 实现负载均衡 user root; worker_processes auto; events { worker_connections 1024; } stream { log_format lbs '$remote_addr -> $upstream_addr [$time_local] ' '$protocol $status $bytes_sent $bytes_received ' '$session_time "$upstream_connect_time"'; access_log /var/log/nginx/access.log lbs ; open_log_file_cache off; upstream backend { hash $remote_addr consistent; server backend-1:18888; server backend-2:18888; server backend-3:18888; server backend-4:18888; } server { listen 18888; listen 18888 udp; proxy_pass backend; } } 6. try_files server { ... location ^~ /static/html/ { alias /opt/code/pages/html/; try_files $uri /static/html/index.html; } } 7. location + if server { location ^~ /static/html/ { if ($uri ~* \.(png|jpg)$ ){ rewrite ^/(.*)$ https://my-bucket.oss-cn-shenzhen.aliyuncs.com/$1 permanent; } alias /opt/code/pages/html/; try_files $uri /static/html/index.html; } } 8. 安全验证 生成安全文件 # install htpasswd apt install apache2-utils # create db file htpasswd -c -d passwd.db user chmod 400 passwd.db nginx 配置 server { auth_basic "secret"; auth_basic_user_file /etc/nginx/conf.d/passwd.db; ... } 效果图 9. 仅后端使用安全验证 nginx 配置 server { location ~ ^/api { rewrite ^/api(.*)$ $1 break; proxy_pass http://127.0.0.1:6666; proxy_set_header Authorization "Basic YWRtaW46YWRtaW4xMjM="; proxy_pass_header Authorization; proxy_connect_timeout 300; proxy_read_timeout 300; proxy_send_timeout 300; } ... }

    2019/06/13 技术

  8. mac 配置

    mac 配置

    2019/06/03 技术

  9. 发布自己的 python 包

    发布自己的 python 包 1. 新建 python 包 具体可参考: pyxtools 假设已经成功新建一个名为 my-py-package 的 python 包。 2. 发布 pypi 中注册账号,假设用户名 为 py-user 安装 twine: python -m pip install twine 打包: python setup.py sdist bdist_wheel 上传: twine upload dist/* 到 pypi 中确认包是否存在 3. travis + github 自动发布 在项目下新建 .travis.yml 文件: language: python python: - '3.6' - '2.7' - '3.4' - '3.5' install: - pip install . script: - python -c "import os;" deploy: provider: pypi user: py-user skip_cleanup: true skip_existing: true twine_version: 1.13.0 distributions: "sdist bdist_wheel" on: tags: true python: 3.6 branch: master 注意: distributions: "sdist bdist_wheel" 的目的是同时生成 whl 文件 tags: true表示新建标签时触发代码发布 加密 pypi 密码: pip install travis-encrypt travis-encrypt --deploy py-user my-py-package .travis.yml master 分支 新建标签后,会自动触发包上传。 如果包上传失败,可以到 travis 网站中查看错误日志。 参考 上传并发布你自己发明的轮子 - Python PyPI 实践 使用github+travis将Python包部署到Pypi

    2019/06/03 技术

  10. selenium + chrome 全页面截图

    selenium + chrome 全页面截图 完整代码: __author__ = 'rk.feng' import base64 import json from selenium import webdriver def chrome_take_full_screenshot(driver: webdriver.Chrome): """ copy from https://stackoverflow.com/questions/45199076/take-full-page-screenshot-in-chrome-with-selenium author: Florent B. :param driver: :return: """ def send(cmd, params): resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id url = driver.command_executor._url + resource body = json.dumps({'cmd': cmd, 'params': params}) response = driver.command_executor._request('POST', url, body) return response.get('value') def evaluate(script): response = send('Runtime.evaluate', {'returnByValue': True, 'expression': script}) return response['result']['value'] metrics = evaluate( "({" + \ "width: Math.max(window.innerWidth, document.body.scrollWidth, document.documentElement.scrollWidth)|0," + \ "height: Math.max(innerHeight, document.body.scrollHeight, document.documentElement.scrollHeight)|0," + \ "deviceScaleFactor: window.devicePixelRatio || 1," + \ "mobile: typeof window.orientation !== 'undefined'" + \ "})") send('Emulation.setDeviceMetricsOverride', metrics) screenshot = send('Page.captureScreenshot', {'format': 'png', 'fromSurface': True}) send('Emulation.clearDeviceMetricsOverride', {}) return base64.b64decode(screenshot['data']) def get_driver(headless: bool = False) -> webdriver.Chrome: capabilities = { 'browserName': 'chrome', 'chromeOptions': { 'useAutomationExtension': False, 'args': ['--disable-infobars'] } } chrome_options = webdriver.ChromeOptions() if headless: chrome_options.add_argument('--headless') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('--no-sandbox') driver = webdriver.Chrome( executable_path="/Users/pzzh/Work/bin/chromedriver", chrome_options=chrome_options, desired_capabilities=capabilities ) return driver def full_page_screenshot(driver: webdriver.Chrome, url: str, png_file: str = "screenshot.png"): 
driver.get(url) png = chrome_take_full_screenshot(driver) with open(png_file, 'wb') as f: f.write(png) if __name__ == '__main__': _driver = get_driver(headless=False) try: # 商务部 target_url = "http://www.mofcom.gov.cn/article/b/c/?" full_page_screenshot(driver=_driver, url=target_url, png_file="mofcom_full.png") # 非整页 _driver.get(url=target_url) _driver.save_screenshot("mofcom.png") finally: if _driver: _driver.close() _driver.quit() 结果: 普通截图 全页面截图

    2019/06/01 技术