diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs index b496a8bbc2..e9eb20f23a 100644 --- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs +++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs @@ -86,7 +86,7 @@ ObjectId.cacheHexString = true */ /** - * @return {{PROCESS_DELETED_FILES: boolean, LOGGING_IDENTIFIER: string, BATCH_RANGE_START: string, PROCESS_BLOBS: boolean, BATCH_RANGE_END: string, PROCESS_NON_DELETED_PROJECTS: boolean, PROCESS_DELETED_PROJECTS: boolean, COLLECT_BACKED_UP_BLOBS: boolean}} + * @return {{PROCESS_HASHED_FILES: boolean, PROCESS_DELETED_FILES: boolean, LOGGING_IDENTIFIER: string, BATCH_RANGE_START: string, PROCESS_BLOBS: boolean, BATCH_RANGE_END: string, PROCESS_NON_DELETED_PROJECTS: boolean, PROCESS_DELETED_PROJECTS: boolean, COLLECT_BACKED_UP_BLOBS: boolean}} */ function parseArgs() { const PUBLIC_LAUNCH_DATE = new Date('2012-01-01T00:00:00Z') @@ -94,6 +94,7 @@ function parseArgs() { { name: 'processNonDeletedProjects', type: String, defaultValue: 'false' }, { name: 'processDeletedProjects', type: String, defaultValue: 'false' }, { name: 'processDeletedFiles', type: String, defaultValue: 'false' }, + { name: 'processHashedFiles', type: String, defaultValue: 'false' }, { name: 'processBlobs', type: String, defaultValue: 'true' }, { name: 'collectBackedUpBlobs', type: String, defaultValue: 'true' }, { @@ -127,6 +128,7 @@ function parseArgs() { PROCESS_DELETED_PROJECTS: boolVal('processDeletedProjects'), PROCESS_BLOBS: boolVal('processBlobs'), PROCESS_DELETED_FILES: boolVal('processDeletedFiles'), + PROCESS_HASHED_FILES: boolVal('processHashedFiles'), COLLECT_BACKED_UP_BLOBS: boolVal('collectBackedUpBlobs'), BATCH_RANGE_START, BATCH_RANGE_END, @@ -139,6 +141,7 @@ const { PROCESS_DELETED_PROJECTS, PROCESS_BLOBS, PROCESS_DELETED_FILES, + PROCESS_HASHED_FILES, COLLECT_BACKED_UP_BLOBS, BATCH_RANGE_START, BATCH_RANGE_END, @@ -193,6 +196,7 @@ const STATS = { projects: 0, blobs: 0, backedUpBlobs: 0, + filesWithHash: 0, filesWithoutHash: 0, filesDuplicated: 0, filesRetries: 0, @@ -396,6 +400,13 @@ async function processFileOnce(entry, filePath) { await uploadBlobToAWS(entry, blob, filePath) return hash } + if (entry.hash && entry.ctx.hasBackedUpBlob(entry.hash)) { + STATS.deduplicatedWriteToAWSLocalCount++ + const blob = entry.ctx.getCachedHistoryBlob(entry.hash) + // blob might not exist on re-run with --PROCESS_BLOBS=false + if (blob) STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob) + return entry.hash + } STATS.readFromGCSCount++ const src = await filestorePersistor.getObjectStream( @@ -415,6 +426,9 @@ async function processFileOnce(entry, filePath) { await getStringLengthOfFile(blob.getByteLength(), filePath) ) const hash = blob.getHash() + if (entry.hash && hash !== entry.hash) { + throw new OError('hash mismatch', { entry, hash }) + } if (GLOBAL_BLOBS.has(hash)) { STATS.globalBlobsCount++ @@ -447,18 +461,22 @@ async function processFileOnce(entry, filePath) { * @return {Promise} */ async function uploadBlobToGCS(blobStore, entry, blob, hash, filePath) { - if (entry.ctx.hasHistoryBlob(hash)) { + if (entry.ctx.getCachedHistoryBlob(hash)) { return // fast-path using hint from pre-fetched blobs } - if (!PROCESS_BLOBS && (await blobStore.getBlob(hash))) { - entry.ctx.recordHistoryBlob(hash) - return // round trip to postgres/mongo when not pre-fetched + if (!PROCESS_BLOBS) { + // round trip to postgres/mongo when not pre-fetched + const blob = await blobStore.getBlob(hash) + if (blob) { + entry.ctx.recordHistoryBlob(blob) + return + } } // blob missing in history-v1, create in GCS and persist in postgres/mongo STATS.writeToGCSCount++ STATS.writeToGCSEgress += blob.getByteLength() await blobStore.putBlob(filePath, blob) - entry.ctx.recordHistoryBlob(hash) + entry.ctx.recordHistoryBlob(blob) } const GZ_SUFFIX = '.gz' @@ -630,8 +648,13 @@ async function processBatch(batch, prefix = 'rootFolder.0') { STATS.projects += batch.length STATS.blobs += nBlobs STATS.backedUpBlobs += nBackedUpBlobs - STATS.filesWithoutHash += files.length - (nBlobs - nBackedUpBlobs) - batch.length = 0 // GC + + // GC + batch.length = 0 + deletedFiles.clear() + blobs.clear() + backedUpBlobs.clear() + // The files are currently ordered by project-id. // Order them by file-id ASC then blobs ASC to // - process files before blobs @@ -679,7 +702,7 @@ async function handleDeletedFileTreeBatch(batch) { * @return {Promise} */ async function tryUpdateFileRefInMongo(entry) { - if (entry.path === '') { + if (entry.path === MONGO_PATH_DELETED_FILE) { return await tryUpdateDeletedFileRefInMongo(entry) } else if (entry.path.startsWith('project.')) { return await tryUpdateFileRefInMongoInDeletedProject(entry) @@ -802,28 +825,45 @@ async function updateFileRefInMongo(entry) { function* findFiles(ctx, folder, path, isInputLoop = false) { let i = 0 for (const child of folder.folders) { - yield* findFiles(ctx, child, `${path}.folders.${i}`, isInputLoop) - i++ + const idx = i++ + yield* findFiles(ctx, child, `${path}.folders.${idx}`, isInputLoop) } i = 0 for (const fileRef of folder.fileRefs) { + const idx = i++ + if (PROCESS_HASHED_FILES && fileRef.hash) { + if (ctx.canSkipProcessingHashedFile(fileRef.hash)) continue + if (isInputLoop) { + ctx.remainingQueueEntries++ + STATS.filesWithHash++ + } + yield { + ctx, + cacheKey: fileRef.hash, + fileId: fileRef._id.toString(), + path: MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE, + hash: fileRef.hash, + } + } if (!fileRef.hash) { - if (isInputLoop) ctx.remainingQueueEntries++ + if (isInputLoop) { + ctx.remainingQueueEntries++ + STATS.filesWithoutHash++ + } yield { ctx, cacheKey: fileRef._id.toString(), fileId: fileRef._id.toString(), - path: `${path}.fileRefs.${i}`, + path: `${path}.fileRefs.${idx}`, } } - i++ } } /** * @param {Array} projects * @param {string} prefix - * @param {Map>} deletedFiles + * @param {Map>} deletedFiles * @param {Map>} blobs * @param {Map>} backedUpBlobs * @return Generator @@ -847,9 +887,24 @@ function* findFileInBatch( projectBlobs, projectBackedUpBlobs ) - for (const fileId of projectDeletedFiles) { - ctx.remainingQueueEntries++ - yield { ctx, cacheKey: fileId, fileId, path: '' } + for (const fileRef of projectDeletedFiles) { + const fileId = fileRef._id.toString() + if (fileRef.hash) { + if (ctx.canSkipProcessingHashedFile(fileRef.hash)) continue + ctx.remainingQueueEntries++ + STATS.filesWithHash++ + yield { + ctx, + cacheKey: fileRef.hash, + fileId, + hash: fileRef.hash, + path: MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE, + } + } else { + ctx.remainingQueueEntries++ + STATS.filesWithoutHash++ + yield { ctx, cacheKey: fileId, fileId, path: MONGO_PATH_DELETED_FILE } + } } for (const blob of projectBlobs) { if (projectBackedUpBlobs.has(blob.getHash())) continue @@ -857,7 +912,7 @@ function* findFileInBatch( yield { ctx, cacheKey: blob.getHash(), - path: 'blob', + path: MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE, blob, hash: blob.getHash(), } @@ -882,7 +937,7 @@ async function collectProjectBlobs(batch) { /** * @param {Array} projects - * @return {Promise>>} + * @return {Promise>>} */ async function collectDeletedFiles(projects) { const deletedFiles = new Map() @@ -891,22 +946,25 @@ async function collectDeletedFiles(projects) { const cursor = deletedFilesCollection.find( { projectId: { $in: projects.map(p => p._id) }, - hash: { $exists: false }, + ...(PROCESS_HASHED_FILES + ? {} + : { + hash: { $exists: false }, + }), }, { - projection: { _id: 1, projectId: 1 }, + projection: { _id: 1, projectId: 1, hash: 1 }, readPreference: READ_PREFERENCE_SECONDARY, sort: { projectId: 1 }, } ) for await (const deletedFileRef of cursor) { const projectId = deletedFileRef.projectId.toString() - const fileId = deletedFileRef._id.toString() const found = deletedFiles.get(projectId) if (found) { - found.push(fileId) + found.push(deletedFileRef) } else { - deletedFiles.set(projectId, [fileId]) + deletedFiles.set(projectId, [deletedFileRef]) } } return deletedFiles @@ -939,6 +997,9 @@ async function collectBackedUpBlobs(projects) { const BATCH_HASH_WRITES = 1_000 const BATCH_FILE_UPDATES = 100 +const MONGO_PATH_DELETED_FILE = 'deleted-file' +const MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE = 'skip-write-to-file-tree' + class ProjectContext { /** @type {Promise | null} */ #cachedPersistorPromise = null @@ -946,7 +1007,7 @@ class ProjectContext { /** @type {Set} */ #backedUpBlobs - /** @type {Set} */ + /** @type {Map} */ #historyBlobs /** @type {number} */ @@ -962,14 +1023,32 @@ class ProjectContext { this.projectId = projectId this.historyId = historyId this.#backedUpBlobs = backedUpBlobs - this.#historyBlobs = new Set(blobs.map(b => b.getHash())) + this.#historyBlobs = new Map(blobs.map(b => [b.getHash(), b])) } - hasHistoryBlob(hash) { - return this.#historyBlobs.has(hash) + /** + * @param {string} hash + * @return {Blob | undefined} + */ + getCachedHistoryBlob(hash) { + return this.#historyBlobs.get(hash) } - recordHistoryBlob(hash) { - this.#historyBlobs.add(hash) + + /** + * @param {Blob} blob + */ + recordHistoryBlob(blob) { + this.#historyBlobs.set(blob.getHash(), blob) + } + + /** + * @param {string} hash + * @return {boolean} + */ + canSkipProcessingHashedFile(hash) { + if (this.#historyBlobs.has(hash)) return true // This file will be processed as blob. + if (GLOBAL_BLOBS.has(hash)) return true // global blob + return false } /** @@ -1105,7 +1184,7 @@ class ProjectContext { * @param {QueueEntry} entry */ queueFileForWritingHash(entry) { - if (entry.path === 'blob') return + if (entry.path === MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE) return this.#pendingFileWrites.push(entry) } @@ -1136,7 +1215,7 @@ class ProjectContext { const projectEntries = [] const deletedProjectEntries = [] for (const entry of this.#pendingFileWrites) { - if (entry.path === '') { + if (entry.path === MONGO_PATH_DELETED_FILE) { individualUpdates.push(entry) } else if (entry.path.startsWith('project.')) { deletedProjectEntries.push(entry) diff --git a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs index 04c949b011..482d6a6efd 100644 --- a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs +++ b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs @@ -120,11 +120,12 @@ describe('back_fill_file_hash script', function () { const fileId5 = objectIdFromTime('2024-02-01T00:05:00Z') const fileId6 = objectIdFromTime('2017-02-01T00:06:00Z') const fileId7 = objectIdFromTime('2017-02-01T00:07:00Z') - const fileIdDeleted1 = objectIdFromTime('2017-02-01T00:07:00Z') - const fileIdDeleted2 = objectIdFromTime('2017-02-01T00:08:00Z') - const fileIdDeleted3 = objectIdFromTime('2017-02-01T00:09:00Z') - const fileIdDeleted4 = objectIdFromTime('2024-02-01T00:10:00Z') - const fileIdDeleted5 = objectIdFromTime('2024-02-01T00:11:00Z') + const fileId8 = objectIdFromTime('2017-02-01T00:08:00Z') + const fileIdDeleted1 = objectIdFromTime('2017-03-01T00:01:00Z') + const fileIdDeleted2 = objectIdFromTime('2017-03-01T00:02:00Z') + const fileIdDeleted3 = objectIdFromTime('2017-03-01T00:03:00Z') + const fileIdDeleted4 = objectIdFromTime('2024-03-01T00:04:00Z') + const fileIdDeleted5 = objectIdFromTime('2024-03-01T00:05:00Z') const contentTextBlob0 = Buffer.from('Hello 0') const hashTextBlob0 = gitBlobHashBuffer(contentTextBlob0) const contentTextBlob1 = Buffer.from('Hello 1') @@ -141,7 +142,7 @@ describe('back_fill_file_hash script', function () { const twoByteUTF8Symbol = 'รถ' const contentFile7 = Buffer.alloc(4_000_000, twoByteUTF8Symbol) const hashFile7 = gitBlobHashBuffer(contentFile7) - const writtenBlobs = [ + const potentiallyWrittenBlobs = [ { projectId: projectId0, historyId: historyId0, fileId: fileId0 }, // { historyId: projectId0, fileId: fileId6 }, // global blob { @@ -172,7 +173,12 @@ describe('back_fill_file_hash script', function () { }, { projectId: projectId1, historyId: historyId1, fileId: fileId1 }, { projectId: projectId1, historyId: historyId1, fileId: fileIdDeleted1 }, - // { historyId: historyId2, fileId: fileId2 }, // already has hash + { + projectId: projectId2, + historyId: historyId2, + fileId: fileId2, + hasHash: true, + }, { projectId: projectId3, historyId: historyId3, fileId: fileId3 }, { projectId: projectIdDeleted0, @@ -185,7 +191,18 @@ describe('back_fill_file_hash script', function () { fileId: fileIdDeleted2, }, // { historyId: historyIdDeleted0, fileId:fileIdDeleted3 }, // fileIdDeleted3 is dupe of fileIdDeleted2 - // { historyId: historyIdDeleted0, fileId: fileIdDeleted4 }, // already has hash + { + projectId: projectIdDeleted0, + historyId: historyIdDeleted0, + fileId: fileIdDeleted4, + hasHash: true, + }, + { + projectId: projectIdDeleted1, + historyId: historyIdDeleted1, + fileId: fileId5, + hasHash: true, + }, { projectId: projectIdBadFileTree, historyId: historyIdBadFileTree, @@ -223,7 +240,11 @@ describe('back_fill_file_hash script', function () { ...fileIds, }) for (const [name, v] of Object.entries(fileIds)) { - console.log(name, gitBlobHash(v)) + console.log( + name, + gitBlobHash(v), + Array.from(binaryForGitBlobHash(gitBlobHash(v)).value()) + ) } } @@ -238,13 +259,19 @@ describe('back_fill_file_hash script', function () { beforeEach('populate mongo', async function () { await globalBlobs.insertMany([ { _id: gitBlobHash(fileId6), byteLength: 24, stringLength: 24 }, + { _id: gitBlobHash(fileId8), byteLength: 24, stringLength: 24 }, ]) await projectsCollection.insertMany([ { _id: projectId0, rootFolder: [ { - fileRefs: [{ _id: fileId0 }, { _id: fileId6 }, { _id: fileId7 }], + fileRefs: [ + { _id: fileId8, hash: gitBlobHash(fileId8) }, + { _id: fileId0 }, + { _id: fileId6 }, + { _id: fileId7 }, + ], folders: [{ fileRefs: [], folders: [] }], }, ], @@ -581,7 +608,14 @@ describe('back_fill_file_hash script', function () { return { stats, result } } - function commonAssertions() { + /** + * @param {boolean} processHashedFiles + */ + function commonAssertions(processHashedFiles = false) { + const writtenBlobs = potentiallyWrittenBlobs.filter(({ hasHash }) => { + if (processHashedFiles) return true // all files processed + return !hasHash // only files without hash processed + }) it('should update mongo', async function () { expect(await projectsCollection.find({}).toArray()).to.deep.equal([ { @@ -589,6 +623,7 @@ describe('back_fill_file_hash script', function () { rootFolder: [ { fileRefs: [ + { _id: fileId8, hash: gitBlobHash(fileId8) }, { _id: fileId0, hash: gitBlobHash(fileId0) }, { _id: fileId6, hash: gitBlobHash(fileId6) }, { _id: fileId7, hash: hashFile7 }, @@ -812,19 +847,39 @@ describe('back_fill_file_hash script', function () { }, { _id: projectId2, - blobs: [binaryForGitBlobHash(hashTextBlob2)].sort(), + blobs: [binaryForGitBlobHash(hashTextBlob2)] + .concat( + processHashedFiles + ? [binaryForGitBlobHash(gitBlobHash(fileId2))] + : [] + ) + .sort(), }, { _id: projectIdDeleted0, blobs: [ binaryForGitBlobHash(gitBlobHash(fileId4)), binaryForGitBlobHash(gitBlobHash(fileIdDeleted2)), - ].sort(), + ] + .concat( + processHashedFiles + ? [binaryForGitBlobHash(gitBlobHash(fileIdDeleted4))] + : [] + ) + .sort(), }, { _id: projectId3, blobs: [binaryForGitBlobHash(gitBlobHash(fileId3))].sort(), }, + ...(processHashedFiles + ? [ + { + _id: projectIdDeleted1, + blobs: [binaryForGitBlobHash(gitBlobHash(fileId5))].sort(), + }, + ] + : []), { _id: projectIdBadFileTree, blobs: [binaryForGitBlobHash(hashTextBlob3)].sort(), @@ -832,15 +887,27 @@ describe('back_fill_file_hash script', function () { ]) }) it('should process nothing on re-run', async function () { - const rerun = await runScript([], {}, false) - expect(rerun.stats).deep.equal({ + const rerun = await runScript( + processHashedFiles ? ['--processHashedFiles=true'] : [], + {}, + false + ) + let stats = { ...STATS_ALL_ZERO, // We still need to iterate over all the projects and blobs. projects: 7, blobs: 12, backedUpBlobs: 12, badFileTrees: 1, - }) + } + if (processHashedFiles) { + stats = sumStats(stats, { + ...STATS_ALL_ZERO, + blobs: 3, + backedUpBlobs: 3, + }) + } + expect(rerun.stats).deep.equal(stats) }) it('should have backed up all the files', async function () { expect(tieringStorageClass).to.exist @@ -910,7 +977,7 @@ describe('back_fill_file_hash script', function () { expect(log).to.contain({ projectId: projectId0.toString(), fileId: fileId0.toString(), - path: 'rootFolder.0.fileRefs.0', + path: 'rootFolder.0.fileRefs.1', msg, }) expect(log.err).to.contain({ @@ -922,6 +989,7 @@ describe('back_fill_file_hash script', function () { projects: 0, blobs: 0, backedUpBlobs: 0, + filesWithHash: 0, filesWithoutHash: 0, filesDuplicated: 0, filesRetries: 0, @@ -949,6 +1017,7 @@ describe('back_fill_file_hash script', function () { projects: 2, blobs: 2, backedUpBlobs: 0, + filesWithHash: 0, filesWithoutHash: 7, filesDuplicated: 1, filesRetries: 0, @@ -976,6 +1045,7 @@ describe('back_fill_file_hash script', function () { projects: 5, blobs: 2, backedUpBlobs: 0, + filesWithHash: 0, filesWithoutHash: 4, filesDuplicated: 0, filesRetries: 0, @@ -995,7 +1065,7 @@ describe('back_fill_file_hash script', function () { readFromGCSCount: 6, readFromGCSIngress: 110, writeToAWSCount: 5, - writeToAWSEgress: 142, + writeToAWSEgress: 143, writeToGCSCount: 3, writeToGCSEgress: 72, } @@ -1127,6 +1197,29 @@ describe('back_fill_file_hash script', function () { commonAssertions() }) + describe('when processing hashed files', function () { + let output + beforeEach('run script', async function () { + output = await runScript(['--processHashedFiles=true'], {}) + }) + it('should print stats', function () { + expect(output.stats).deep.equal( + sumStats(STATS_ALL, { + ...STATS_ALL_ZERO, + filesWithHash: 3, + mongoUpdates: 1, + readFromGCSCount: 3, + readFromGCSIngress: 72, + writeToAWSCount: 3, + writeToAWSEgress: 89, + writeToGCSCount: 3, + writeToGCSEgress: 72, + }) + ) + }) + commonAssertions(true) + }) + describe('with something in the bucket already', function () { beforeEach('create a file in s3', async function () { const buf = Buffer.from(fileId0.toString())