这几天一直在跟数据打交道,眼都花了~

建立索引

建议加上{"background":1}避免读写锁的产生,导致数据库长时间无响应

db.collection.createIndex({"ip":1,"port":1,"timestamp":1},{"background":1})

数据查询

聚合查询

$sample$limit有啥区别?

$limit是取collection中固定顺序(并不是随机)的若干条数据,而$sample则是随机返回若干条数据,但是有两种情况:
$sample 处于聚合管道的第一阶段;
N 小于Collection中的文档数量的 5% ;
* Collection中的文档数量大于 100;

当满足上面三个要求的时候,$sample将通过伪随机的游标来获取记录。当任一条件不满足时,$sample将进行集合扫描,并通过随机排序来选择相应的 N 条记录。

当如果数据集合大于100M的时候建议用{"allowDiskUse":true}解除内存限制。

db.collection.aggregate([
{ "$sample": { "size": 1000 } },
{ "$match": { "asn_code": "" }},
{ "$group": { "_id": "$ip" } 
}])

流加载

MongoDB will stream batched results to the client without waiting for the client to request each batch, reducing latency.
文档上的解释是结果会以流式返回,但是经过测试发现,在小数据情况下,直接用默认的NON_TAILABLEEXHAUST速度区别不大;然鹅在大数据的情况下,流式加载的速度反而会比默认的慢,就有点鸡肋,所以不建议使用这种方式。

cursors = collection.find({"asn_code": "", "asn_name": ""}, {"_id": 0, "ip": 1}, cursor_type=pymongo.cursor.CursorType.EXHAUST, batch_size=500, no_cursor_timeout=True)

ObjectId

专业选手应该都学会先翻阅文档:ObjectId
ObjectId = epoch时间(4字节) + 机器标识(3字节) + 进程号PID(2字节) + 计数器(3字节)

所以如果要将ObjectId格式化成时间的话,就可以用下面这个函数

def id2time(object_id):
    timeStamp = int(object_id[:8], 16)
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timeStamp))

如果需要对库中的数据进行清洗,那就可以按照insert时间分片查询,避免服务器内存不足或者数据量太大导致查询缓慢,经过测试,基于ObjectId索引的查询会比我们自己创建的索引速度快上不少,而且可以分片,减轻服务器压力。

from bson.objectid import ObjectId
per = 30
to_time = datetime.datetime.utcnow()
from_time = to_time - datetime.timedelta(minutes=per)

db.collection.find({'_id': {'$gt': ObjectId.from_datetime(from_time), '$lte':ObjectId.from_datetime(to_time)}})

因为mongodb是基于cursor的,所以就算你在大集合下,你用一个find({}),依然不会把你的内存塞满,而是一个个遍历下去,只是对于机子性能比较捉急的,可能就会出现假死的现象,所以用不用ObjectId来分类就见仁见智啦。

数据迁移

数据转移的常规方法就是把老库的数据全都遍历一遍,但是呢单gevent协程不够暴力,那就加个多进程,直接起飞……

但是如果你的数据是有增量的,那就可以用监听CursorType.TAILABLE_AWAIT来完成同步/异步迁移

import time
import pymongo

client = pymongo.MongoClient(host=uri)

oplog = client.local.oplog.rs
first = oplog.find().sort('$natural', pymongo.ASCENDING).limit(-1).next()

ts = first['ts']

while True:
    cursor = oplog.find({'ts': {'$gt': ts}},
                        cursor_type=pymongo.CursorType.TAILABLE_AWAIT,
                        oplog_replay=True)
    while cursor.alive:
        for doc in cursor:
            print(doc)
            if doc['op'] == 'i' and doc['o'] == 'my_collection':
                # blablabla     
        time.sleep(1)

简单的介绍一下oplog的语义,如果想更高级的利用的话,自己看文档嗷。

  • ts: 时间戳
  • op: 操作类型
    • "i": insert
    • "u": update
    • "d": delete
    • "c": command
  • "ns": 操作所在的namespace,通常为数据库名.集合名
  • "o": 当前操作的内容
  • "o2": 在执行更新时的where条件,当opu时才有

数据清洗

因为一开始数据库没有设多字段索引唯一,所以在高并发下导致有若干重复数据写入,还是要写个脚本给他洗个澡。

pipeline = [
    { "$group": { "_id": {"ip":"$ip","port":"$port","time":"$time"}, "count":{"$sum": 1}, "dups": {"$addToSet": '$_id'}} },
    { "$match": { "count": {"$gt": 1}}}
]

map_id = map(lambda doc: doc['dups'][1:], collection.aggregate(pipeline=pipeline,allowDiskUse=True))
list_id = [item for sublist in map_id for item in sublist]

for _id in list_id:
    print(collection.delete_one({"_id":_id}))

查询语句优化

利用explain()可以查看查询语句的各个阶段的执行状态,从而进行查询语句的速度优化。

db.shodan_stream_official.explain("executionStats").aggregate( [
     { $match: { "asn": "", "org":"" } },
   { $group: { _id: "$ip" } }
],{"allowDiskUse":true})