Merge pull request #22123 from overleaf/jpa-process-hashed

[history-v1] back_fill_file_hash: optionally process hashed files

GitOrigin-RevId: 53ebaa7b03166a6b1aacc0f985bb295c9ec04804
This commit is contained in:
Jakob Ackermann
2024-11-25 17:41:53 +01:00
committed by Copybot
parent 06d87f4590
commit b7d37b434a
2 changed files with 223 additions and 51 deletions

View File

@@ -86,7 +86,7 @@ ObjectId.cacheHexString = true
*/
/**
* @return {{PROCESS_DELETED_FILES: boolean, LOGGING_IDENTIFIER: string, BATCH_RANGE_START: string, PROCESS_BLOBS: boolean, BATCH_RANGE_END: string, PROCESS_NON_DELETED_PROJECTS: boolean, PROCESS_DELETED_PROJECTS: boolean, COLLECT_BACKED_UP_BLOBS: boolean}}
* @return {{PROCESS_HASHED_FILES: boolean, PROCESS_DELETED_FILES: boolean, LOGGING_IDENTIFIER: string, BATCH_RANGE_START: string, PROCESS_BLOBS: boolean, BATCH_RANGE_END: string, PROCESS_NON_DELETED_PROJECTS: boolean, PROCESS_DELETED_PROJECTS: boolean, COLLECT_BACKED_UP_BLOBS: boolean}}
*/
function parseArgs() {
const PUBLIC_LAUNCH_DATE = new Date('2012-01-01T00:00:00Z')
@@ -94,6 +94,7 @@ function parseArgs() {
{ name: 'processNonDeletedProjects', type: String, defaultValue: 'false' },
{ name: 'processDeletedProjects', type: String, defaultValue: 'false' },
{ name: 'processDeletedFiles', type: String, defaultValue: 'false' },
{ name: 'processHashedFiles', type: String, defaultValue: 'false' },
{ name: 'processBlobs', type: String, defaultValue: 'true' },
{ name: 'collectBackedUpBlobs', type: String, defaultValue: 'true' },
{
@@ -127,6 +128,7 @@ function parseArgs() {
PROCESS_DELETED_PROJECTS: boolVal('processDeletedProjects'),
PROCESS_BLOBS: boolVal('processBlobs'),
PROCESS_DELETED_FILES: boolVal('processDeletedFiles'),
PROCESS_HASHED_FILES: boolVal('processHashedFiles'),
COLLECT_BACKED_UP_BLOBS: boolVal('collectBackedUpBlobs'),
BATCH_RANGE_START,
BATCH_RANGE_END,
@@ -139,6 +141,7 @@ const {
PROCESS_DELETED_PROJECTS,
PROCESS_BLOBS,
PROCESS_DELETED_FILES,
PROCESS_HASHED_FILES,
COLLECT_BACKED_UP_BLOBS,
BATCH_RANGE_START,
BATCH_RANGE_END,
@@ -193,6 +196,7 @@ const STATS = {
projects: 0,
blobs: 0,
backedUpBlobs: 0,
filesWithHash: 0,
filesWithoutHash: 0,
filesDuplicated: 0,
filesRetries: 0,
@@ -396,6 +400,13 @@ async function processFileOnce(entry, filePath) {
await uploadBlobToAWS(entry, blob, filePath)
return hash
}
if (entry.hash && entry.ctx.hasBackedUpBlob(entry.hash)) {
STATS.deduplicatedWriteToAWSLocalCount++
const blob = entry.ctx.getCachedHistoryBlob(entry.hash)
// blob might not exist on re-run with --PROCESS_BLOBS=false
if (blob) STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
return entry.hash
}
STATS.readFromGCSCount++
const src = await filestorePersistor.getObjectStream(
@@ -415,6 +426,9 @@ async function processFileOnce(entry, filePath) {
await getStringLengthOfFile(blob.getByteLength(), filePath)
)
const hash = blob.getHash()
if (entry.hash && hash !== entry.hash) {
throw new OError('hash mismatch', { entry, hash })
}
if (GLOBAL_BLOBS.has(hash)) {
STATS.globalBlobsCount++
@@ -447,18 +461,22 @@ async function processFileOnce(entry, filePath) {
* @return {Promise<void>}
*/
async function uploadBlobToGCS(blobStore, entry, blob, hash, filePath) {
if (entry.ctx.hasHistoryBlob(hash)) {
if (entry.ctx.getCachedHistoryBlob(hash)) {
return // fast-path using hint from pre-fetched blobs
}
if (!PROCESS_BLOBS && (await blobStore.getBlob(hash))) {
entry.ctx.recordHistoryBlob(hash)
return // round trip to postgres/mongo when not pre-fetched
if (!PROCESS_BLOBS) {
// round trip to postgres/mongo when not pre-fetched
const blob = await blobStore.getBlob(hash)
if (blob) {
entry.ctx.recordHistoryBlob(blob)
return
}
}
// blob missing in history-v1, create in GCS and persist in postgres/mongo
STATS.writeToGCSCount++
STATS.writeToGCSEgress += blob.getByteLength()
await blobStore.putBlob(filePath, blob)
entry.ctx.recordHistoryBlob(hash)
entry.ctx.recordHistoryBlob(blob)
}
const GZ_SUFFIX = '.gz'
@@ -630,8 +648,13 @@ async function processBatch(batch, prefix = 'rootFolder.0') {
STATS.projects += batch.length
STATS.blobs += nBlobs
STATS.backedUpBlobs += nBackedUpBlobs
STATS.filesWithoutHash += files.length - (nBlobs - nBackedUpBlobs)
batch.length = 0 // GC
// GC
batch.length = 0
deletedFiles.clear()
blobs.clear()
backedUpBlobs.clear()
// The files are currently ordered by project-id.
// Order them by file-id ASC then blobs ASC to
// - process files before blobs
@@ -679,7 +702,7 @@ async function handleDeletedFileTreeBatch(batch) {
* @return {Promise<boolean>}
*/
async function tryUpdateFileRefInMongo(entry) {
if (entry.path === '') {
if (entry.path === MONGO_PATH_DELETED_FILE) {
return await tryUpdateDeletedFileRefInMongo(entry)
} else if (entry.path.startsWith('project.')) {
return await tryUpdateFileRefInMongoInDeletedProject(entry)
@@ -802,28 +825,45 @@ async function updateFileRefInMongo(entry) {
function* findFiles(ctx, folder, path, isInputLoop = false) {
let i = 0
for (const child of folder.folders) {
yield* findFiles(ctx, child, `${path}.folders.${i}`, isInputLoop)
i++
const idx = i++
yield* findFiles(ctx, child, `${path}.folders.${idx}`, isInputLoop)
}
i = 0
for (const fileRef of folder.fileRefs) {
const idx = i++
if (PROCESS_HASHED_FILES && fileRef.hash) {
if (ctx.canSkipProcessingHashedFile(fileRef.hash)) continue
if (isInputLoop) {
ctx.remainingQueueEntries++
STATS.filesWithHash++
}
yield {
ctx,
cacheKey: fileRef.hash,
fileId: fileRef._id.toString(),
path: MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE,
hash: fileRef.hash,
}
}
if (!fileRef.hash) {
if (isInputLoop) ctx.remainingQueueEntries++
if (isInputLoop) {
ctx.remainingQueueEntries++
STATS.filesWithoutHash++
}
yield {
ctx,
cacheKey: fileRef._id.toString(),
fileId: fileRef._id.toString(),
path: `${path}.fileRefs.${i}`,
path: `${path}.fileRefs.${idx}`,
}
}
i++
}
}
/**
* @param {Array<Project>} projects
* @param {string} prefix
* @param {Map<string,Array<string>>} deletedFiles
* @param {Map<string,Array<DeletedFileRef>>} deletedFiles
* @param {Map<string,Array<Blob>>} blobs
* @param {Map<string,Array<string>>} backedUpBlobs
* @return Generator<QueueEntry>
@@ -847,9 +887,24 @@ function* findFileInBatch(
projectBlobs,
projectBackedUpBlobs
)
for (const fileId of projectDeletedFiles) {
ctx.remainingQueueEntries++
yield { ctx, cacheKey: fileId, fileId, path: '' }
for (const fileRef of projectDeletedFiles) {
const fileId = fileRef._id.toString()
if (fileRef.hash) {
if (ctx.canSkipProcessingHashedFile(fileRef.hash)) continue
ctx.remainingQueueEntries++
STATS.filesWithHash++
yield {
ctx,
cacheKey: fileRef.hash,
fileId,
hash: fileRef.hash,
path: MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE,
}
} else {
ctx.remainingQueueEntries++
STATS.filesWithoutHash++
yield { ctx, cacheKey: fileId, fileId, path: MONGO_PATH_DELETED_FILE }
}
}
for (const blob of projectBlobs) {
if (projectBackedUpBlobs.has(blob.getHash())) continue
@@ -857,7 +912,7 @@ function* findFileInBatch(
yield {
ctx,
cacheKey: blob.getHash(),
path: 'blob',
path: MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE,
blob,
hash: blob.getHash(),
}
@@ -882,7 +937,7 @@ async function collectProjectBlobs(batch) {
/**
* @param {Array<Project>} projects
* @return {Promise<Map<string, Array<string>>>}
* @return {Promise<Map<string, Array<DeletedFileRef>>>}
*/
async function collectDeletedFiles(projects) {
const deletedFiles = new Map()
@@ -891,22 +946,25 @@ async function collectDeletedFiles(projects) {
const cursor = deletedFilesCollection.find(
{
projectId: { $in: projects.map(p => p._id) },
hash: { $exists: false },
...(PROCESS_HASHED_FILES
? {}
: {
hash: { $exists: false },
}),
},
{
projection: { _id: 1, projectId: 1 },
projection: { _id: 1, projectId: 1, hash: 1 },
readPreference: READ_PREFERENCE_SECONDARY,
sort: { projectId: 1 },
}
)
for await (const deletedFileRef of cursor) {
const projectId = deletedFileRef.projectId.toString()
const fileId = deletedFileRef._id.toString()
const found = deletedFiles.get(projectId)
if (found) {
found.push(fileId)
found.push(deletedFileRef)
} else {
deletedFiles.set(projectId, [fileId])
deletedFiles.set(projectId, [deletedFileRef])
}
}
return deletedFiles
@@ -939,6 +997,9 @@ async function collectBackedUpBlobs(projects) {
const BATCH_HASH_WRITES = 1_000
const BATCH_FILE_UPDATES = 100
const MONGO_PATH_DELETED_FILE = 'deleted-file'
const MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE = 'skip-write-to-file-tree'
class ProjectContext {
/** @type {Promise<CachedPerProjectEncryptedS3Persistor> | null} */
#cachedPersistorPromise = null
@@ -946,7 +1007,7 @@ class ProjectContext {
/** @type {Set<string>} */
#backedUpBlobs
/** @type {Set<string>} */
/** @type {Map<string, Blob>} */
#historyBlobs
/** @type {number} */
@@ -962,14 +1023,32 @@ class ProjectContext {
this.projectId = projectId
this.historyId = historyId
this.#backedUpBlobs = backedUpBlobs
this.#historyBlobs = new Set(blobs.map(b => b.getHash()))
this.#historyBlobs = new Map(blobs.map(b => [b.getHash(), b]))
}
hasHistoryBlob(hash) {
return this.#historyBlobs.has(hash)
/**
* @param {string} hash
* @return {Blob | undefined}
*/
getCachedHistoryBlob(hash) {
return this.#historyBlobs.get(hash)
}
recordHistoryBlob(hash) {
this.#historyBlobs.add(hash)
/**
* @param {Blob} blob
*/
recordHistoryBlob(blob) {
this.#historyBlobs.set(blob.getHash(), blob)
}
/**
* @param {string} hash
* @return {boolean}
*/
canSkipProcessingHashedFile(hash) {
if (this.#historyBlobs.has(hash)) return true // This file will be processed as blob.
if (GLOBAL_BLOBS.has(hash)) return true // global blob
return false
}
/**
@@ -1105,7 +1184,7 @@ class ProjectContext {
* @param {QueueEntry} entry
*/
queueFileForWritingHash(entry) {
if (entry.path === 'blob') return
if (entry.path === MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE) return
this.#pendingFileWrites.push(entry)
}
@@ -1136,7 +1215,7 @@ class ProjectContext {
const projectEntries = []
const deletedProjectEntries = []
for (const entry of this.#pendingFileWrites) {
if (entry.path === '') {
if (entry.path === MONGO_PATH_DELETED_FILE) {
individualUpdates.push(entry)
} else if (entry.path.startsWith('project.')) {
deletedProjectEntries.push(entry)

View File

@@ -120,11 +120,12 @@ describe('back_fill_file_hash script', function () {
const fileId5 = objectIdFromTime('2024-02-01T00:05:00Z')
const fileId6 = objectIdFromTime('2017-02-01T00:06:00Z')
const fileId7 = objectIdFromTime('2017-02-01T00:07:00Z')
const fileIdDeleted1 = objectIdFromTime('2017-02-01T00:07:00Z')
const fileIdDeleted2 = objectIdFromTime('2017-02-01T00:08:00Z')
const fileIdDeleted3 = objectIdFromTime('2017-02-01T00:09:00Z')
const fileIdDeleted4 = objectIdFromTime('2024-02-01T00:10:00Z')
const fileIdDeleted5 = objectIdFromTime('2024-02-01T00:11:00Z')
const fileId8 = objectIdFromTime('2017-02-01T00:08:00Z')
const fileIdDeleted1 = objectIdFromTime('2017-03-01T00:01:00Z')
const fileIdDeleted2 = objectIdFromTime('2017-03-01T00:02:00Z')
const fileIdDeleted3 = objectIdFromTime('2017-03-01T00:03:00Z')
const fileIdDeleted4 = objectIdFromTime('2024-03-01T00:04:00Z')
const fileIdDeleted5 = objectIdFromTime('2024-03-01T00:05:00Z')
const contentTextBlob0 = Buffer.from('Hello 0')
const hashTextBlob0 = gitBlobHashBuffer(contentTextBlob0)
const contentTextBlob1 = Buffer.from('Hello 1')
@@ -141,7 +142,7 @@ describe('back_fill_file_hash script', function () {
const twoByteUTF8Symbol = 'ö'
const contentFile7 = Buffer.alloc(4_000_000, twoByteUTF8Symbol)
const hashFile7 = gitBlobHashBuffer(contentFile7)
const writtenBlobs = [
const potentiallyWrittenBlobs = [
{ projectId: projectId0, historyId: historyId0, fileId: fileId0 },
// { historyId: projectId0, fileId: fileId6 }, // global blob
{
@@ -172,7 +173,12 @@ describe('back_fill_file_hash script', function () {
},
{ projectId: projectId1, historyId: historyId1, fileId: fileId1 },
{ projectId: projectId1, historyId: historyId1, fileId: fileIdDeleted1 },
// { historyId: historyId2, fileId: fileId2 }, // already has hash
{
projectId: projectId2,
historyId: historyId2,
fileId: fileId2,
hasHash: true,
},
{ projectId: projectId3, historyId: historyId3, fileId: fileId3 },
{
projectId: projectIdDeleted0,
@@ -185,7 +191,18 @@ describe('back_fill_file_hash script', function () {
fileId: fileIdDeleted2,
},
// { historyId: historyIdDeleted0, fileId:fileIdDeleted3 }, // fileIdDeleted3 is dupe of fileIdDeleted2
// { historyId: historyIdDeleted0, fileId: fileIdDeleted4 }, // already has hash
{
projectId: projectIdDeleted0,
historyId: historyIdDeleted0,
fileId: fileIdDeleted4,
hasHash: true,
},
{
projectId: projectIdDeleted1,
historyId: historyIdDeleted1,
fileId: fileId5,
hasHash: true,
},
{
projectId: projectIdBadFileTree,
historyId: historyIdBadFileTree,
@@ -223,7 +240,11 @@ describe('back_fill_file_hash script', function () {
...fileIds,
})
for (const [name, v] of Object.entries(fileIds)) {
console.log(name, gitBlobHash(v))
console.log(
name,
gitBlobHash(v),
Array.from(binaryForGitBlobHash(gitBlobHash(v)).value())
)
}
}
@@ -238,13 +259,19 @@ describe('back_fill_file_hash script', function () {
beforeEach('populate mongo', async function () {
await globalBlobs.insertMany([
{ _id: gitBlobHash(fileId6), byteLength: 24, stringLength: 24 },
{ _id: gitBlobHash(fileId8), byteLength: 24, stringLength: 24 },
])
await projectsCollection.insertMany([
{
_id: projectId0,
rootFolder: [
{
fileRefs: [{ _id: fileId0 }, { _id: fileId6 }, { _id: fileId7 }],
fileRefs: [
{ _id: fileId8, hash: gitBlobHash(fileId8) },
{ _id: fileId0 },
{ _id: fileId6 },
{ _id: fileId7 },
],
folders: [{ fileRefs: [], folders: [] }],
},
],
@@ -581,7 +608,14 @@ describe('back_fill_file_hash script', function () {
return { stats, result }
}
function commonAssertions() {
/**
* @param {boolean} processHashedFiles
*/
function commonAssertions(processHashedFiles = false) {
const writtenBlobs = potentiallyWrittenBlobs.filter(({ hasHash }) => {
if (processHashedFiles) return true // all files processed
return !hasHash // only files without hash processed
})
it('should update mongo', async function () {
expect(await projectsCollection.find({}).toArray()).to.deep.equal([
{
@@ -589,6 +623,7 @@ describe('back_fill_file_hash script', function () {
rootFolder: [
{
fileRefs: [
{ _id: fileId8, hash: gitBlobHash(fileId8) },
{ _id: fileId0, hash: gitBlobHash(fileId0) },
{ _id: fileId6, hash: gitBlobHash(fileId6) },
{ _id: fileId7, hash: hashFile7 },
@@ -812,19 +847,39 @@ describe('back_fill_file_hash script', function () {
},
{
_id: projectId2,
blobs: [binaryForGitBlobHash(hashTextBlob2)].sort(),
blobs: [binaryForGitBlobHash(hashTextBlob2)]
.concat(
processHashedFiles
? [binaryForGitBlobHash(gitBlobHash(fileId2))]
: []
)
.sort(),
},
{
_id: projectIdDeleted0,
blobs: [
binaryForGitBlobHash(gitBlobHash(fileId4)),
binaryForGitBlobHash(gitBlobHash(fileIdDeleted2)),
].sort(),
]
.concat(
processHashedFiles
? [binaryForGitBlobHash(gitBlobHash(fileIdDeleted4))]
: []
)
.sort(),
},
{
_id: projectId3,
blobs: [binaryForGitBlobHash(gitBlobHash(fileId3))].sort(),
},
...(processHashedFiles
? [
{
_id: projectIdDeleted1,
blobs: [binaryForGitBlobHash(gitBlobHash(fileId5))].sort(),
},
]
: []),
{
_id: projectIdBadFileTree,
blobs: [binaryForGitBlobHash(hashTextBlob3)].sort(),
@@ -832,15 +887,27 @@ describe('back_fill_file_hash script', function () {
])
})
it('should process nothing on re-run', async function () {
const rerun = await runScript([], {}, false)
expect(rerun.stats).deep.equal({
const rerun = await runScript(
processHashedFiles ? ['--processHashedFiles=true'] : [],
{},
false
)
let stats = {
...STATS_ALL_ZERO,
// We still need to iterate over all the projects and blobs.
projects: 7,
blobs: 12,
backedUpBlobs: 12,
badFileTrees: 1,
})
}
if (processHashedFiles) {
stats = sumStats(stats, {
...STATS_ALL_ZERO,
blobs: 3,
backedUpBlobs: 3,
})
}
expect(rerun.stats).deep.equal(stats)
})
it('should have backed up all the files', async function () {
expect(tieringStorageClass).to.exist
@@ -910,7 +977,7 @@ describe('back_fill_file_hash script', function () {
expect(log).to.contain({
projectId: projectId0.toString(),
fileId: fileId0.toString(),
path: 'rootFolder.0.fileRefs.0',
path: 'rootFolder.0.fileRefs.1',
msg,
})
expect(log.err).to.contain({
@@ -922,6 +989,7 @@ describe('back_fill_file_hash script', function () {
projects: 0,
blobs: 0,
backedUpBlobs: 0,
filesWithHash: 0,
filesWithoutHash: 0,
filesDuplicated: 0,
filesRetries: 0,
@@ -949,6 +1017,7 @@ describe('back_fill_file_hash script', function () {
projects: 2,
blobs: 2,
backedUpBlobs: 0,
filesWithHash: 0,
filesWithoutHash: 7,
filesDuplicated: 1,
filesRetries: 0,
@@ -976,6 +1045,7 @@ describe('back_fill_file_hash script', function () {
projects: 5,
blobs: 2,
backedUpBlobs: 0,
filesWithHash: 0,
filesWithoutHash: 4,
filesDuplicated: 0,
filesRetries: 0,
@@ -995,7 +1065,7 @@ describe('back_fill_file_hash script', function () {
readFromGCSCount: 6,
readFromGCSIngress: 110,
writeToAWSCount: 5,
writeToAWSEgress: 142,
writeToAWSEgress: 143,
writeToGCSCount: 3,
writeToGCSEgress: 72,
}
@@ -1127,6 +1197,29 @@ describe('back_fill_file_hash script', function () {
commonAssertions()
})
describe('when processing hashed files', function () {
let output
beforeEach('run script', async function () {
output = await runScript(['--processHashedFiles=true'], {})
})
it('should print stats', function () {
expect(output.stats).deep.equal(
sumStats(STATS_ALL, {
...STATS_ALL_ZERO,
filesWithHash: 3,
mongoUpdates: 1,
readFromGCSCount: 3,
readFromGCSIngress: 72,
writeToAWSCount: 3,
writeToAWSEgress: 89,
writeToGCSCount: 3,
writeToGCSEgress: 72,
})
)
})
commonAssertions(true)
})
describe('with something in the bucket already', function () {
beforeEach('create a file in s3', async function () {
const buf = Buffer.from(fileId0.toString())