From f411049b825b7bf377b59d60021797c039cfc9d7 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 23 Feb 2021 13:57:27 +0000 Subject: [PATCH 1/2] [misc] add a new endpoint for exporting all the project history --- services/track-changes/app.js | 1 + .../track-changes/app/js/HttpController.js | 61 +++++++++++++++++++ .../track-changes/app/js/UpdatesManager.js | 36 +++++++++++ .../acceptance/js/ArchivingUpdatesTests.js | 46 +++++++++++++- .../test/acceptance/js/ExportProjectTests.js | 29 +++++++++ .../js/helpers/TrackChangesClient.js | 11 ++++ 6 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 services/track-changes/test/acceptance/js/ExportProjectTests.js diff --git a/services/track-changes/app.js b/services/track-changes/app.js index b9ca5120ce..9dd8f9440b 100644 --- a/services/track-changes/app.js +++ b/services/track-changes/app.js @@ -64,6 +64,7 @@ app.get('/project/:project_id/doc/:doc_id/diff', HttpController.getDiff) app.get('/project/:project_id/doc/:doc_id/check', HttpController.checkDoc) app.get('/project/:project_id/updates', HttpController.getUpdates) +app.get('/project/:project_id/export', HttpController.exportProject) app.post('/project/:project_id/flush', HttpController.flushProject) diff --git a/services/track-changes/app/js/HttpController.js b/services/track-changes/app/js/HttpController.js index c167bae1c9..54191fd909 100644 --- a/services/track-changes/app/js/HttpController.js +++ b/services/track-changes/app/js/HttpController.js @@ -200,6 +200,67 @@ module.exports = HttpController = { ) }, + exportProject(req, res, next) { + // The project history can be huge: + // - updates can weight MBs for insert/delete of full doc + // - multiple updates form a pack + // Flush updates per pack onto the wire. + const { project_id } = req.params + logger.log({ project_id }, 'exporting project history') + UpdatesManager.exportProject(project_id, function ( + err, + updates, + confirmWrite + ) { + const abortStreaming = req.aborted || res.finished || res.destroyed + if (abortStreaming) { + // Tell the producer to stop emitting data + if (confirmWrite) confirmWrite(new Error('stop')) + return + } + const hasStartedStreamingResponse = res.headersSent + if (err) { + logger.error({ project_id, err }, 'export failed') + if (!hasStartedStreamingResponse) { + // Generate a nice 500 + return next(err) + } else { + // Stop streaming + return res.destroy() + } + } + // Compose the response incrementally + const isFirstWrite = !hasStartedStreamingResponse + const isLastWrite = updates.length === 0 + if (isFirstWrite) { + // The first write will emit the 200 status, headers and start of the + // response payload (open array) + res.setHeader('Content-Type', 'application/json') + res.writeHead(200) + res.write('[') + } + if (!isFirstWrite && !isLastWrite) { + // Starting from the 2nd non-empty write, emit a continuing comma. + // write 1: [updates1 + // write 2: ,updates2 + // write 3: ,updates3 + // write N: ] + res.write(',') + } + + // Every write will emit a blob onto the response stream: + // '[update1,update2,...]' + // ^^^^^^^^^^^^^^^^^^^ + res.write(JSON.stringify(updates).slice(1, -1), confirmWrite) + + if (isLastWrite) { + // The last write will have no updates and will finish the response + // payload (close array). + res.end(']') + } + }) + }, + restore(req, res, next) { if (next == null) { next = function (error) {} diff --git a/services/track-changes/app/js/UpdatesManager.js b/services/track-changes/app/js/UpdatesManager.js index 811e9ba1ca..8e0e2dc569 100644 --- a/services/track-changes/app/js/UpdatesManager.js +++ b/services/track-changes/app/js/UpdatesManager.js @@ -631,6 +631,42 @@ module.exports = UpdatesManager = { ) }, + exportProject(projectId, consumer) { + // Flush anything before collecting updates. + UpdatesManager.processUncompressedUpdatesForProject(projectId, (err) => { + if (err) return consumer(err) + + // Fetch all the packs. + const before = undefined + PackManager.makeProjectIterator(projectId, before, (err, iterator) => { + if (err) return consumer(err) + + async.whilst( + () => !iterator.done(), + + (cb) => + iterator.next((err, updatesFromASinglePack) => { + if (err) return cb(err) + + if (updatesFromASinglePack.length === 0) { + // This should not happen when `iterator.done() == false`. + // Emitting an empty array would signal the consumer the final + // call. + return cb() + } + // Emit updates and wait for the consumer. + consumer(null, updatesFromASinglePack, cb) + }), + + (err) => { + if (err) return consumer(err) + consumer(null, []) + } + ) + }) + }) + }, + fetchUserInfo(users, callback) { if (callback == null) { callback = function (error, fetchedUserInfo) {} diff --git a/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js b/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js index 5df0844409..d082634573 100644 --- a/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js +++ b/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js @@ -141,6 +141,48 @@ describe('Archiving updates', function () { ) }) + function testExportFeature() { + describe('exporting the project', function () { + before('fetch export', function (done) { + TrackChangesClient.exportProject(this.project_id, (error, updates) => { + if (error) { + return done(error) + } + this.exportedUpdates = updates + done() + }) + }) + + it('should include all the imported updates, with ids, sorted by timestamp', function () { + // Add a safe guard for an empty array matching an empty export. + expect(this.updates).to.have.length(1024 + 22) + + const expectedExportedUpdates = this.updates + .slice() + .reverse() + .map((update) => { + // clone object, updates are created once in before handler + const exportedUpdate = Object.assign({}, update) + exportedUpdate.meta = Object.assign({}, update.meta) + + exportedUpdate.doc_id = this.doc_id + exportedUpdate.project_id = this.project_id + + // This is for merged updates, which does not apply here. + exportedUpdate.meta.start_ts = exportedUpdate.meta.end_ts = + exportedUpdate.meta.ts + delete exportedUpdate.meta.ts + return exportedUpdate + }) + expect(this.exportedUpdates).to.deep.equal(expectedExportedUpdates) + }) + }) + } + + describe("before archiving a doc's updates", function () { + testExportFeature() + }) + describe("archiving a doc's updates", function () { before(function (done) { TrackChangesClient.pushDocHistory( @@ -219,7 +261,7 @@ describe('Archiving updates', function () { ) }) - return it('should store 1024 doc changes in S3 in one pack', function (done) { + it('should store 1024 doc changes in S3 in one pack', function (done) { return db.docHistoryIndex.findOne( { _id: ObjectId(this.doc_id) }, (error, index) => { @@ -240,6 +282,8 @@ describe('Archiving updates', function () { } ) }) + + testExportFeature() }) return describe("unarchiving a doc's updates", function () { diff --git a/services/track-changes/test/acceptance/js/ExportProjectTests.js b/services/track-changes/test/acceptance/js/ExportProjectTests.js new file mode 100644 index 0000000000..d2958d7fc0 --- /dev/null +++ b/services/track-changes/test/acceptance/js/ExportProjectTests.js @@ -0,0 +1,29 @@ +const { expect } = require('chai') +const { ObjectId } = require('../../../app/js/mongodb') + +const TrackChangesApp = require('./helpers/TrackChangesApp') +const TrackChangesClient = require('./helpers/TrackChangesClient') + +describe('ExportProject', function () { + before('start app', function (done) { + TrackChangesApp.ensureRunning(done) + }) + + describe('when there are no updates', function () { + before('fetch export', function (done) { + TrackChangesClient.exportProject(ObjectId(), (error, updates) => { + if (error) { + return done(error) + } + this.exportedUpdates = updates + done() + }) + }) + + it('should export an empty array', function () { + expect(this.exportedUpdates).to.deep.equal([]) + }) + }) + + // see ArchivingUpdatesTests for tests with data in mongo/s3 +}) diff --git a/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js b/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js index 9e8e05bbc4..aa25839341 100644 --- a/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js +++ b/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js @@ -165,6 +165,17 @@ module.exports = TrackChangesClient = { ) }, + exportProject(project_id, callback) { + request.get( + { url: `http://localhost:3015/project/${project_id}/export`, json: true }, + (error, response, updates) => { + if (error) return callback(error) + response.statusCode.should.equal(200) + callback(null, updates) + } + ) + }, + restoreDoc(project_id, doc_id, version, user_id, callback) { if (callback == null) { callback = function (error) {} From ed70b99d8aa0e0863b2b3db71d9c6116a53043ae Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Thu, 25 Feb 2021 09:52:54 +0000 Subject: [PATCH 2/2] [misc] exportProject: collect and send user ids of updates in trailer --- .../track-changes/app/js/HttpController.js | 6 +++-- .../track-changes/app/js/UpdatesManager.js | 19 +++++++++++++-- .../acceptance/js/ArchivingUpdatesTests.js | 23 +++++++++++++------ .../test/acceptance/js/ExportProjectTests.js | 17 +++++++++----- .../js/helpers/TrackChangesClient.js | 2 +- 5 files changed, 49 insertions(+), 18 deletions(-) diff --git a/services/track-changes/app/js/HttpController.js b/services/track-changes/app/js/HttpController.js index 54191fd909..c5db48210a 100644 --- a/services/track-changes/app/js/HttpController.js +++ b/services/track-changes/app/js/HttpController.js @@ -209,7 +209,7 @@ module.exports = HttpController = { logger.log({ project_id }, 'exporting project history') UpdatesManager.exportProject(project_id, function ( err, - updates, + { updates, userIds }, confirmWrite ) { const abortStreaming = req.aborted || res.finished || res.destroyed @@ -236,6 +236,7 @@ module.exports = HttpController = { // The first write will emit the 200 status, headers and start of the // response payload (open array) res.setHeader('Content-Type', 'application/json') + res.setHeader('Trailer', 'X-User-Ids') res.writeHead(200) res.write('[') } @@ -255,7 +256,8 @@ module.exports = HttpController = { if (isLastWrite) { // The last write will have no updates and will finish the response - // payload (close array). + // payload (close array) and emit the userIds as trailer. + res.addTrailers({ 'X-User-Ids': JSON.stringify(userIds) }) res.end(']') } }) diff --git a/services/track-changes/app/js/UpdatesManager.js b/services/track-changes/app/js/UpdatesManager.js index 8e0e2dc569..7890208788 100644 --- a/services/track-changes/app/js/UpdatesManager.js +++ b/services/track-changes/app/js/UpdatesManager.js @@ -641,6 +641,8 @@ module.exports = UpdatesManager = { PackManager.makeProjectIterator(projectId, before, (err, iterator) => { if (err) return consumer(err) + const accumulatedUserIds = new Set() + async.whilst( () => !iterator.done(), @@ -654,13 +656,26 @@ module.exports = UpdatesManager = { // call. return cb() } + updatesFromASinglePack.forEach((update) => { + accumulatedUserIds.add( + // Super defensive access on update details. + String(update && update.meta && update.meta.user_id) + ) + }) // Emit updates and wait for the consumer. - consumer(null, updatesFromASinglePack, cb) + consumer(null, { updates: updatesFromASinglePack }, cb) }), (err) => { if (err) return consumer(err) - consumer(null, []) + + // Adding undefined can happen for broken updates. + accumulatedUserIds.delete('undefined') + + consumer(null, { + updates: [], + userIds: Array.from(accumulatedUserIds).sort() + }) } ) }) diff --git a/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js b/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js index d082634573..bb1024389c 100644 --- a/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js +++ b/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js @@ -50,6 +50,7 @@ describe('Archiving updates', function () { this.now = Date.now() this.to = this.now this.user_id = ObjectId().toString() + this.user_id_2 = ObjectId().toString() this.doc_id = ObjectId().toString() this.project_id = ObjectId().toString() @@ -92,7 +93,7 @@ describe('Archiving updates', function () { op: [{ i: 'b', p: 0 }], meta: { ts: this.now + (i - 2048) * this.hours + 10 * this.minutes, - user_id: this.user_id + user_id: this.user_id_2 }, v: 2 * i + 2 }) @@ -144,13 +145,17 @@ describe('Archiving updates', function () { function testExportFeature() { describe('exporting the project', function () { before('fetch export', function (done) { - TrackChangesClient.exportProject(this.project_id, (error, updates) => { - if (error) { - return done(error) + TrackChangesClient.exportProject( + this.project_id, + (error, updates, userIds) => { + if (error) { + return done(error) + } + this.exportedUpdates = updates + this.exportedUserIds = userIds + done() } - this.exportedUpdates = updates - done() - }) + ) }) it('should include all the imported updates, with ids, sorted by timestamp', function () { @@ -175,6 +180,10 @@ describe('Archiving updates', function () { return exportedUpdate }) expect(this.exportedUpdates).to.deep.equal(expectedExportedUpdates) + expect(this.exportedUserIds).to.deep.equal([ + this.user_id, + this.user_id_2 + ]) }) }) } diff --git a/services/track-changes/test/acceptance/js/ExportProjectTests.js b/services/track-changes/test/acceptance/js/ExportProjectTests.js index d2958d7fc0..b6ca106a60 100644 --- a/services/track-changes/test/acceptance/js/ExportProjectTests.js +++ b/services/track-changes/test/acceptance/js/ExportProjectTests.js @@ -11,17 +11,22 @@ describe('ExportProject', function () { describe('when there are no updates', function () { before('fetch export', function (done) { - TrackChangesClient.exportProject(ObjectId(), (error, updates) => { - if (error) { - return done(error) + TrackChangesClient.exportProject( + ObjectId(), + (error, updates, userIds) => { + if (error) { + return done(error) + } + this.exportedUpdates = updates + this.exportedUserIds = userIds + done() } - this.exportedUpdates = updates - done() - }) + ) }) it('should export an empty array', function () { expect(this.exportedUpdates).to.deep.equal([]) + expect(this.exportedUserIds).to.deep.equal([]) }) }) diff --git a/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js b/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js index aa25839341..f20884448d 100644 --- a/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js +++ b/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js @@ -171,7 +171,7 @@ module.exports = TrackChangesClient = { (error, response, updates) => { if (error) return callback(error) response.statusCode.should.equal(200) - callback(null, updates) + callback(null, updates, JSON.parse(response.trailers['x-user-ids'])) } ) },