mongo ORM 笔记 工作中, 使用ORM操作mongo数据库. 总体感觉是, 与django ORM操作类似, 也能方便地使用pymongo的接口. 1. 使用方法 环境: python 3.x mongoengine model定义 import datetime from mongoengine import * class CommentModel(DynamicDocument): meta = { 'indexes': [ { 'fields': ['name'], "cls": True, "unique": True, } ] } name = StringField(required=True, max_length=32) age = IntField() create_at = DateTimeField(default=datetime.datetime.now) 基本操作: save(insert/update): CommentModel.objects().save(instance) search: CommentModel.objects(name="ABC").first() or CommentModel.objects(__raw__={"name":"ABC"}).first() 2. group用法 2.1 简单统计 统计消费者的消费订单号: res_list = OrderModel.objects().aggregate( {'$match': { OrderModel.buyer_id.name: {"$in": list(set(buyer_id_list))} }}, {"$group": { "_id": "${}".format(OrderModel.buyer_id.name), "order_id": {"$addToSet": "${}".format(OrderModel.order_id.name)} }} ) buyer_vs_order_dict = {res["_id"]: res["order_id"] for res in res_list}
supervisor 使用总结 1. 安全添加/更新 任务 新建或者更新任务配置文件后,supervisor可以在不影响其他任务的前提下 加载或重新 加载任务。 supervisorctl reread && supervisorctl update 2. 任务配置示例 多个子进程按顺序使用不同的端口 [program:demo] command=docker run --name=demo_%(process_num)05d -p %(process_num)05d:80 diy/server:latest directory=/tmp process_name=%(program_name)s_%(process_num)05d numprocs=5 numprocs_start=8001 startsecs = 5 startretries = 3 redirect_stderr = true stdout_logfile = /var/log/supervisor/xx.log autostart=true autorestart=unexpected stopsignal=TERM 上述配置,会依次启动 demo_8001, demo_8002, …, demo_8005 共 5 个 容器, 分别监听 8001, 8002, …, 8005端口。 3. 监控supervisor自身 使用一下代码,定时监控supervisor:没运行则启动,运行则维持原状。 # crontab -e */5 * * * * supervisord -c /etc/supervisord.conf
h5py性能测评 代码 import pickle import sys import time import unittest import h5py import numpy as np import os class TestH5(unittest.TestCase): def setUp(self): self.pickle_file = "./data.pkl" self.h5_file = "./data.h5" def tearDown(self): os.remove(self.pickle_file) os.remove(self.h5_file) @staticmethod def get_file_size(file_path): file_size = os.path.getsize(file_path) / float(1024 * 1024) return "{}MB".format(round(file_size, 2)) @staticmethod def get_size(obj): return sys.getsizeof(obj) def create_file(self): """ 创建文件 """ data = np.random.random(size=(100000, 1024)) print("size of data is {}".format(self.get_size(data))) target_index = [1, 5, 10, 50, 100, 500, 1000, 5000, 9000, 9001, 9003] target_result = data[target_index] print("size of target_result is {}".format(self.get_size(target_result))) # pickle with open(self.pickle_file, "wb") as fw: pickle.dump(data, fw) print("pickle file size is {}".format(self.get_file_size(self.pickle_file))) # h5py with h5py.File(self.h5_file, 'w') as hf: hf.create_dataset('data', data=data) print("h5 file size is {}".format(self.get_file_size(self.h5_file))) return target_index, target_result def pickle_load(self, target_index, target_result): time_start = time.time() with open(self.pickle_file, "rb") as fr: all_data = pickle.load(fr) self.assertTrue((target_result == all_data[target_index]).all()) return time.time() - time_start def h5py_load(self, target_index, target_result): time_start = time.time() with h5py.File(self.h5_file, 'r') as hf: all_data = hf["data"] self.assertTrue((target_result == all_data[target_index]).all()) return time.time() - time_start def testFileLoad(self): """ 文件加载 """ target_index, target_result = self.create_file() # pickle: load 100 time time_list = [] for i in range(10): time_list.append(self.pickle_load(target_index=target_index, target_result=target_result)) print("pickle load 10 times: {}s per step, max time is {}s, min time is {}s!".format( sum(time_list) / len(time_list), max(time_list), min(time_list))) # h5py: load 10 time time_list = [] for i in range(10): time_list.append(self.h5py_load(target_index=target_index, target_result=target_result)) print("h5 load 10 times: {}s per step, max time is {}s, min time is {}s!".format( sum(time_list) / len(time_list), max(time_list), min(time_list))) 测试结果 文件加载测试结果如下: Launching unittests with arguments python -m unittest hdf5_benchmark.TestH5 in /mnt/e/frkhit/wsl/tmp/pycharm_benchmark size of data is 819200112 size of target_result is 90224 pickle file size is 781.25MB h5 file size is 781.25MB pickle load 10 times: 2.1771466970443725s per step, max time is 2.5986461639404297s, min time is 2.0592007637023926s! h5 load 10 times: 0.002041530609130859s per step, max time is 0.004301786422729492s, min time is 0.0013699531555175781s! 结论: h5py不一定能节省空间, 在本测试中, h5py的文件大小与pickle一样 h5py在加载数据时, 更省时间(只从硬盘中加载需要的数据)
privoxy实现PAC代理上网 本文主要参考: Linux 使用 ShadowSocks + Privoxy 实现 PAC 代理 1. Privoxy实现http代理上网 安装privoxy: sudo apt install privoxy 配置: vim /etc/privoxy/config # 修改监听地址 listen-address 127.0.0.1:8118 # 代理转发: 若不打算实现PAC模式, 确保去除下一行的注释 # forward-socks5 / 127.0.0.1:1080 . 重启服务: sudo service privoxy start 2. PAC 生成pac.action: cd /tmp && curl -4sSkLO https://raw.github.com/zfl9/gfwlist2privoxy/master/gfwlist2privoxy && bash gfwlist2privoxy 127.0.0.1:1080 mv -f pac.action /etc/privoxy/ && echo 'actionsfile pac.action' >>/etc/privoxy/config && sudo service privoxy start 3. 测试 # 使用代理 curl www.google.com # 本地地址 curl "http://pv.sohu.com/cityjson?ie=utf-8"
session请求示例 1. requests session requests自带session管理, 示例: import json import requests with requests.Session() as session: session.get('https://httpbin.org/cookies/set/sessioncookie/123456789') r = session.get('https://httpbin.org/cookies') assert r.status_code == 200 assert json.loads(r.text)["cookies"]["sessioncookie"] == "123456789" 2. scrapy session scrapy使用cookiejar管理session. 参考. def start_first_page(self, ): yield scrapy.Request("https://httpbin.org/cookies/set/sessioncookie/123456789", meta={'cookiejar': 0}, callback=self.parse_second_page) def parse_second_page(self, response): return scrapy.Request("https://httpbin.org/cookies", meta={'cookiejar': response.meta['cookiejar']}, callback=self.parse_other_page) 3. tornado client + session tornado本身不带session模块, 客户端可使用cookies维护session. 获取新cookies: cookies = response.headers.get_list('Set-Cookie') 使用新cookies: import tornado.httpclient http_client = tornado.httpclient.HTTPClient() # cookies = {"Cookie" : 'my_cookie=abc'} http_client.fetch("http://abc.com/test", headers=cookies)
ssh笔记 1. 免密码登录 主机 host1 希望免密码登录到服务器 server1中. 步骤: # in host1 # 生成私钥 ssh-keygen -t rsa # 将公钥复制到服务器中 scp ~/.ssh/id_rsa.pub ubuntu@server1:~/.ssh/tmp_id_rsa.pub # in server1 # 将公钥追加到授权 key 中 cat ~/.ssh/tmp_id_rsa.pub >> ~/.ssh/authorized_keys # in host1 # 免密码连接到 server1中 ssh ubuntu@server1 2. 使用代理 参考: ssh over socks5 3. 内网穿透 参考: 使用SSH反向隧道进行内网穿透 4. 维持心跳 客户端维持心跳的方法是, 在/etc/ssh/ssh_config中设置TCPKeepAlive yes, ServerAliveInterval 300, 然后重启. 也可以在ssh命令中添加参数: ssh -o TCPKeepAlive=yes -o ServerAliveInterval=300 ubuntu@server 5. 断点续传 参考: scp 断点续传 rsync -P --rsh=ssh your.file remote_server:/tmp/ 6. 硬件相关 根据How to change LCD brightness from command line (or via script)? , 可通过以下命令设置屏幕亮度: echo 400 | sudo tee /sys/class/backlight/intel_backlight/brightness 省电模式: echo 0 | sudo tee /sys/class/backlight/intel_backlight/brightness 亮度最大值为 cat /sys/class/backlight/intel_backlight/max_brightness 7. 文件传输 使用功能: rsync传输后删除源文件 遍历列表 tar解压缩到指定目录 # collect file servers=( "1.abc.com" "2.abc.com" "3.abc.com" ) for i in "${servers[@]}" do echo $i rsync -avz --remove-source-files root@$i:/opt/data/*.tar.gz /opt/data/ done # extract file for filename in /opt/data/*.tar.gz; do echo "$filename" tar -xzvf "$filename" -C /opt/data/extract/ && rm "$filename" done
python小技巧
docker学习笔记 1. 打包 flask server实例 参考 Dockerfile FROM ubuntu:latest MAINTAINER Rajdeep Dua "dua_rajdeep@yahoo.com" RUN apt-get update -y RUN apt-get install -y python-pip python-dev build-essential COPY . /app WORKDIR /app RUN pip install -r requirements.txt ENTRYPOINT ["python"] CMD ["app.py"] build: docker build -t flask-sample-one:latest . run: docker run -d -p 5000:5000 flask-sample-one 2. 将 container 保存为 image docker commit <CONTAIN-ID> <IMAGE-NAME> 3. 导出镜像 参考 使用 docker export <container_name> 导出镜像: # 导出 docker export furious_bell > /home/myubuntu-export-1204.tar # 导入 docker import - /home/myubuntu-export-1204.tar 使用 docker save <image_name> 导出镜像: # 导出 docker save 9610cfc68e8d > /home/myubuntu-save-1204.tar # 导入 docker load < /home/myubuntu-save-1204.tar # 重命名 docker images docker tag <image_id> image_name:latest 4. 镜像重命名 docker tag <image_id> image_name:latest 5. 使用外部文件 # create Dockerfile echo 'FROM python:3.6 WORKDIR /app pip install tornado -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com ENTRYPOINT ["python"] CMD ["app.py"] ' >> Dockerfile # build container docker build -t diy/server:latest . # start container docker run -d -v /home/ubuntu/app:/app -p 5000:5000 diy/server 6. docker常用命令示例 获取container日志: docker logs <container_id> 实时获取container日志: docker logs -f <container_id> 停止container: docker stop <container_id> 删除镜像标签: docker rmi -f <image_name>:<tag> 指定工作目录: docker run -d -v /home/frkhit/rkfeng/server:/app -w /app frkhit/docker-python:3.6-chrome python main.py 使用宿主时区:docker run -d -v /etc/localtime:/etc/localtime:ro ... 运行时指定时区:docker run -d --env TZ=Asia/Shanghai ... 执行多条命令: docker run -d -w /app frkhit/docker-python:3.6-chrome sh -c "python jd_main.py; python main.py" 查看容器变化: docker diff <container> 改变entrypoint执行新命令: docker run -itd --name=demo --entrypoint="tail" <image> -f /dev/null 7. Dockerfile常用命令示例 复制多个文件: COPY file_1 file_2 file_3 ./ 添加作者信息: MAINTAINER frkhit "frkhit@gmail.com" 8. 端口绑定 # 端口绑定 docker run -d -p 8080:80 ... # ip + 端口 绑定 docker run -d -p 127.0.0.1:9999:80 ... # ip + 所有端口 绑定 docker run -d -p 127.0.0.1::80 ... # 查看容器端口绑定 docker port <container_id> 9. docker充当命令行工具 docker compose命令行工具 详见 https://github.com/docker/compose/releases 中的 run.sh工具。 运行 4.0版本的 mongoimport命令 mkdir -p dodo && chmod 777 dodo/ -R && cd dodo/ docker pull mongo:4.0 docker run --rm -v $(pwd):/workdir/ -w /workdir/ mongo:4.0 mongoexport --uri "<url>" --collection my_collection --out ./my_collection.bak 10. 宿主与容器传数据 # cp docker cp container:/app/data ./ docker cp ./data container:/app/ # 管道 cat input.txt | docker exec -i tor-1 /bin/bash -c 'cat > /app/data.txt' 11. 中文乱码 Dockerfile增加 ENV LANG C.UTF-8 如果需要增加中文字体支持, 可以参考: 给Docker镜像(Debian)添加中文支持和中文字体 12. 支持 crontab 容器中安装 crontab, Dockerfile 配置: # add crontab RUN apt-get -y install cron && \ echo '*/10 * * * * date >> /date.log ' >> /etc/cron.d/hello-cron && \ crontab /etc/cron.d/hello-cron && \ rm -rf /var/lib/apt/lists/* 启动命令增加 cron && ...
tornado使用总结 1. 请求参数解析 获取请求消息体: self.request.body 获取请求参数(query, form-data, …): self.get_argument("name", None) # 必须指定默认值, 不然找不到key时会触发异常 获取请求参数列表(query, form-data, …): self.get_arguments("name[]") # 找不到key, 返回空列表; 不能设置默认值 获取请求参数object: self.get_argument("people[name]"), self.get_arguments("people[friend][]") 2. 异步/并发 使用线程池+yield实现异步: from concurrent.futures import ThreadPoolExecutor import tornado from tornado.concurrent import run_on_executor from tornado.web import RequestHandler import time class SimpleAsyncServer(RequestHandler): def __init__(self, application, request, **kwargs): super(SimpleAsyncServer, self).__init__(application, request, **kwargs) self.executor = ThreadPoolExecutor(10) @tornado.gen.coroutine def get(self, ): print("on get...") result = yield self._do_something() self.write(result) @run_on_executor def _do_something(self, ): """ 模拟耗时操作 :return: """ time.sleep(5) return {"msg": "OK"} 3. 先响应再执行后续操作 处理一个请求时, 可以显式调用self.finish(), 向客户端返回响应, 接着在服务端继续处理剩下的记录日志等业务. 如: def post(self, ): # 写入响应内容 self.write({"code": 200, "msg": "OK"}) # 返回响应: 客户端能拿到结果 self.finish() # 处理剩下的业务 self.logger.info("logging request...") time.sleep(60) self.logger.info("done!") 4. 返回响应 返回json 返回json结果, 最简单的方法是直接把dict传给self.write(dict_obj), self.write会在内部将响应内容转为str, 并设置header. 实际使用的过程中, 遇到过客户端解析结果, 因为单/双引号问题, 导致json解析错误. 为降低风险, 直接这样调用: self.set_header("Content-Type", "application/json; charset=UTF-8") self.write(json.dumps(result)) 返回二进制数据 有时候为了方便测试, 会在响应中直接返回二进制内容. 如服务端直接返回pickle序列化后的数据: self.write(pickle.dumps(data)) 客户端可以这样解析: import pickle import requests response = requests.post(xxx) data = pickle.loads(response.content) # object 5. 压缩数据 app = tornado.web.Application([ (r'/file', FileUploadHandler), ], compress_response=True, ) app.listen(8080) tornado.ioloop.IOLoop.instance().start()
再读《MongoDB权威指南》 刚接触mongo时,所看过最好的入门资料,就是《MongoDB权威指南》. 阅读后, 对mongo有一个直观的了解, 配合mongo官方文档, 使用mongo就没什么困难了. 最近新项目, 需要深度使用mongo, 如使用复杂的聚合操作生成报表数据. 项目中还遇到聚合操作形成数据库全局锁, 导致其他操作被阻塞的问题. 空闲之下, 决定再读《MongoDB权威指南》, 加深理解. 1. Mongo简介 聚合操作, 数据库能自动优化 支持存在时间有限的集合, 适合session管理等场景 支持固定大小的集合,适合保存日志等场景 支持一种协议,用于存储大文件和文件元数据(?) 2. MongoDB基础知识 objectId能提取时间戳 3. 创建 更新和删除文档 删除集合最快的方法: drop + 重建索引 更新操作不可分割: 原子性 array vs set. $push 对array添加元素; $addToSet将array当做set并添加元素; 前两者配合$each实现添加多个元素 删除元素: $pop vs $pull, 具体参考官方文档. $基于位置的数组修改器定位符. db.blog.update({"comments.author" : "John"},{"$set" : {"comments.$.author" : "Jim"}}), 定位符只更新第一个匹配的元素 修改器速度: mongo为文档大小预留空间; 当文档修改后大小不够,会执行文档移动操作. 批量更新文档,获取更新信息: ` db.runCommand({getLastError : 1}) # “返回最后一次操作的相关信息`. 写入安全(Write Concern), 有两种模式, 应答式(确认成功)和非应答式(不返回响应). 非应答式用于不重要数据存储场合. 4. 查询 返回制定键, 默认返回_id. ` db.users.find({}, {“username” : 1, “email” : 1}) # 返回三个键, db.users.find({}, {“fatal_weakness” : 0}) # 不返回指定键` 高级查询, 参数及示例. $where, 借用js. 但速度慢, 如无必要, 不要使用. 服务端脚本, 容易受到攻击. 避免使用skip略过大量结果, skip大量结果, 很占资源. 可行的方法是, 利用上一次查询的条件, 略过已查询的结果. 高级查询选项: $min, $max能强制使用索引, 具体参考书本. $maxscan, 扫描文档上限. 游标, 获取一致性结果. cursor = db.foo.find();while (cursor.hasNext()) {...}, 相同的结果可能返回多次, 原因是文档修改后空间不足可能被移动到后面. 解决方法是使用快照 db.foo.find().snapshot(), 但占资源, 尽量不用. 服务端游标的生命周期. 一般客户端会通知销毁游标; 服务端有超时记录, 超时后会自动销毁, 如果客户端需要长时间使用游标, 需要告知服务端不要超时销毁. 5. 索引 explain查看具体执行操作, 调试索引的好工具. 如 db.users.find({username: "user101"}).explain() db.currentOp() 如果新建索引不能短时间内返回结果, 可以另开shell执行db.currentOp()查看 索引的代价: 每次操作, 需要更新索引, 耗时更长. mongo限制索引上限为64; 每个集合最好不超过两个索引. hit(): 强制 MongoDB 使用特定的索引 稀疏索引: sparse. ` db.ensureIndex({“email” : 1}, {“unique” : true, “sparse” : true})`, email存在, 则唯一. 6. 特殊的索引和集合 固定集合: 固定大小, 循环队列. TTL索引, 这种索引允许为每一个文档设置一个超时时间. 一个文档到达预设置的老化程度之后就会被删除. 这种类型的索引对于缓存问题(比如会话的保存)非常有用. db.foo.ensureIndex({"lastUpdated" : 1}, {"expireAfterSecs" : 60*60*24}) # 24h 全文本索引, 用于全文检索, 耗资源. 地理空间检索, 最常用的是 2dsphere 索引(用于地球表面类型的地图)和 2d索引(用于平面地图和时间连续的数据). 支持交集, 包含和接近查询. GridFS存储文件. 7. 聚合 TODO: 待补充 17. 了解应用的动态 ` db.currentOp()`: 查看正在进行的操作 db.killOp(): 终止操作. 注意, 只有交出了锁的进程才能被终止 system profiler: 可获得慢查询日志, 该操作默认不打开 stats: 获取集合, 数据库状态信息 mongotop 类似top, 获取当前操作信息. mongotop-locks, 获得每个数据库的锁状态 18. 数据管理 权限控制: 颗粒度 建立索引: ` db.foo.ensureIndex({“somefield” : 1}, {“background” : true})`. 前台建索引, 锁定数据库; 后台建索引, 定期释放写锁 压缩数据: 数据碎片; 使用 db.runCommand({"compact" : "collName"})压缩数据