diff --git a/services/track-changes/app.js b/services/track-changes/app.js index b9ca5120ce..9dd8f9440b 100644 --- a/services/track-changes/app.js +++ b/services/track-changes/app.js @@ -64,6 +64,7 @@ app.get('/project/:project_id/doc/:doc_id/diff', HttpController.getDiff) app.get('/project/:project_id/doc/:doc_id/check', HttpController.checkDoc) app.get('/project/:project_id/updates', HttpController.getUpdates) +app.get('/project/:project_id/export', HttpController.exportProject) app.post('/project/:project_id/flush', HttpController.flushProject) diff --git a/services/track-changes/app/js/HttpController.js b/services/track-changes/app/js/HttpController.js index c167bae1c9..c5db48210a 100644 --- a/services/track-changes/app/js/HttpController.js +++ b/services/track-changes/app/js/HttpController.js @@ -200,6 +200,69 @@ module.exports = HttpController = { ) }, + exportProject(req, res, next) { + // The project history can be huge: + // - updates can weight MBs for insert/delete of full doc + // - multiple updates form a pack + // Flush updates per pack onto the wire. + const { project_id } = req.params + logger.log({ project_id }, 'exporting project history') + UpdatesManager.exportProject(project_id, function ( + err, + { updates, userIds }, + confirmWrite + ) { + const abortStreaming = req.aborted || res.finished || res.destroyed + if (abortStreaming) { + // Tell the producer to stop emitting data + if (confirmWrite) confirmWrite(new Error('stop')) + return + } + const hasStartedStreamingResponse = res.headersSent + if (err) { + logger.error({ project_id, err }, 'export failed') + if (!hasStartedStreamingResponse) { + // Generate a nice 500 + return next(err) + } else { + // Stop streaming + return res.destroy() + } + } + // Compose the response incrementally + const isFirstWrite = !hasStartedStreamingResponse + const isLastWrite = updates.length === 0 + if (isFirstWrite) { + // The first write will emit the 200 status, headers and start of the + // response payload (open array) + res.setHeader('Content-Type', 'application/json') + res.setHeader('Trailer', 'X-User-Ids') + res.writeHead(200) + res.write('[') + } + if (!isFirstWrite && !isLastWrite) { + // Starting from the 2nd non-empty write, emit a continuing comma. + // write 1: [updates1 + // write 2: ,updates2 + // write 3: ,updates3 + // write N: ] + res.write(',') + } + + // Every write will emit a blob onto the response stream: + // '[update1,update2,...]' + // ^^^^^^^^^^^^^^^^^^^ + res.write(JSON.stringify(updates).slice(1, -1), confirmWrite) + + if (isLastWrite) { + // The last write will have no updates and will finish the response + // payload (close array) and emit the userIds as trailer. + res.addTrailers({ 'X-User-Ids': JSON.stringify(userIds) }) + res.end(']') + } + }) + }, + restore(req, res, next) { if (next == null) { next = function (error) {} diff --git a/services/track-changes/app/js/UpdatesManager.js b/services/track-changes/app/js/UpdatesManager.js index 811e9ba1ca..7890208788 100644 --- a/services/track-changes/app/js/UpdatesManager.js +++ b/services/track-changes/app/js/UpdatesManager.js @@ -631,6 +631,57 @@ module.exports = UpdatesManager = { ) }, + exportProject(projectId, consumer) { + // Flush anything before collecting updates. + UpdatesManager.processUncompressedUpdatesForProject(projectId, (err) => { + if (err) return consumer(err) + + // Fetch all the packs. + const before = undefined + PackManager.makeProjectIterator(projectId, before, (err, iterator) => { + if (err) return consumer(err) + + const accumulatedUserIds = new Set() + + async.whilst( + () => !iterator.done(), + + (cb) => + iterator.next((err, updatesFromASinglePack) => { + if (err) return cb(err) + + if (updatesFromASinglePack.length === 0) { + // This should not happen when `iterator.done() == false`. + // Emitting an empty array would signal the consumer the final + // call. + return cb() + } + updatesFromASinglePack.forEach((update) => { + accumulatedUserIds.add( + // Super defensive access on update details. + String(update && update.meta && update.meta.user_id) + ) + }) + // Emit updates and wait for the consumer. + consumer(null, { updates: updatesFromASinglePack }, cb) + }), + + (err) => { + if (err) return consumer(err) + + // Adding undefined can happen for broken updates. + accumulatedUserIds.delete('undefined') + + consumer(null, { + updates: [], + userIds: Array.from(accumulatedUserIds).sort() + }) + } + ) + }) + }) + }, + fetchUserInfo(users, callback) { if (callback == null) { callback = function (error, fetchedUserInfo) {} diff --git a/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js b/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js index 5df0844409..bb1024389c 100644 --- a/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js +++ b/services/track-changes/test/acceptance/js/ArchivingUpdatesTests.js @@ -50,6 +50,7 @@ describe('Archiving updates', function () { this.now = Date.now() this.to = this.now this.user_id = ObjectId().toString() + this.user_id_2 = ObjectId().toString() this.doc_id = ObjectId().toString() this.project_id = ObjectId().toString() @@ -92,7 +93,7 @@ describe('Archiving updates', function () { op: [{ i: 'b', p: 0 }], meta: { ts: this.now + (i - 2048) * this.hours + 10 * this.minutes, - user_id: this.user_id + user_id: this.user_id_2 }, v: 2 * i + 2 }) @@ -141,6 +142,56 @@ describe('Archiving updates', function () { ) }) + function testExportFeature() { + describe('exporting the project', function () { + before('fetch export', function (done) { + TrackChangesClient.exportProject( + this.project_id, + (error, updates, userIds) => { + if (error) { + return done(error) + } + this.exportedUpdates = updates + this.exportedUserIds = userIds + done() + } + ) + }) + + it('should include all the imported updates, with ids, sorted by timestamp', function () { + // Add a safe guard for an empty array matching an empty export. + expect(this.updates).to.have.length(1024 + 22) + + const expectedExportedUpdates = this.updates + .slice() + .reverse() + .map((update) => { + // clone object, updates are created once in before handler + const exportedUpdate = Object.assign({}, update) + exportedUpdate.meta = Object.assign({}, update.meta) + + exportedUpdate.doc_id = this.doc_id + exportedUpdate.project_id = this.project_id + + // This is for merged updates, which does not apply here. + exportedUpdate.meta.start_ts = exportedUpdate.meta.end_ts = + exportedUpdate.meta.ts + delete exportedUpdate.meta.ts + return exportedUpdate + }) + expect(this.exportedUpdates).to.deep.equal(expectedExportedUpdates) + expect(this.exportedUserIds).to.deep.equal([ + this.user_id, + this.user_id_2 + ]) + }) + }) + } + + describe("before archiving a doc's updates", function () { + testExportFeature() + }) + describe("archiving a doc's updates", function () { before(function (done) { TrackChangesClient.pushDocHistory( @@ -219,7 +270,7 @@ describe('Archiving updates', function () { ) }) - return it('should store 1024 doc changes in S3 in one pack', function (done) { + it('should store 1024 doc changes in S3 in one pack', function (done) { return db.docHistoryIndex.findOne( { _id: ObjectId(this.doc_id) }, (error, index) => { @@ -240,6 +291,8 @@ describe('Archiving updates', function () { } ) }) + + testExportFeature() }) return describe("unarchiving a doc's updates", function () { diff --git a/services/track-changes/test/acceptance/js/ExportProjectTests.js b/services/track-changes/test/acceptance/js/ExportProjectTests.js new file mode 100644 index 0000000000..b6ca106a60 --- /dev/null +++ b/services/track-changes/test/acceptance/js/ExportProjectTests.js @@ -0,0 +1,34 @@ +const { expect } = require('chai') +const { ObjectId } = require('../../../app/js/mongodb') + +const TrackChangesApp = require('./helpers/TrackChangesApp') +const TrackChangesClient = require('./helpers/TrackChangesClient') + +describe('ExportProject', function () { + before('start app', function (done) { + TrackChangesApp.ensureRunning(done) + }) + + describe('when there are no updates', function () { + before('fetch export', function (done) { + TrackChangesClient.exportProject( + ObjectId(), + (error, updates, userIds) => { + if (error) { + return done(error) + } + this.exportedUpdates = updates + this.exportedUserIds = userIds + done() + } + ) + }) + + it('should export an empty array', function () { + expect(this.exportedUpdates).to.deep.equal([]) + expect(this.exportedUserIds).to.deep.equal([]) + }) + }) + + // see ArchivingUpdatesTests for tests with data in mongo/s3 +}) diff --git a/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js b/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js index 9e8e05bbc4..f20884448d 100644 --- a/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js +++ b/services/track-changes/test/acceptance/js/helpers/TrackChangesClient.js @@ -165,6 +165,17 @@ module.exports = TrackChangesClient = { ) }, + exportProject(project_id, callback) { + request.get( + { url: `http://localhost:3015/project/${project_id}/export`, json: true }, + (error, response, updates) => { + if (error) return callback(error) + response.statusCode.should.equal(200) + callback(null, updates, JSON.parse(response.trailers['x-user-ids'])) + } + ) + }, + restoreDoc(project_id, doc_id, version, user_id, callback) { if (callback == null) { callback = function (error) {}