首页
论坛
社团
我的

ADSKN论坛

 找回密码
立即注册

扫一扫,微信登陆

开启左侧

利用MongoDB中oplog机制实现准实时数据的利用 监控

[复制链接]
 楼主| 草木人语 发表于2022-7-18 23:54:56 | 显示全部楼层 |阅读模式 | 来自 上海市 腾讯云

马上注册加入论坛,获取更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

媒介

近来 有一个需求是要实时获取到新插入到MongoDB的数据,而插入步伐 本身 已经有一套处理逻辑,以是 不方便直接在插入步伐 里写相干 步伐 ,传统的数据库大多自带这种触发器机制,但是Mongo没有相干 的函数可以用(也可能我相识 的太少了,求纠正),固然 尚有 一点是需要python实现,于是网络 整理了一个相应的实现方法。

一、引子

首先可以想到,这种需求实在 很像数据库的主从备份机制,从数据库之以是 可以大概 同步主库是由于 存在某些指标来做控制,我们知道MongoDB虽然没有现成触发器,但是它可以大概 实现主从备份,以是 我们就从它的主从备份机制入手。

二、OPLOG

首先,需要以master模式来打开mongod保卫 ,命令行利用 –master,大概 设置 文件增长 master键为true。

此时,我们可以在Mongo的系统库local里见到新增的collection——oplog,此时

  1. oplog.$main
复制代码
里就会存储进oplog信息,假如 此时尚有 充当从数据库的Mongo存在,就会尚有 一些slaves的信息,由于我们这里并不是主从同步,以是 不存在这些聚集 。

005535zkd4do02y2zof9d0.jpg

再来看看oplog结构:

  1. "ts" : Timestamp(6417682881216249, 1), 时间戳
  2. "h" : NumberLong(0), 长度
  3. "v" : 2,
  4. "op" : "n", 利用 类型
  5. "ns" : "", 利用 的库和聚集
  6. "o2" : "_id" update条件
  7. "o" : {} 利用 值,即document
复制代码

这里需要知道op的几种属性:

  1. insert,'i'
  2. update, 'u'
  3. remove(delete), 'd'
  4. cmd, 'c'
  5. noop, 'n' 空利用
复制代码

从上面的信息可以看出,我们只要不绝 读取到ts来做对比,然后根据op即可判断 当前出现的是什么利用 ,相当 于利用 步伐 实现了一个从数据库的吸收 端。

三、CODE

在Github上找到了别人的实现方式,不外 它的函数库太老旧,以是 在他的基础上举行 修改。

Github地点 :https://github.com/RedBeard0531/mongo-oplog-watcher

