这几天一直在跟数据打交道,眼都花了~
建立索引
建议加上{"background":1}
避免读写锁的产生,导致数据库长时间无响应
db.collection.createIndex({"ip":1,"port":1,"timestamp":1},{"background":1})
数据查询
聚合查询
$sample
和$limit
有啥区别?
$limit
是取collection中固定顺序(并不是随机)的若干条数据,而$sample
则是随机返回若干条数据,但是有两种情况:
$sample 处于聚合管道的第一阶段;
N 小于Collection中的文档数量的 5% ;
* Collection中的文档数量大于 100;
当满足上面三个要求的时候,$sample
将通过伪随机的游标来获取记录。当任一条件不满足时,$sample
将进行集合扫描,并通过随机排序来选择相应的 N 条记录。
当如果数据集合大于100M
的时候建议用{"allowDiskUse":true}
解除内存限制。
db.collection.aggregate([
{ "$sample": { "size": 1000 } },
{ "$match": { "asn_code": "" }},
{ "$group": { "_id": "$ip" }
}])
流加载
MongoDB will stream batched results to the client without waiting for the client to request each batch, reducing latency.
文档上的解释是结果会以流式返回,但是经过测试发现,在小数据情况下,直接用默认的NON_TAILABLE
和EXHAUST
速度区别不大;然鹅在大数据的情况下,流式加载的速度反而会比默认的慢,就有点鸡肋,所以不建议使用这种方式。
cursors = collection.find({"asn_code": "", "asn_name": ""}, {"_id": 0, "ip": 1}, cursor_type=pymongo.cursor.CursorType.EXHAUST, batch_size=500, no_cursor_timeout=True)
ObjectId
专业选手应该都学会先翻阅文档:ObjectId
ObjectId = epoch时间(4字节) + 机器标识(3字节) + 进程号PID(2字节) + 计数器(3字节)
所以如果要将ObjectId格式化成时间的话,就可以用下面这个函数
def id2time(object_id):
timeStamp = int(object_id[:8], 16)
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timeStamp))
如果需要对库中的数据进行清洗,那就可以按照insert
时间分片查询,避免服务器内存
不足或者数据量太大导致查询缓慢,经过测试,基于ObjectId
索引的查询会比我们自己创建的索引速度快上不少,而且可以分片,减轻服务器压力。
from bson.objectid import ObjectId
per = 30
to_time = datetime.datetime.utcnow()
from_time = to_time - datetime.timedelta(minutes=per)
db.collection.find({'_id': {'$gt': ObjectId.from_datetime(from_time), '$lte':ObjectId.from_datetime(to_time)}})
因为mongodb是基于cursor
的,所以就算你在大集合下,你用一个find({})
,依然不会把你的内存塞满,而是一个个遍历下去,只是对于机子性能比较捉急的,可能就会出现假死的现象,所以用不用ObjectId
来分类就见仁见智啦。
数据迁移
数据转移的常规方法就是把老库的数据全都遍历一遍,但是呢单gevent协程不够暴力,那就加个多进程,直接起飞……
但是如果你的数据是有增量的,那就可以用监听CursorType.TAILABLE_AWAIT
来完成同步/异步迁移
import time
import pymongo
client = pymongo.MongoClient(host=uri)
oplog = client.local.oplog.rs
first = oplog.find().sort('$natural', pymongo.ASCENDING).limit(-1).next()
ts = first['ts']
while True:
cursor = oplog.find({'ts': {'$gt': ts}},
cursor_type=pymongo.CursorType.TAILABLE_AWAIT,
oplog_replay=True)
while cursor.alive:
for doc in cursor:
print(doc)
if doc['op'] == 'i' and doc['o'] == 'my_collection':
# blablabla
time.sleep(1)
简单的介绍一下oplog
的语义,如果想更高级的利用的话,自己看文档嗷。
- ts: 时间戳
- op: 操作类型
- "i": insert
- "u": update
- "d": delete
- "c": command
- "ns": 操作所在的namespace,通常为
数据库名.集合名
- "o": 当前操作的内容
- "o2": 在执行更新时的
where
条件,当op
是u
时才有
数据清洗
因为一开始数据库没有设多字段索引唯一,所以在高并发下导致有若干重复数据写入,还是要写个脚本给他洗个澡。
pipeline = [
{ "$group": { "_id": {"ip":"$ip","port":"$port","time":"$time"}, "count":{"$sum": 1}, "dups": {"$addToSet": '$_id'}} },
{ "$match": { "count": {"$gt": 1}}}
]
map_id = map(lambda doc: doc['dups'][1:], collection.aggregate(pipeline=pipeline,allowDiskUse=True))
list_id = [item for sublist in map_id for item in sublist]
for _id in list_id:
print(collection.delete_one({"_id":_id}))
查询语句优化
利用explain()
可以查看查询语句的各个阶段的执行状态,从而进行查询语句的速度优化。
db.shodan_stream_official.explain("executionStats").aggregate( [
{ $match: { "asn": "", "org":"" } },
{ $group: { _id: "$ip" } }
],{"allowDiskUse":true})