diff --git a/services/docstore/app/js/DocArchiveManager.js b/services/docstore/app/js/DocArchiveManager.js index 5ea98f85e7..9fc74c15ff 100644 --- a/services/docstore/app/js/DocArchiveManager.js +++ b/services/docstore/app/js/DocArchiveManager.js @@ -10,12 +10,10 @@ const PersistorManager = require('./PersistorManager') const pMap = require('p-map') const PARALLEL_JOBS = Settings.parallelArchiveJobs -const ARCHIVE_BATCH_SIZE = Settings.archiveBatchSize const UN_ARCHIVE_BATCH_SIZE = Settings.unArchiveBatchSize module.exports = { archiveAllDocs: callbackify(archiveAllDocs), - archiveDocById: callbackify(archiveDocById), archiveDoc: callbackify(archiveDoc), unArchiveAllDocs: callbackify(unArchiveAllDocs), unarchiveDoc: callbackify(unarchiveDoc), @@ -23,7 +21,6 @@ module.exports = { getDoc: callbackify(getDoc), promises: { archiveAllDocs, - archiveDocById, archiveDoc, unArchiveAllDocs, unarchiveDoc, @@ -33,43 +30,21 @@ module.exports = { } async function archiveAllDocs(projectId) { - while (true) { - const docs = await MongoManager.getNonArchivedProjectDocs( - projectId, - ARCHIVE_BATCH_SIZE - ) - if (!docs || docs.length === 0) { - break - } - - await pMap(docs, doc => archiveDoc(projectId, doc), { - concurrency: PARALLEL_JOBS, - }) - } + const docIds = await MongoManager.getNonArchivedProjectDocIds(projectId) + await pMap(docIds, docId => archiveDoc(projectId, docId), { + concurrency: PARALLEL_JOBS, + }) } -async function archiveDocById(projectId, docId) { - const doc = await MongoManager.findDoc(projectId, docId, { - lines: true, - ranges: true, - rev: true, - inS3: true, - }) +async function archiveDoc(projectId, docId) { + const doc = await MongoManager.getDocForArchiving(projectId, docId) if (!doc) { - throw new Errors.NotFoundError( - `Cannot find doc ${docId} in project ${projectId}` - ) - } - - if (doc.inS3) { - // No need to throw an error if the doc is already archived + // The doc wasn't found, it was already archived, or the lock couldn't be + // acquired. Since we don't know which it is, silently return. return } - await archiveDoc(projectId, doc) -} -async function archiveDoc(projectId, doc) { logger.debug( { project_id: projectId, doc_id: doc._id }, 'sending doc to persistor' @@ -100,7 +75,7 @@ async function archiveDoc(projectId, doc) { await PersistorManager.sendStream(Settings.docstore.bucket, key, stream, { sourceMd5: md5, }) - await MongoManager.markDocAsArchived(doc._id, doc.rev) + await MongoManager.markDocAsArchived(projectId, docId, doc.rev) } async function unArchiveAllDocs(projectId) { diff --git a/services/docstore/app/js/DocManager.js b/services/docstore/app/js/DocManager.js index 15b1efabff..1f5aeee846 100644 --- a/services/docstore/app/js/DocManager.js +++ b/services/docstore/app/js/DocManager.js @@ -364,7 +364,7 @@ module.exports = DocManager = { if (meta.deleted && Settings.docstore.archiveOnSoftDelete) { // The user will not read this doc anytime soon. Flush it out of mongo. - DocArchive.archiveDocById(project_id, doc_id, err => { + DocArchive.archiveDoc(project_id, doc_id, err => { if (err) { logger.warn( { project_id, doc_id, err }, diff --git a/services/docstore/app/js/HttpController.js b/services/docstore/app/js/HttpController.js index 4f3adf2aa4..60c55341dd 100644 --- a/services/docstore/app/js/HttpController.js +++ b/services/docstore/app/js/HttpController.js @@ -238,7 +238,7 @@ function archiveAllDocs(req, res, next) { function archiveDoc(req, res, next) { const { doc_id: docId, project_id: projectId } = req.params logger.debug({ projectId, docId }, 'archiving a doc') - DocArchive.archiveDocById(projectId, docId, function (error) { + DocArchive.archiveDoc(projectId, docId, function (error) { if (error) { return next(error) } diff --git a/services/docstore/app/js/MongoManager.js b/services/docstore/app/js/MongoManager.js index 1c3b71d262..466196b404 100644 --- a/services/docstore/app/js/MongoManager.js +++ b/services/docstore/app/js/MongoManager.js @@ -6,6 +6,8 @@ const OError = require('@overleaf/o-error') const Errors = require('./Errors') const { promisify } = require('util') +const ARCHIVING_LOCK_DURATION_MS = Settings.archivingLockDurationMs + function findDoc(projectId, docId, filter, callback) { db.docs.findOne( { @@ -59,12 +61,17 @@ function getArchivedProjectDocs(projectId, maxResults, callback) { .toArray(callback) } -function getNonArchivedProjectDocs(projectId, maxResults, callback) { - const query = { - project_id: ObjectId(projectId.toString()), - inS3: { $ne: true }, - } - db.docs.find(query, { limit: maxResults }).toArray(callback) +function getNonArchivedProjectDocIds(projectId, callback) { + db.docs + .find( + { + project_id: ObjectId(projectId), + inS3: { $ne: true }, + }, + { projection: { _id: 1 } } + ) + .map(doc => doc._id) + .toArray(callback) } function getNonDeletedArchivedProjectDocs(projectId, maxResults, callback) { @@ -108,19 +115,44 @@ function patchDoc(projectId, docId, meta, callback) { ) } -function markDocAsArchived(docId, rev, callback) { - const update = { - $set: {}, - $unset: {}, - } - update.$set.inS3 = true - update.$unset.lines = true - update.$unset.ranges = true - const query = { - _id: docId, - rev, - } - db.docs.updateOne(query, update, callback) +/** + * Fetch a doc and lock it for archiving + * + * This will return null if the doc is not found, if it's already archived or + * if the lock can't be acquired. + */ +function getDocForArchiving(projectId, docId, callback) { + const archivingUntil = new Date(Date.now() + ARCHIVING_LOCK_DURATION_MS) + db.docs.findOneAndUpdate( + { + _id: ObjectId(docId), + project_id: ObjectId(projectId), + inS3: { $ne: true }, + $or: [{ archivingUntil: null }, { archivingUntil: { $lt: new Date() } }], + }, + { $set: { archivingUntil } }, + { projection: { lines: 1, ranges: 1, rev: 1 } }, + (err, result) => { + if (err) { + return callback(err) + } + callback(null, result.value) + } + ) +} + +/** + * Clear the doc contents from Mongo and release the archiving lock + */ +function markDocAsArchived(projectId, docId, rev, callback) { + db.docs.updateOne( + { _id: ObjectId(docId), rev }, + { + $set: { inS3: true }, + $unset: { lines: 1, ranges: 1, archivingUntil: 1 }, + }, + callback + ) } /** @@ -267,11 +299,12 @@ module.exports = { getProjectsDeletedDocs, getProjectsDocs, getArchivedProjectDocs, - getNonArchivedProjectDocs, + getNonArchivedProjectDocIds, getNonDeletedArchivedProjectDocs, upsertIntoDocCollection, restoreArchivedDoc, patchDoc, + getDocForArchiving, markDocAsArchived, getDocVersion, setDocVersion, diff --git a/services/docstore/config/settings.defaults.js b/services/docstore/config/settings.defaults.js index 42a2229b5a..d69c9befd5 100644 --- a/services/docstore/config/settings.defaults.js +++ b/services/docstore/config/settings.defaults.js @@ -40,9 +40,10 @@ const Settings = { maxJsonRequestSize: parseInt(process.env.MAX_JSON_REQUEST_SIZE) || 6 * 1024 * 1024, // 6 MB - archiveBatchSize: parseInt(process.env.ARCHIVE_BATCH_SIZE, 10) || 50, unArchiveBatchSize: parseInt(process.env.UN_ARCHIVE_BATCH_SIZE, 10) || 50, parallelArchiveJobs: parseInt(process.env.PARALLEL_ARCHIVE_JOBS, 10) || 5, + archivingLockDurationMs: + parseInt(process.env.ARCHIVING_LOCK_DURATION_MS, 10) || 60000, } if (process.env.MONGO_CONNECTION_STRING) { diff --git a/services/docstore/test/acceptance/js/ArchiveDocsTests.js b/services/docstore/test/acceptance/js/ArchiveDocsTests.js index fe4a85e6f6..f890e713ab 100644 --- a/services/docstore/test/acceptance/js/ArchiveDocsTests.js +++ b/services/docstore/test/acceptance/js/ArchiveDocsTests.js @@ -342,7 +342,7 @@ describe('Archiving', function () { if (error) { return done(error) } - DocstoreClient.archiveDocById( + DocstoreClient.archiveDoc( this.project_id, this.doc._id, (error, res) => { diff --git a/services/docstore/test/acceptance/js/GettingDocsFromArchiveTest.js b/services/docstore/test/acceptance/js/GettingDocsFromArchiveTest.js index 791ebbe096..16ec9fcf48 100644 --- a/services/docstore/test/acceptance/js/GettingDocsFromArchiveTest.js +++ b/services/docstore/test/acceptance/js/GettingDocsFromArchiveTest.js @@ -35,7 +35,7 @@ describe('Getting A Doc from Archive', function () { if (error) { return done(error) } - DocstoreClient.archiveDocById( + DocstoreClient.archiveDoc( this.project_id, this.doc._id, (error, res) => { diff --git a/services/docstore/test/acceptance/js/helpers/DocstoreClient.js b/services/docstore/test/acceptance/js/helpers/DocstoreClient.js index e2c2706fbc..32aadec4b2 100644 --- a/services/docstore/test/acceptance/js/helpers/DocstoreClient.js +++ b/services/docstore/test/acceptance/js/helpers/DocstoreClient.js @@ -163,7 +163,7 @@ module.exports = DocstoreClient = { ) }, - archiveDocById(projectId, docId, callback) { + archiveDoc(projectId, docId, callback) { request.post( { url: `http://localhost:${settings.internal.docstore.port}/project/${projectId}/doc/${docId}/archive`, diff --git a/services/docstore/test/unit/js/DocArchiveManagerTests.js b/services/docstore/test/unit/js/DocArchiveManagerTests.js index 722daa2d6e..e13b99bf2f 100644 --- a/services/docstore/test/unit/js/DocArchiveManagerTests.js +++ b/services/docstore/test/unit/js/DocArchiveManagerTests.js @@ -118,16 +118,27 @@ describe('DocArchiveManager', function () { deleteDirectory: sinon.stub().resolves(), } - const getNonArchivedProjectDocs = sinon.stub() - getNonArchivedProjectDocs + const getNonArchivedProjectDocIds = sinon.stub() + getNonArchivedProjectDocIds .onCall(0) - .resolves(mongoDocs.filter(doc => !doc.inS3)) - getNonArchivedProjectDocs.onCall(1).resolves([]) + .resolves(mongoDocs.filter(doc => !doc.inS3).map(doc => doc._id)) + getNonArchivedProjectDocIds.onCall(1).resolves([]) const getArchivedProjectDocs = sinon.stub() getArchivedProjectDocs.onCall(0).resolves(archivedDocs) getArchivedProjectDocs.onCall(1).resolves([]) + const fakeGetDoc = async (_projectId, _docId) => { + if (_projectId.equals(projectId)) { + for (const mongoDoc of mongoDocs.concat(archivedDocs)) { + if (mongoDoc._id.equals(_docId)) { + return mongoDoc + } + } + } + throw new Errors.NotFoundError() + } + MongoManager = { promises: { markDocAsArchived: sinon.stub().resolves(), @@ -135,17 +146,13 @@ describe('DocArchiveManager', function () { upsertIntoDocCollection: sinon.stub().resolves(), getProjectsDocs: sinon.stub().resolves(mongoDocs), getNonDeletedArchivedProjectDocs: getArchivedProjectDocs, - getNonArchivedProjectDocs, + getNonArchivedProjectDocIds, getArchivedProjectDocs, - findDoc: sinon.stub().rejects(new Errors.NotFoundError()), + findDoc: sinon.stub().callsFake(fakeGetDoc), + getDocForArchiving: sinon.stub().callsFake(fakeGetDoc), destroyProject: sinon.stub().resolves(), }, } - for (const mongoDoc of mongoDocs.concat(archivedDocs)) { - MongoManager.promises.findDoc - .withArgs(projectId, mongoDoc._id, sinon.match.any) - .resolves(mongoDoc) - } DocArchiveManager = SandboxedModule.require(modulePath, { requires: { @@ -163,7 +170,7 @@ describe('DocArchiveManager', function () { describe('archiveDoc', function () { it('should resolve when passed a valid document', async function () { await expect( - DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]) + DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id) ).to.eventually.be.fulfilled }) @@ -172,26 +179,26 @@ describe('DocArchiveManager', function () { doc.lines = null await expect( - DocArchiveManager.promises.archiveDoc(projectId, doc) + DocArchiveManager.promises.archiveDoc(projectId, doc._id) ).to.eventually.be.rejectedWith('doc has no lines') }) it('should add the schema version', async function () { - await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[1]) + await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[1]._id) expect(Streamifier.createReadStream).to.have.been.calledWith( sinon.match(/"schema_v":1/) ) }) it('should calculate the hex md5 sum of the content', async function () { - await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]) + await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id) expect(Crypto.createHash).to.have.been.calledWith('md5') expect(HashUpdate).to.have.been.calledWith(archivedDocJson) expect(HashDigest).to.have.been.calledWith('hex') }) it('should pass the md5 hash to the object persistor for verification', async function () { - await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]) + await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id) expect(PersistorManager.sendStream).to.have.been.calledWith( sinon.match.any, @@ -202,7 +209,7 @@ describe('DocArchiveManager', function () { }) it('should pass the correct bucket and key to the persistor', async function () { - await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]) + await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id) expect(PersistorManager.sendStream).to.have.been.calledWith( Settings.docstore.bucket, @@ -211,7 +218,7 @@ describe('DocArchiveManager', function () { }) it('should create a stream from the encoded json and send it', async function () { - await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]) + await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id) expect(Streamifier.createReadStream).to.have.been.calledWith( archivedDocJson ) @@ -223,8 +230,9 @@ describe('DocArchiveManager', function () { }) it('should mark the doc as archived', async function () { - await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]) + await DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id) expect(MongoManager.promises.markDocAsArchived).to.have.been.calledWith( + projectId, mongoDocs[0]._id, mongoDocs[0].rev ) @@ -243,7 +251,7 @@ describe('DocArchiveManager', function () { it('should return an error', async function () { await expect( - DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]) + DocArchiveManager.promises.archiveDoc(projectId, mongoDocs[0]._id) ).to.eventually.be.rejectedWith('null bytes detected') }) }) @@ -452,22 +460,25 @@ describe('DocArchiveManager', function () { await DocArchiveManager.promises.archiveAllDocs(projectId) // not inS3 expect(MongoManager.promises.markDocAsArchived).to.have.been.calledWith( + projectId, mongoDocs[0]._id ) expect(MongoManager.promises.markDocAsArchived).to.have.been.calledWith( + projectId, mongoDocs[1]._id ) expect(MongoManager.promises.markDocAsArchived).to.have.been.calledWith( + projectId, mongoDocs[4]._id ) // inS3 expect( MongoManager.promises.markDocAsArchived - ).not.to.have.been.calledWith(mongoDocs[2]._id) + ).not.to.have.been.calledWith(projectId, mongoDocs[2]._id) expect( MongoManager.promises.markDocAsArchived - ).not.to.have.been.calledWith(mongoDocs[3]._id) + ).not.to.have.been.calledWith(projectId, mongoDocs[3]._id) }) }) diff --git a/services/docstore/test/unit/js/DocManagerTests.js b/services/docstore/test/unit/js/DocManagerTests.js index e36289a951..14924cce5e 100644 --- a/services/docstore/test/unit/js/DocManagerTests.js +++ b/services/docstore/test/unit/js/DocManagerTests.js @@ -379,7 +379,7 @@ describe('DocManager', function () { .stub() .yields(null, { _id: ObjectId(this.doc_id) }) this.MongoManager.patchDoc = sinon.stub().yields(null) - this.DocArchiveManager.archiveDocById = sinon.stub().yields(null) + this.DocArchiveManager.archiveDoc = sinon.stub().yields(null) this.meta = {} }) @@ -429,7 +429,7 @@ describe('DocManager', function () { }) it('should not flush the doc out of mongo', function () { - expect(this.DocArchiveManager.archiveDocById).to.not.have.been.called + expect(this.DocArchiveManager.archiveDoc).to.not.have.been.called }) }) @@ -447,7 +447,7 @@ describe('DocManager', function () { }) it('should not flush the doc out of mongo', function () { - expect(this.DocArchiveManager.archiveDocById).to.not.have.been.called + expect(this.DocArchiveManager.archiveDoc).to.not.have.been.called }) }) @@ -459,7 +459,7 @@ describe('DocManager', function () { describe('when the background flush succeeds', function () { beforeEach(function (done) { - this.DocArchiveManager.archiveDocById = sinon.stub().yields(null) + this.DocArchiveManager.archiveDoc = sinon.stub().yields(null) this.callback = sinon.stub().callsFake(done) this.DocManager.patchDoc( this.project_id, @@ -474,18 +474,17 @@ describe('DocManager', function () { }) it('should flush the doc out of mongo', function () { - expect( - this.DocArchiveManager.archiveDocById - ).to.have.been.calledWith(this.project_id, this.doc_id) + expect(this.DocArchiveManager.archiveDoc).to.have.been.calledWith( + this.project_id, + this.doc_id + ) }) }) describe('when the background flush fails', function () { beforeEach(function (done) { this.err = new Error('foo') - this.DocArchiveManager.archiveDocById = sinon - .stub() - .yields(this.err) + this.DocArchiveManager.archiveDoc = sinon.stub().yields(this.err) this.callback = sinon.stub().callsFake(done) this.DocManager.patchDoc( this.project_id, diff --git a/services/docstore/test/unit/js/MongoManagerTests.js b/services/docstore/test/unit/js/MongoManagerTests.js index 1fcf1abe98..95a84bcea8 100644 --- a/services/docstore/test/unit/js/MongoManagerTests.js +++ b/services/docstore/test/unit/js/MongoManagerTests.js @@ -23,7 +23,10 @@ describe('MongoManager', function () { ObjectId, }, '@overleaf/metrics': { timeAsyncMethod: sinon.stub() }, - '@overleaf/settings': { max_deleted_docs: 42 }, + '@overleaf/settings': { + max_deleted_docs: 42, + docstore: { archivingLockDurationMs: 5000 }, + }, './Errors': Errors, }, })