mongo_oplog_watcher.py如下:

  1. #!/usr/bin/python
  2. import pymongo
  3. import re
  4. import time
  5. from pprint import pprint # pretty printer
  6. from pymongo.errors import AutoReconnect
  7. class OplogWatcher(object):
  8. def __init__(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True):
  9. if collection is not None:
  10. if db is None:
  11. raise ValueError('must specify db if you specify a collection')
  12. self._ns_filter = db + '.' + collection
  13. elif db is not None:
  14. self._ns_filter = re.compile(r'^%s\.' % db)
  15. else:
  16. self._ns_filter = None
  17. self.poll_time = poll_time
  18. self.connection = connection or pymongo.Connection()
  19. if start_now:
  20. self.start()
  21. @staticmethod
  22. def __get_id(op):
  23. id = None
  24. o2 = op.get('o2')
  25. if o2 is not None:
  26. id = o2.get('_id')
  27. if id is None:
  28. id = op['o'].get('_id')
  29. return id
  30. def start(self):
  31. oplog = self.connection.local['oplog.$main']
  32. ts = oplog.find().sort('$natural', -1)[0]['ts']
  33. while True:
  34. if self._ns_filter is None:
  35. filter = {}
  36. else:
  37. filter = {'ns': self._ns_filter}
  38. filter['ts'] = {'$gt': ts}
  39. try:
  40. cursor = oplog.find(filter, tailable=True)
  41. while True:
  42. for op in cursor:
  43. ts = op['ts']
  44. id = self.__get_id(op)
  45. self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op)
  46. time.sleep(self.poll_time)
  47. if not cursor.alive:
  48. break
  49. except AutoReconnect:
  50. time.sleep(self.poll_time)
  51. def all_with_noop(self, ns, ts, op, id, raw):
  52. if op == 'n':
  53. self.noop(ts=ts)
  54. else:
  55. self.all(ns=ns, ts=ts, op=op, id=id, raw=raw)
  56. def all(self, ns, ts, op, id, raw):
  57. if op == 'i':
  58. self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw)
  59. elif op == 'u':
  60. self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw)
  61. elif op == 'd':
  62. self.delete(ns=ns, ts=ts, id=id, raw=raw)
  63. elif op == 'c':
  64. self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw)
  65. elif op == 'db':
  66. self.db_declare(ns=ns, ts=ts, raw=raw)
  67. def noop(self, ts):
  68. pass
  69. def insert(self, ns, ts, id, obj, raw, **kw):
  70. pass
  71. def update(self, ns, ts, id, mod, raw, **kw):
  72. pass
  73. def delete(self, ns, ts, id, raw, **kw):
  74. pass
  75. def command(self, ns, ts, cmd, raw, **kw):
  76. pass
  77. def db_declare(self, ns, ts, **kw):
  78. pass
  79. class OplogPrinter(OplogWatcher):
  80. def all(self, **kw):
  81. pprint (kw)
  82. print #newline
  83. if __name__ == '__main__':
  84. OplogPrinter()
复制代码

首先是实现一个数据库的初始化,设定一个耽误 时间(准实时):

  1. self.poll_time = poll_time
  2. self.connection = connection or pymongo.MongoClient()
复制代码

重要 的函数是

  1. start()
复制代码
,实现一个时间的比对并举行 相应字段的处理:

  1. def start(self):
  2. oplog = self.connection.local['oplog.$main']
  3. #读取之前提到的库
  4. ts = oplog.find().sort('$natural', -1)[0]['ts']
  5. #获取一个时间边际
  6. while True:
  7. if self._ns_filter is None:
  8. filter = {}
  9. else:
  10. filter = {'ns': self._ns_filter}
  11. filter['ts'] = {'$gt': ts}
  12. try:
  13. cursor = oplog.find(filter)
  14. #对此时间之后的举行 处理
  15. while True:
  16. for op in cursor:
  17. ts = op['ts']
  18. id = self.__get_id(op)
  19. self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op)
  20. #可以指定处理插入监控,更新监控大概 删除监控等
  21. time.sleep(self.poll_time)
  22. if not cursor.alive:
  23. break
  24. except AutoReconnect:
  25. time.sleep(self.poll_time)
复制代码

循环这个start函数,在all_with_noop这里就可以编写相应的监控处理逻辑。

如许 就可以实现一个浅易 的准实时Mongo数据库利用 监控器,下一步就可以共同 其他利用 来对新入库的步伐 举行 相应处理。

总结

以上就是这篇文章的全部内容了,希望本文的内容对各人 的学习大概 工作能带来肯定 的资助,假如 有疑问各人 可以留言交换 ,谢谢各人 对脚本之家的支持。


来源:https://www.jb51.net/article/113432.htm
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
公告
  • 问题反馈请扫码加入一期核心用户群
  • [学生认证] 认证后获取生活类板块发帖权限
高级模式
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则 需要先绑定手机号

QQ|Archiver|手机版|小黑屋|ADSKN短链接收益平台 ( 冀ICP备2021002162号 )

GMT+8, 2022-12-4 19:24 , Processed in 0.342710 second(s), 15 queries , Gzip On, Redis On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表