diff --git a/services/history-v1/storage/lib/blob_store/index.js b/services/history-v1/storage/lib/blob_store/index.js index 9c762202a3..c4151637c3 100644 --- a/services/history-v1/storage/lib/blob_store/index.js +++ b/services/history-v1/storage/lib/blob_store/index.js @@ -125,6 +125,34 @@ async function loadGlobalBlobs() { } } +/** + * Return metadata for all blobs in the given project + * @param {Array} projectIds + * @return {Promise<{nBlobs:number, blobs:Map>}>} + */ +async function getProjectBlobsBatch(projectIds) { + const mongoProjects = [] + const postgresProjects = [] + for (const projectId of projectIds) { + if (typeof projectId === 'number') { + postgresProjects.push(projectId) + } else { + mongoProjects.push(projectId) + } + } + const [ + { nBlobs: nBlobsPostgres, blobs: blobsPostgres }, + { nBlobs: nBlobsMongo, blobs: blobsMongo }, + ] = await Promise.all([ + postgresBackend.getProjectBlobsBatch(postgresProjects), + mongoBackend.getProjectBlobsBatch(mongoProjects), + ]) + for (const [id, blobs] of blobsPostgres.entries()) { + blobsMongo.set(id.toString(), blobs) + } + return { nBlobs: nBlobsPostgres + nBlobsMongo, blobs: blobsMongo } +} + /** * @classdesc * Fetch and store the content of files using content-addressable hashing. The @@ -366,6 +394,7 @@ class BlobStore { module.exports = { BlobStore, + getProjectBlobsBatch, loadGlobalBlobs, makeProjectKey, makeBlobForFile, diff --git a/services/history-v1/storage/lib/blob_store/mongo.js b/services/history-v1/storage/lib/blob_store/mongo.js index 276c7f097f..9117382148 100644 --- a/services/history-v1/storage/lib/blob_store/mongo.js +++ b/services/history-v1/storage/lib/blob_store/mongo.js @@ -16,13 +16,17 @@ */ const { Blob } = require('overleaf-editor-core') -const { ObjectId, Binary, MongoError } = require('mongodb') +const { ObjectId, Binary, MongoError, ReadPreference } = require('mongodb') const assert = require('../assert') const mongodb = require('../mongodb') const MAX_BLOBS_IN_BUCKET = 8 const DUPLICATE_KEY_ERROR_CODE = 11000 +/** + * @typedef {import('mongodb').ReadPreferenceLike} ReadPreferenceLike + */ + /** * Set up the data structures for a given project. * @param {string} projectId @@ -246,6 +250,60 @@ async function getProjectBlobs(projectId) { return blobs } +/** + * Return metadata for all blobs in the given project + * @param {Array} projectIds + * @return {Promise<{ nBlobs: number, blobs: Map> }>} + */ +async function getProjectBlobsBatch(projectIds) { + for (const project of projectIds) { + assert.mongoId(project, 'bad projectId') + } + let nBlobs = 0 + const blobs = new Map() + if (projectIds.length === 0) return { nBlobs, blobs } + + // blobs + { + const cursor = await mongodb.blobs.find( + { _id: { $in: projectIds.map(projectId => new ObjectId(projectId)) } }, + { readPreference: ReadPreference.secondaryPreferred } + ) + for await (const record of cursor) { + const projectBlobs = Object.values(record.blobs).flat().map(recordToBlob) + blobs.set(record._id.toString(), projectBlobs) + nBlobs += projectBlobs.length + } + } + + // sharded blobs + { + // @ts-ignore We are using a custom _id here. + const cursor = await mongodb.shardedBlobs.find( + { + _id: { + $gte: makeShardedId(projectIds[0], '0'), + $lte: makeShardedId(projectIds[projectIds.length - 1], 'f'), + }, + }, + { readPreference: ReadPreference.secondaryPreferred } + ) + for await (const record of cursor) { + const recordIdHex = record._id.toString('hex') + const recordProjectId = recordIdHex.slice(0, 24) + const projectBlobs = Object.values(record.blobs).flat().map(recordToBlob) + const found = blobs.get(recordProjectId) + if (found) { + found.push(...projectBlobs) + } else { + blobs.set(recordProjectId, projectBlobs) + } + nBlobs += projectBlobs.length + } + } + return { nBlobs, blobs } +} + /** * Add a blob's metadata to the blobs collection after it has been uploaded. * @param {string} projectId @@ -373,6 +431,7 @@ module.exports = { findBlob, findBlobs, getProjectBlobs, + getProjectBlobsBatch, insertBlob, deleteBlobs, } diff --git a/services/history-v1/storage/lib/blob_store/postgres.js b/services/history-v1/storage/lib/blob_store/postgres.js index fc41644138..715a985b5b 100644 --- a/services/history-v1/storage/lib/blob_store/postgres.js +++ b/services/history-v1/storage/lib/blob_store/postgres.js @@ -70,6 +70,35 @@ async function getProjectBlobs(projectId) { return blobs } +/** + * Return metadata for all blobs in the given project + * @param {Array} projectIds + * @return {Promise<{ nBlobs: number, blobs: Map> }>} + */ +async function getProjectBlobsBatch(projectIds) { + for (const projectId of projectIds) { + assert.integer(projectId, 'bad projectId') + } + let nBlobs = 0 + const blobs = new Map() + if (projectIds.length === 0) return { nBlobs, blobs } + + const records = await knex('project_blobs') + .select('project_id', 'hash_bytes', 'byte_length', 'string_length') + .whereIn('project_id', projectIds) + + for (const record of records) { + const found = blobs.get(record.project_id) + if (found) { + found.push(recordToBlob(record)) + } else { + blobs.set(record.project_id, [recordToBlob(record)]) + } + nBlobs++ + } + return { nBlobs, blobs } +} + /** * Add a blob's metadata to the blobs table after it has been uploaded. */ @@ -126,6 +155,7 @@ module.exports = { findBlob, findBlobs, getProjectBlobs, + getProjectBlobsBatch, insertBlob, deleteBlobs, } diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs index 3e919018f4..c1d76a8bcd 100644 --- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs +++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs @@ -25,11 +25,12 @@ import { BlobStore, GLOBAL_BLOBS, loadGlobalBlobs, + getProjectBlobsBatch, getStringLengthOfFile, makeBlobForFile, makeProjectKey, } from '../lib/blob_store/index.js' -import { backedUpBlobs, db } from '../lib/mongodb.js' +import { backedUpBlobs as backedUpBlobsCollection, db } from '../lib/mongodb.js' import filestorePersistor from '../lib/persistor.js' // Silence warning. @@ -68,10 +69,7 @@ ObjectId.cacheHexString = true * @typedef {Object} Project * @property {ObjectId} _id * @property {Array} rootFolder - * @property {Array} deletedFileIds - * @property {Array} blobs - * @property {{history: {id: string}}} overleaf - * @property {Array} [backedUpBlobs] + * @property {{history: {id: (number|string)}}} overleaf */ /** @@ -499,22 +497,16 @@ async function processFiles(files) { * @return {Promise} */ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') { - let nBackedUpBlobs = 0 - if (process.argv.includes('collectBackedUpBlobs')) { - nBackedUpBlobs = await collectBackedUpBlobs(batch) - } - if (process.argv.includes('deletedFiles')) { - await collectDeletedFiles(batch) - } - let blobs = 0 - if (COLLECT_BLOBS) { - blobs = await collectBlobs(batch) - } - const files = Array.from(findFileInBatch(batch, prefix)) + const deletedFiles = await collectDeletedFiles(batch) + const { nBlobs, blobs } = await collectProjectBlobs(batch) + const { nBackedUpBlobs, backedUpBlobs } = await collectBackedUpBlobs(batch) + const files = Array.from( + findFileInBatch(batch, prefix, deletedFiles, blobs, backedUpBlobs) + ) STATS.projects += batch.length - STATS.blobs += blobs + STATS.blobs += nBlobs STATS.backedUpBlobs += nBackedUpBlobs - STATS.filesWithoutHash += files.length - (blobs - nBackedUpBlobs) + STATS.filesWithoutHash += files.length - (nBlobs - nBackedUpBlobs) batch.length = 0 // GC // The files are currently ordered by project-id. // Order them by file-id ASC then blobs ASC to @@ -709,17 +701,36 @@ function* findFiles(ctx, folder, path) { /** * @param {Array} projects * @param {string} prefix + * @param {Map>} deletedFiles + * @param {Map>} blobs + * @param {Map>} backedUpBlobs * @return Generator */ -function* findFileInBatch(projects, prefix) { +function* findFileInBatch( + projects, + prefix, + deletedFiles, + blobs, + backedUpBlobs +) { for (const project of projects) { - const ctx = new ProjectContext(project) + const projectIdS = project._id.toString() + const historyIdS = project.overleaf.history.id.toString() + const projectBlobs = blobs.get(historyIdS) || [] + const projectBackedUpBlobs = new Set(backedUpBlobs.get(projectIdS) || []) + const projectDeletedFiles = deletedFiles.get(projectIdS) || [] + const ctx = new ProjectContext( + project._id, + historyIdS, + projectBlobs, + projectBackedUpBlobs + ) yield* findFiles(ctx, project.rootFolder[0], prefix) - for (const fileId of project.deletedFileIds || []) { + for (const fileId of projectDeletedFiles) { yield { ctx, cacheKey: fileId, fileId, path: '' } } - for (const blob of project.blobs || []) { - if (ctx.hasBackedUpBlob(blob.getHash())) continue + for (const blob of projectBlobs) { + if (projectBackedUpBlobs.has(blob.getHash())) continue yield { ctx, cacheKey: blob.getHash(), @@ -732,25 +743,22 @@ function* findFileInBatch(projects, prefix) { } /** - * @param {Array} projects - * @return {Promise} + * @param {Array} batch + * @return {Promise<{nBlobs: number, blobs: Map>}>} */ -async function collectBlobs(projects) { - let blobs = 0 - for (const project of projects) { - const historyId = project.overleaf.history.id.toString() - const blobStore = new BlobStore(historyId) - project.blobs = await blobStore.getProjectBlobs() - blobs += project.blobs.length - } - return blobs +async function collectProjectBlobs(batch) { + if (!COLLECT_BLOBS) return { nBlobs: 0, blobs: new Map() } + return await getProjectBlobsBatch(batch.map(p => p.overleaf.history.id)) } /** * @param {Array} projects - * @return {Promise} + * @return {Promise>>} */ async function collectDeletedFiles(projects) { + const deletedFiles = new Map() + if (!process.argv.includes('deletedFiles')) return deletedFiles + const cursor = deletedFilesCollection.find( { projectId: { $in: projects.map(p => p._id) }, @@ -762,52 +770,42 @@ async function collectDeletedFiles(projects) { sort: { projectId: 1 }, } ) - const processed = projects.slice() for await (const deletedFileRef of cursor) { - const idx = processed.findIndex( - p => p._id.toString() === deletedFileRef.projectId.toString() - ) - if (idx === -1) { - throw new Error( - `bug: order of deletedFiles mongo records does not match batch of projects (${deletedFileRef.projectId} out of order)` - ) + const projectId = deletedFileRef.projectId.toString() + const fileId = deletedFileRef._id.toString() + const found = deletedFiles.get(projectId) + if (found) { + found.push(fileId) + } else { + deletedFiles.set(projectId, [fileId]) } - processed.splice(0, idx) - const project = processed[0] - project.deletedFileIds = project.deletedFileIds || [] - project.deletedFileIds.push(deletedFileRef._id.toString()) } + return deletedFiles } /** * @param {Array} projects - * @return {Promise} + * @return {Promise<{nBackedUpBlobs:number,backedUpBlobs:Map>}>} */ async function collectBackedUpBlobs(projects) { - const cursor = backedUpBlobs.find( + let nBackedUpBlobs = 0 + const backedUpBlobs = new Map() + if (!process.argv.includes('collectBackedUpBlobs')) { + return { nBackedUpBlobs, backedUpBlobs } + } + const cursor = backedUpBlobsCollection.find( { _id: { $in: projects.map(p => p._id) } }, { readPreference: READ_PREFERENCE_SECONDARY, sort: { _id: 1 }, } ) - let nBackedUpBlobs = 0 - const processed = projects.slice() for await (const record of cursor) { - const idx = processed.findIndex( - p => p._id.toString() === record._id.toString() - ) - if (idx === -1) { - throw new Error( - `bug: order of backedUpBlobs mongo records does not match batch of projects (${record._id} out of order)` - ) - } - processed.splice(0, idx) - const project = processed[0] - project.backedUpBlobs = record.blobs.map(b => b.toString('hex')) - nBackedUpBlobs += record.blobs.length + const blobs = record.blobs.map(b => b.toString('hex')) + backedUpBlobs.set(record._id.toString(), blobs) + nBackedUpBlobs += blobs.length } - return nBackedUpBlobs + return { nBackedUpBlobs, backedUpBlobs } } const BATCH_HASH_WRITES = 1_000 @@ -824,13 +822,16 @@ class ProjectContext { #historyBlobs /** - * @param {Project} project + * @param {ObjectId} projectId + * @param {string} historyId + * @param {Array} blobs + * @param {Set} backedUpBlobs */ - constructor(project) { - this.projectId = project._id - this.historyId = project.overleaf.history.id.toString() - this.#backedUpBlobs = new Set(project.backedUpBlobs || []) - this.#historyBlobs = new Set((project.blobs || []).map(b => b.getHash())) + constructor(projectId, historyId, blobs, backedUpBlobs) { + this.projectId = projectId + this.historyId = historyId + this.#backedUpBlobs = backedUpBlobs + this.#historyBlobs = new Set(blobs.map(b => b.getHash())) } hasHistoryBlob(hash) { @@ -901,7 +902,7 @@ class ProjectContext { ) this.#completedBlobs.clear() STATS.mongoUpdates++ - await backedUpBlobs.updateOne( + await backedUpBlobsCollection.updateOne( { _id: this.projectId }, { $addToSet: { blobs: { $each: blobs } } }, { upsert: true } diff --git a/services/history-v1/test/acceptance/js/storage/blob_store.test.js b/services/history-v1/test/acceptance/js/storage/blob_store.test.js index 3e4a19a204..f61c940356 100644 --- a/services/history-v1/test/acceptance/js/storage/blob_store.test.js +++ b/services/history-v1/test/acceptance/js/storage/blob_store.test.js @@ -22,6 +22,7 @@ const { } = require('../../../../storage') const mongoBackend = require('../../../../storage/lib/blob_store/mongo') const postgresBackend = require('../../../../storage/lib/blob_store/postgres') +const { getProjectBlobsBatch } = require('../../../../storage/lib/blob_store') const mkTmpDir = promisify(temp.mkdir) @@ -327,6 +328,31 @@ describe('BlobStore', function () { 'expected Blob.NotFoundError when calling blobStore.getStream()' ) }) + + if (scenario.backend !== mongoBackend) { + // mongo backend has its own test for this, covering sharding + it('getProjectBlobsBatch() returns blobs per project', async function () { + const projects = [ + parseInt(scenario.projectId, 10), + parseInt(scenario.projectId2, 10), + ] + const { nBlobs, blobs } = + await postgresBackend.getProjectBlobsBatch(projects) + expect(nBlobs).to.equal(2) + expect(Object.fromEntries(blobs.entries())).to.deep.equal({ + [parseInt(scenario.projectId, 10)]: [ + new Blob(helloWorldHash, 11, 11), + ], + [parseInt(scenario.projectId2, 10)]: [ + new Blob( + testFiles.GRAPH_PNG_HASH, + testFiles.GRAPH_PNG_BYTE_LENGTH, + null + ), + ], + }) + }) + } }) describe('a global blob', function () { @@ -454,4 +480,42 @@ describe('BlobStore', function () { }) }) } + + it('getProjectBlobsBatch() with mixed projects', async function () { + for (const scenario of scenarios) { + const blobStore = new BlobStore(scenario.projectId) + const blobStore2 = new BlobStore(scenario.projectId2) + await blobStore.initialize() + await blobStore.putString(helloWorldString) + await blobStore2.initialize() + await blobStore2.putFile(testFiles.path('graph.png')) + } + + const projects = [ + parseInt(scenarios[0].projectId, 10), + scenarios[1].projectId, + parseInt(scenarios[0].projectId2, 10), + scenarios[1].projectId2, + ] + const { nBlobs, blobs } = await getProjectBlobsBatch(projects) + expect(nBlobs).to.equal(4) + expect(Object.fromEntries(blobs.entries())).to.deep.equal({ + [scenarios[0].projectId]: [new Blob(helloWorldHash, 11, 11)], + [scenarios[1].projectId]: [new Blob(helloWorldHash, 11, 11)], + [scenarios[0].projectId2]: [ + new Blob( + testFiles.GRAPH_PNG_HASH, + testFiles.GRAPH_PNG_BYTE_LENGTH, + null + ), + ], + [scenarios[1].projectId2]: [ + new Blob( + testFiles.GRAPH_PNG_HASH, + testFiles.GRAPH_PNG_BYTE_LENGTH, + null + ), + ], + }) + }) }) diff --git a/services/history-v1/test/acceptance/js/storage/blob_store_mongo.test.js b/services/history-v1/test/acceptance/js/storage/blob_store_mongo.test.js index d2695681ab..319827856e 100644 --- a/services/history-v1/test/acceptance/js/storage/blob_store_mongo.test.js +++ b/services/history-v1/test/acceptance/js/storage/blob_store_mongo.test.js @@ -22,6 +22,7 @@ describe('BlobStore Mongo backend', function () { 'abcdaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', ], 1234: ['1234000000000000000000000000000000000000'], + 1337: ['1337000000000000000000000000000000000000'], } beforeEach('clean up', cleanup.everything) @@ -77,6 +78,42 @@ describe('BlobStore Mongo backend', function () { }) }) + describe('getProjectBlobsBatch', function () { + it('finds all the blobs', async function () { + const projectId0 = new ObjectId().toString() + const hashesProject0 = hashes[1234].concat(hashes.abcd) + const projectId1 = new ObjectId().toString() + const hashesProject1 = hashes[1337].concat(hashes.abcd) + const projectId2 = new ObjectId().toString() + const hashesProject2 = [] // no hashes + const projectId3 = new ObjectId().toString() + const hashesProject3 = hashes[1337] + const projectBlobs = { + [projectId0]: hashesProject0, + [projectId1]: hashesProject1, + [projectId2]: hashesProject2, + [projectId3]: hashesProject3, + } + for (const [projectId, hashes] of Object.entries(projectBlobs)) { + for (const hash of hashes) { + const blob = new Blob(hash, 123, 99) + await mongoBackend.insertBlob(projectId, blob) + } + } + const projects = [projectId0, projectId1, projectId2, projectId3] + const { nBlobs, blobs } = + await mongoBackend.getProjectBlobsBatch(projects) + expect(nBlobs).to.equal( + hashesProject0.length + hashesProject1.length + hashesProject3.length + ) + expect(Object.fromEntries(blobs.entries())).to.deep.equal({ + [projectId0]: hashesProject0.map(hash => new Blob(hash, 123, 99)), + [projectId1]: hashesProject1.map(hash => new Blob(hash, 123, 99)), + [projectId3]: hashesProject3.map(hash => new Blob(hash, 123, 99)), + }) + }) + }) + describe('with existing blobs', function () { beforeEach(async function () { for (const hash of hashes.abcd.concat(hashes[1234])) {