每一个变更任务会不断对topic产生写操作,触发一系列ChangeEvent产生:
- doInsert:生成随机频道的topic后,执行insert;
- doUpdate:随机取得一个topic,将其channel字段改为随机值,执行update;
- doReplace:随机取得一个topic,将其channel字段改为随机值,执行replace;
- doDelete:随机取得一个topic,执行delete。
以doUpdate为例,实现代码如下:

启动一个全量迁移任务,将topic表中数据迁移到topic_new新表:

在全量迁移开始前,先获得当前时刻的的最大 _id 值(可以将此值记录下来)作为终点,随后逐个完成迁移转换。
在全量迁移完成后,便开始最后一步:增量迁移。
注:增量迁移过程中,变更操作仍然在进行。
- final MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);
- final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);
- ObjectId currentId = null;
- Document sort = new Document("_id", 1);
- MongoCursor<Document> cursor = null;
- // 批量大小
- int batchSize = 100;AtomicInteger count = new AtomicInteger(0);
- try {
- while (true) {
- boolean isWatchTaskStillRunning = watchFlag.getCount() > 0;
- // 按ID增量分段拉取
- if (currentId == null) {
- cursor = topicIncrCollection.find().sort(sort).limit(batchSize).iterator();
- } else {
- cursor = topicIncrCollection.find(new Document("_id", new Document("$gt", currentId)))
- .sort(sort).limit(batchSize).iterator();
- }
- boolean hasIncrRecord = false;
- while (cursor.hasNext()) {
- hasIncrRecord = true;
- Document incrDoc = cursor.next();
- OperationType opType = OperationType.fromString(incrDoc.getString(field_op));
- ObjectId docId = incrDoc.getObjectId(field_key);
- // 记录当前ID
- currentId = incrDoc.getObjectId("_id");
- if (opType == OperationType.DELETE) {
- topicNewCollection.deleteOne(new Document("_id", docId));
- } else {
- Document doc = incrDoc.get(field_data, Document.class);
- // channel转换
- String oldChannel = doc.getString(field_channel);
- doc.put(field_channel, Channel.toNewName(oldChannel));
- // 启用upsert
- UpdateOptions options = new UpdateOptions().upsert(true);
- topicNewCollection.replaceOne(new Document("_id", docId),
- incrDoc.get(field_data, Document.class), options);
- }
- if (count.incrementAndGet() % 10 == 0) {
- logger.info("IncrTransferTask progress, count: {}", count.get());
- }
- }
- // 当watch停止工作(没有更多变更),同时也没有需要处理的记录时,跳出
- if (!isWatchTaskStillRunning && !hasIncrRecord) {
- break;
- }
- sleep(200);
- }
- } catch (Exception e) {
- logger.error("IncrTransferTask ERROR", e);
- }
增量迁移的实现是一个不断tail的过程,利用 **_id 字段的有序特性 ** 进行分段迁移;即记录下当前处理的_id值,循环拉取在该_id值之后的记录进行处理。
增量表(topic_incr)中除了DELETE变更之外,其余的类型都保留了整个文档,因此可直接利用replace + upsert追加到新表。
最后,运行整个程序。

查看topic表和topic_new表,发现两者数量是相同的。为了进一步确认一致性,我们对两个表的分别做一次聚合统计:
topic表

topic_new表

前者输出结果:

后者输出结果:

前后对比的结果是一致的。
五、后续优化 (编辑:晋中站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|