mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
962 lines
32 KiB
Diff
962 lines
32 KiB
Diff
|
|
|
|
diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
index 5a590e347a9..3be1c8a5407 100644
|
|
--- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
+++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
@@ -1,28 +1,20 @@
|
|
// @ts-check
|
|
-import Crypto from 'node:crypto'
|
|
import Events from 'node:events'
|
|
import fs from 'node:fs'
|
|
import Path from 'node:path'
|
|
import { performance } from 'node:perf_hooks'
|
|
import Stream from 'node:stream'
|
|
-import zLib from 'node:zlib'
|
|
import { setTimeout } from 'node:timers/promises'
|
|
-import { Binary, ObjectId } from 'mongodb'
|
|
+import { ObjectId } from 'mongodb'
|
|
import pLimit from 'p-limit'
|
|
import logger from '@overleaf/logger'
|
|
import {
|
|
batchedUpdate,
|
|
objectIdFromInput,
|
|
renderObjectId,
|
|
- READ_PREFERENCE_SECONDARY,
|
|
} from '@overleaf/mongo-utils/batchedUpdate.js'
|
|
import OError from '@overleaf/o-error'
|
|
-import {
|
|
- AlreadyWrittenError,
|
|
- NoKEKMatchedError,
|
|
- NotFoundError,
|
|
-} from '@overleaf/object-persistor/src/Errors.js'
|
|
-import { backupPersistor, projectBlobsBucket } from '../lib/backupPersistor.mjs'
|
|
+import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js'
|
|
import {
|
|
BlobStore,
|
|
GLOBAL_BLOBS,
|
|
@@ -30,9 +22,8 @@ import {
|
|
getProjectBlobsBatch,
|
|
getStringLengthOfFile,
|
|
makeBlobForFile,
|
|
- makeProjectKey,
|
|
} from '../lib/blob_store/index.js'
|
|
-import { backedUpBlobs as backedUpBlobsCollection, db } from '../lib/mongodb.js'
|
|
+import { db } from '../lib/mongodb.js'
|
|
import commandLineArgs from 'command-line-args'
|
|
import readline from 'node:readline'
|
|
|
|
@@ -88,7 +79,7 @@ ObjectId.cacheHexString = true
|
|
*/
|
|
|
|
/**
|
|
- * @return {{PROJECT_IDS_FROM: string, PROCESS_HASHED_FILES: boolean, LOGGING_IDENTIFIER: string, BATCH_RANGE_START: string, PROCESS_BLOBS: boolean, BATCH_RANGE_END: string, PROCESS_NON_DELETED_PROJECTS: boolean, PROCESS_DELETED_PROJECTS: boolean, COLLECT_BACKED_UP_BLOBS: boolean}}
|
|
+ * @return {{PROJECT_IDS_FROM: string, PROCESS_HASHED_FILES: boolean, LOGGING_IDENTIFIER: string, BATCH_RANGE_START: string, PROCESS_BLOBS: boolean, BATCH_RANGE_END: string, PROCESS_NON_DELETED_PROJECTS: boolean, PROCESS_DELETED_PROJECTS: boolean}}
|
|
*/
|
|
function parseArgs() {
|
|
const PUBLIC_LAUNCH_DATE = new Date('2012-01-01T00:00:00Z')
|
|
@@ -98,7 +89,6 @@ function parseArgs() {
|
|
{ name: 'processHashedFiles', type: String, defaultValue: 'false' },
|
|
{ name: 'processBlobs', type: String, defaultValue: 'true' },
|
|
{ name: 'projectIdsFrom', type: String, defaultValue: '' },
|
|
- { name: 'collectBackedUpBlobs', type: String, defaultValue: 'true' },
|
|
{
|
|
name: 'BATCH_RANGE_START',
|
|
type: String,
|
|
@@ -130,7 +120,6 @@ function parseArgs() {
|
|
PROCESS_DELETED_PROJECTS: boolVal('processDeletedProjects'),
|
|
PROCESS_BLOBS: boolVal('processBlobs'),
|
|
PROCESS_HASHED_FILES: boolVal('processHashedFiles'),
|
|
- COLLECT_BACKED_UP_BLOBS: boolVal('collectBackedUpBlobs'),
|
|
BATCH_RANGE_START,
|
|
BATCH_RANGE_END,
|
|
LOGGING_IDENTIFIER: args['LOGGING_IDENTIFIER'] || BATCH_RANGE_START,
|
|
@@ -143,7 +132,6 @@ const {
|
|
PROCESS_DELETED_PROJECTS,
|
|
PROCESS_BLOBS,
|
|
PROCESS_HASHED_FILES,
|
|
- COLLECT_BACKED_UP_BLOBS,
|
|
BATCH_RANGE_START,
|
|
BATCH_RANGE_END,
|
|
LOGGING_IDENTIFIER,
|
|
@@ -232,7 +220,6 @@ async function processConcurrently(array, fn) {
|
|
const STATS = {
|
|
projects: 0,
|
|
blobs: 0,
|
|
- backedUpBlobs: 0,
|
|
filesWithHash: 0,
|
|
filesWithoutHash: 0,
|
|
filesDuplicated: 0,
|
|
@@ -246,14 +233,8 @@ const STATS = {
|
|
projectHardDeleted: 0,
|
|
fileHardDeleted: 0,
|
|
mongoUpdates: 0,
|
|
- deduplicatedWriteToAWSLocalCount: 0,
|
|
- deduplicatedWriteToAWSLocalEgress: 0,
|
|
- deduplicatedWriteToAWSRemoteCount: 0,
|
|
- deduplicatedWriteToAWSRemoteEgress: 0,
|
|
readFromGCSCount: 0,
|
|
readFromGCSIngress: 0,
|
|
- writeToAWSCount: 0,
|
|
- writeToAWSEgress: 0,
|
|
writeToGCSCount: 0,
|
|
writeToGCSEgress: 0,
|
|
}
|
|
@@ -275,7 +256,7 @@ function toMiBPerSecond(v, ms) {
|
|
/**
|
|
* @param {any} stats
|
|
* @param {number} ms
|
|
- * @return {{writeToAWSThroughputMiBPerSecond: number, readFromGCSThroughputMiBPerSecond: number}}
|
|
+ * @return {{readFromGCSThroughputMiBPerSecond: number}}
|
|
*/
|
|
function bandwidthStats(stats, ms) {
|
|
return {
|
|
@@ -283,10 +264,6 @@ function bandwidthStats(stats, ms) {
|
|
stats.readFromGCSIngress,
|
|
ms
|
|
),
|
|
- writeToAWSThroughputMiBPerSecond: toMiBPerSecond(
|
|
- stats.writeToAWSEgress,
|
|
- ms
|
|
- ),
|
|
}
|
|
}
|
|
|
|
@@ -382,9 +359,6 @@ async function processFile(entry, filePath) {
|
|
throw err // disable retries for not found in filestore bucket case
|
|
}
|
|
}
|
|
- if (err instanceof NoKEKMatchedError) {
|
|
- throw err // disable retries when upload to S3 will fail again
|
|
- }
|
|
STATS.filesRetries++
|
|
const {
|
|
ctx: { projectId },
|
|
@@ -417,32 +391,8 @@ async function processFileOnce(entry, filePath) {
|
|
if (entry.blob) {
|
|
const { blob } = entry
|
|
const hash = blob.getHash()
|
|
- if (entry.ctx.hasBackedUpBlob(hash)) {
|
|
- STATS.deduplicatedWriteToAWSLocalCount++
|
|
- STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
|
|
- return hash
|
|
- }
|
|
- entry.ctx.recordPendingBlob(hash)
|
|
- STATS.readFromGCSCount++
|
|
- const src = await blobStore.getStream(hash)
|
|
- const dst = fs.createWriteStream(filePath, {
|
|
- highWaterMark: STREAM_HIGH_WATER_MARK,
|
|
- })
|
|
- try {
|
|
- await Stream.promises.pipeline(src, dst)
|
|
- } finally {
|
|
- STATS.readFromGCSIngress += dst.bytesWritten
|
|
- }
|
|
- await uploadBlobToAWS(entry, blob, filePath)
|
|
return hash
|
|
}
|
|
- if (entry.hash && entry.ctx.hasBackedUpBlob(entry.hash)) {
|
|
- STATS.deduplicatedWriteToAWSLocalCount++
|
|
- const blob = entry.ctx.getCachedHistoryBlob(entry.hash)
|
|
- // blob might not exist on re-run with --PROCESS_BLOBS=false
|
|
- if (blob) STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
|
|
- return entry.hash
|
|
- }
|
|
|
|
STATS.readFromGCSCount++
|
|
// make a fetch request to filestore itself
|
|
@@ -469,16 +419,14 @@ async function processFileOnce(entry, filePath) {
|
|
STATS.globalBlobsEgress += estimateBlobSize(blob)
|
|
return hash
|
|
}
|
|
- if (entry.ctx.hasBackedUpBlob(hash)) {
|
|
- STATS.deduplicatedWriteToAWSLocalCount++
|
|
- STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
|
|
+ if (entry.ctx.hasCompletedBlob(hash)) {
|
|
return hash
|
|
}
|
|
entry.ctx.recordPendingBlob(hash)
|
|
|
|
try {
|
|
await uploadBlobToGCS(blobStore, entry, blob, hash, filePath)
|
|
- await uploadBlobToAWS(entry, blob, filePath)
|
|
+ entry.ctx.recordCompletedBlob(hash) // mark upload as completed
|
|
} catch (err) {
|
|
entry.ctx.recordFailedBlob(hash)
|
|
throw err
|
|
@@ -515,76 +463,6 @@ async function uploadBlobToGCS(blobStore, entry, blob, hash, filePath) {
|
|
|
|
const GZ_SUFFIX = '.gz'
|
|
|
|
-/**
|
|
- * @param {QueueEntry} entry
|
|
- * @param {Blob} blob
|
|
- * @param {string} filePath
|
|
- * @return {Promise<void>}
|
|
- */
|
|
-async function uploadBlobToAWS(entry, blob, filePath) {
|
|
- const { historyId } = entry.ctx
|
|
- let backupSource
|
|
- let contentEncoding
|
|
- const md5 = Crypto.createHash('md5')
|
|
- let size
|
|
- if (blob.getStringLength()) {
|
|
- const filePathCompressed = filePath + GZ_SUFFIX
|
|
- backupSource = filePathCompressed
|
|
- contentEncoding = 'gzip'
|
|
- size = 0
|
|
- await Stream.promises.pipeline(
|
|
- fs.createReadStream(filePath, { highWaterMark: STREAM_HIGH_WATER_MARK }),
|
|
- zLib.createGzip(),
|
|
- async function* (source) {
|
|
- for await (const chunk of source) {
|
|
- size += chunk.byteLength
|
|
- md5.update(chunk)
|
|
- yield chunk
|
|
- }
|
|
- },
|
|
- fs.createWriteStream(filePathCompressed, {
|
|
- highWaterMark: STREAM_HIGH_WATER_MARK,
|
|
- })
|
|
- )
|
|
- } else {
|
|
- backupSource = filePath
|
|
- size = blob.getByteLength()
|
|
- await Stream.promises.pipeline(
|
|
- fs.createReadStream(filePath, { highWaterMark: STREAM_HIGH_WATER_MARK }),
|
|
- md5
|
|
- )
|
|
- }
|
|
- const backendKeyPath = makeProjectKey(historyId, blob.getHash())
|
|
- const persistor = await entry.ctx.getCachedPersistor(backendKeyPath)
|
|
- try {
|
|
- STATS.writeToAWSCount++
|
|
- await persistor.sendStream(
|
|
- projectBlobsBucket,
|
|
- backendKeyPath,
|
|
- fs.createReadStream(backupSource, {
|
|
- highWaterMark: STREAM_HIGH_WATER_MARK,
|
|
- }),
|
|
- {
|
|
- contentEncoding,
|
|
- contentType: 'application/octet-stream',
|
|
- contentLength: size,
|
|
- sourceMd5: md5.digest('hex'),
|
|
- ifNoneMatch: '*', // de-duplicate write (we pay for the request, but avoid egress)
|
|
- }
|
|
- )
|
|
- STATS.writeToAWSEgress += size
|
|
- } catch (err) {
|
|
- if (err instanceof AlreadyWrittenError) {
|
|
- STATS.deduplicatedWriteToAWSRemoteCount++
|
|
- STATS.deduplicatedWriteToAWSRemoteEgress += size
|
|
- } else {
|
|
- STATS.writeToAWSEgress += size
|
|
- throw err
|
|
- }
|
|
- }
|
|
- entry.ctx.recordBackedUpBlob(blob.getHash())
|
|
-}
|
|
-
|
|
/**
|
|
* @param {Array<QueueEntry>} files
|
|
* @return {Promise<void>}
|
|
@@ -670,23 +548,18 @@ async function queueNextBatch(batch, prefix = 'rootFolder.0') {
|
|
* @return {Promise<void>}
|
|
*/
|
|
async function processBatch(batch, prefix = 'rootFolder.0') {
|
|
- const [{ nBlobs, blobs }, { nBackedUpBlobs, backedUpBlobs }] =
|
|
- await Promise.all([collectProjectBlobs(batch), collectBackedUpBlobs(batch)])
|
|
- const files = Array.from(findFileInBatch(batch, prefix, blobs, backedUpBlobs))
|
|
+ const { nBlobs, blobs } = await collectProjectBlobs(batch)
|
|
+ const files = Array.from(findFileInBatch(batch, prefix, blobs))
|
|
STATS.projects += batch.length
|
|
STATS.blobs += nBlobs
|
|
- STATS.backedUpBlobs += nBackedUpBlobs
|
|
|
|
// GC
|
|
batch.length = 0
|
|
blobs.clear()
|
|
- backedUpBlobs.clear()
|
|
|
|
// The files are currently ordered by project-id.
|
|
// Order them by file-id ASC then blobs ASC to
|
|
// - process files before blobs
|
|
- // - avoid head-of-line blocking from many project-files waiting on the generation of the projects DEK (round trip to AWS)
|
|
- // - bonus: increase chance of de-duplicating write to AWS
|
|
files.sort(
|
|
/**
|
|
* @param {QueueEntry} a
|
|
@@ -903,23 +776,15 @@ function* findFiles(ctx, folder, path, isInputLoop = false) {
|
|
* @param {Array<Project>} projects
|
|
* @param {string} prefix
|
|
* @param {Map<string,Array<Blob>>} blobs
|
|
- * @param {Map<string,Array<string>>} backedUpBlobs
|
|
* @return Generator<QueueEntry>
|
|
*/
|
|
-function* findFileInBatch(projects, prefix, blobs, backedUpBlobs) {
|
|
+function* findFileInBatch(projects, prefix, blobs) {
|
|
for (const project of projects) {
|
|
const projectIdS = project._id.toString()
|
|
const historyIdS = project.overleaf.history.id.toString()
|
|
const projectBlobs = blobs.get(historyIdS) || []
|
|
- const projectBackedUpBlobs = new Set(backedUpBlobs.get(projectIdS) || [])
|
|
- const ctx = new ProjectContext(
|
|
- project._id,
|
|
- historyIdS,
|
|
- projectBlobs,
|
|
- projectBackedUpBlobs
|
|
- )
|
|
+ const ctx = new ProjectContext(project._id, historyIdS, projectBlobs)
|
|
for (const blob of projectBlobs) {
|
|
- if (projectBackedUpBlobs.has(blob.getHash())) continue
|
|
ctx.remainingQueueEntries++
|
|
yield {
|
|
ctx,
|
|
@@ -951,42 +816,11 @@ async function collectProjectBlobs(batch) {
|
|
return await getProjectBlobsBatch(batch.map(p => p.overleaf.history.id))
|
|
}
|
|
|
|
-/**
|
|
- * @param {Array<Project>} projects
|
|
- * @return {Promise<{nBackedUpBlobs:number,backedUpBlobs:Map<string,Array<string>>}>}
|
|
- */
|
|
-async function collectBackedUpBlobs(projects) {
|
|
- let nBackedUpBlobs = 0
|
|
- const backedUpBlobs = new Map()
|
|
- if (!COLLECT_BACKED_UP_BLOBS) return { nBackedUpBlobs, backedUpBlobs }
|
|
-
|
|
- const cursor = backedUpBlobsCollection.find(
|
|
- { _id: { $in: projects.map(p => p._id) } },
|
|
- {
|
|
- readPreference: READ_PREFERENCE_SECONDARY,
|
|
- sort: { _id: 1 },
|
|
- }
|
|
- )
|
|
- for await (const record of cursor) {
|
|
- const blobs = record.blobs.map(b => b.toString('hex'))
|
|
- backedUpBlobs.set(record._id.toString(), blobs)
|
|
- nBackedUpBlobs += blobs.length
|
|
- }
|
|
- return { nBackedUpBlobs, backedUpBlobs }
|
|
-}
|
|
-
|
|
-const BATCH_HASH_WRITES = 1_000
|
|
const BATCH_FILE_UPDATES = 100
|
|
|
|
const MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE = 'skip-write-to-file-tree'
|
|
|
|
class ProjectContext {
|
|
- /** @type {Promise<CachedPerProjectEncryptedS3Persistor> | null} */
|
|
- #cachedPersistorPromise = null
|
|
-
|
|
- /** @type {Set<string>} */
|
|
- #backedUpBlobs
|
|
-
|
|
/** @type {Map<string, Blob>} */
|
|
#historyBlobs
|
|
|
|
@@ -1000,12 +834,10 @@ class ProjectContext {
|
|
* @param {ObjectId} projectId
|
|
* @param {string} historyId
|
|
* @param {Array<Blob>} blobs
|
|
- * @param {Set<string>} backedUpBlobs
|
|
*/
|
|
- constructor(projectId, historyId, blobs, backedUpBlobs) {
|
|
+ constructor(projectId, historyId, blobs) {
|
|
this.projectId = projectId
|
|
this.historyId = historyId
|
|
- this.#backedUpBlobs = backedUpBlobs
|
|
this.#historyBlobs = new Map(blobs.map(b => [b.getHash(), b]))
|
|
}
|
|
|
|
@@ -1034,75 +866,17 @@ class ProjectContext {
|
|
return false
|
|
}
|
|
|
|
- /**
|
|
- * @param {string} key
|
|
- * @return {Promise<CachedPerProjectEncryptedS3Persistor>}
|
|
- */
|
|
- getCachedPersistor(key) {
|
|
- if (!this.#cachedPersistorPromise) {
|
|
- // Fetch DEK once, but only if needed -- upon the first use
|
|
- this.#cachedPersistorPromise = this.#getCachedPersistorWithRetries(key)
|
|
- }
|
|
- return this.#cachedPersistorPromise
|
|
- }
|
|
-
|
|
- /**
|
|
- * @param {string} key
|
|
- * @return {Promise<CachedPerProjectEncryptedS3Persistor>}
|
|
- */
|
|
- async #getCachedPersistorWithRetries(key) {
|
|
- // Optimization: Skip GET on DEK in case no blobs are marked as backed up yet.
|
|
- let tryGenerateDEKFirst = this.#backedUpBlobs.size === 0
|
|
- for (let attempt = 0; attempt < RETRIES; attempt++) {
|
|
- try {
|
|
- if (tryGenerateDEKFirst) {
|
|
- try {
|
|
- return await backupPersistor.generateDataEncryptionKey(
|
|
- projectBlobsBucket,
|
|
- key
|
|
- )
|
|
- } catch (err) {
|
|
- if (err instanceof AlreadyWrittenError) {
|
|
- tryGenerateDEKFirst = false
|
|
- // fall back to GET below
|
|
- } else {
|
|
- throw err
|
|
- }
|
|
- }
|
|
- }
|
|
- return await backupPersistor.forProject(projectBlobsBucket, key)
|
|
- } catch (err) {
|
|
- if (gracefulShutdownInitiated) throw err
|
|
- if (err instanceof NoKEKMatchedError) {
|
|
- throw err
|
|
- } else {
|
|
- logger.warn(
|
|
- { err, projectId: this.projectId, attempt },
|
|
- 'failed to get DEK, trying again'
|
|
- )
|
|
- const jitter = Math.random() * RETRY_DELAY_MS
|
|
- await setTimeout(RETRY_DELAY_MS + jitter)
|
|
- }
|
|
- }
|
|
- }
|
|
- return await backupPersistor.forProject(projectBlobsBucket, key)
|
|
- }
|
|
-
|
|
async flushMongoQueuesIfNeeded() {
|
|
if (this.remainingQueueEntries === 0) {
|
|
await this.flushMongoQueues()
|
|
}
|
|
|
|
- if (this.#completedBlobs.size > BATCH_HASH_WRITES) {
|
|
- await this.#storeBackedUpBlobs()
|
|
- }
|
|
if (this.#pendingFileWrites.length > BATCH_FILE_UPDATES) {
|
|
await this.#storeFileHashes()
|
|
}
|
|
}
|
|
|
|
async flushMongoQueues() {
|
|
- await this.#storeBackedUpBlobs()
|
|
await this.#storeFileHashes()
|
|
}
|
|
|
|
@@ -1111,20 +885,6 @@ class ProjectContext {
|
|
/** @type {Set<string>} */
|
|
#completedBlobs = new Set()
|
|
|
|
- async #storeBackedUpBlobs() {
|
|
- if (this.#completedBlobs.size === 0) return
|
|
- const blobs = Array.from(this.#completedBlobs).map(
|
|
- hash => new Binary(Buffer.from(hash, 'hex'))
|
|
- )
|
|
- this.#completedBlobs.clear()
|
|
- STATS.mongoUpdates++
|
|
- await backedUpBlobsCollection.updateOne(
|
|
- { _id: this.projectId },
|
|
- { $addToSet: { blobs: { $each: blobs } } },
|
|
- { upsert: true }
|
|
- )
|
|
- }
|
|
-
|
|
/**
|
|
* @param {string} hash
|
|
*/
|
|
@@ -1142,8 +902,7 @@ class ProjectContext {
|
|
/**
|
|
* @param {string} hash
|
|
*/
|
|
- recordBackedUpBlob(hash) {
|
|
- this.#backedUpBlobs.add(hash)
|
|
+ recordCompletedBlob(hash) {
|
|
this.#completedBlobs.add(hash)
|
|
this.#pendingBlobs.delete(hash)
|
|
}
|
|
@@ -1152,12 +911,8 @@ class ProjectContext {
|
|
* @param {string} hash
|
|
* @return {boolean}
|
|
*/
|
|
- hasBackedUpBlob(hash) {
|
|
- return (
|
|
- this.#pendingBlobs.has(hash) ||
|
|
- this.#completedBlobs.has(hash) ||
|
|
- this.#backedUpBlobs.has(hash)
|
|
- )
|
|
+ hasCompletedBlob(hash) {
|
|
+ return this.#pendingBlobs.has(hash) || this.#completedBlobs.has(hash)
|
|
}
|
|
|
|
/** @type {Array<QueueEntry>} */
|
|
diff --git a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs
|
|
index 8f861d39345..62b0b1de25f 100644
|
|
--- a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs
|
|
+++ b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs
|
|
@@ -4,23 +4,17 @@ import Stream from 'node:stream'
|
|
import { setTimeout } from 'node:timers/promises'
|
|
import { promisify } from 'node:util'
|
|
import { ObjectId, Binary } from 'mongodb'
|
|
-import {
|
|
- db,
|
|
- backedUpBlobs,
|
|
- globalBlobs,
|
|
-} from '../../../../storage/lib/mongodb.js'
|
|
+import { db, globalBlobs } from '../../../../storage/lib/mongodb.js'
|
|
import cleanup from './support/cleanup.js'
|
|
import testProjects from '../api/support/test_projects.js'
|
|
import { execFile } from 'node:child_process'
|
|
import chai, { expect } from 'chai'
|
|
import chaiExclude from 'chai-exclude'
|
|
-import config from 'config'
|
|
import { WritableBuffer } from '@overleaf/stream-utils'
|
|
import {
|
|
backupPersistor,
|
|
projectBlobsBucket,
|
|
} from '../../../../storage/lib/backupPersistor.mjs'
|
|
-import projectKey from '../../../../storage/lib/project_key.js'
|
|
import {
|
|
BlobStore,
|
|
makeProjectKey,
|
|
@@ -31,9 +25,6 @@ import express from 'express'
|
|
chai.use(chaiExclude)
|
|
const TIMEOUT = 20 * 1_000
|
|
|
|
-const { deksBucket } = config.get('backupStore')
|
|
-const { tieringStorageClass } = config.get('backupPersistor')
|
|
-
|
|
const projectsCollection = db.collection('projects')
|
|
const deletedProjectsCollection = db.collection('deletedProjects')
|
|
|
|
@@ -117,17 +108,6 @@ function binaryForGitBlobHash(gitBlobHash) {
|
|
return new Binary(Buffer.from(gitBlobHash, 'hex'))
|
|
}
|
|
|
|
-async function listS3Bucket(bucket, wantStorageClass) {
|
|
- const client = backupPersistor._getClientForBucket(bucket)
|
|
- const response = await client.listObjectsV2({ Bucket: bucket }).promise()
|
|
-
|
|
- for (const object of response.Contents || []) {
|
|
- expect(object).to.have.property('StorageClass', wantStorageClass)
|
|
- }
|
|
-
|
|
- return (response.Contents || []).map(item => item.Key || '')
|
|
-}
|
|
-
|
|
function objectIdFromTime(timestamp) {
|
|
return ObjectId.createFromTime(new Date(timestamp).getTime() / 1000)
|
|
}
|
|
@@ -591,11 +571,7 @@ describe('back_fill_file_hash script', function () {
|
|
expect((await fs.promises.readdir('/tmp')).join(';')).to.not.match(
|
|
/back_fill_file_hash/
|
|
)
|
|
- const extraStatsKeys = [
|
|
- 'eventLoop',
|
|
- 'readFromGCSThroughputMiBPerSecond',
|
|
- 'writeToAWSThroughputMiBPerSecond',
|
|
- ]
|
|
+ const extraStatsKeys = ['eventLoop', 'readFromGCSThroughputMiBPerSecond']
|
|
const stats = JSON.parse(
|
|
result.stderr
|
|
.split('\n')
|
|
@@ -610,7 +586,6 @@ describe('back_fill_file_hash script', function () {
|
|
delete stats.time
|
|
if (shouldHaveWritten) {
|
|
expect(stats.readFromGCSThroughputMiBPerSecond).to.be.greaterThan(0)
|
|
- expect(stats.writeToAWSThroughputMiBPerSecond).to.be.greaterThan(0)
|
|
}
|
|
for (const key of extraStatsKeys) {
|
|
delete stats[key]
|
|
@@ -856,109 +831,6 @@ describe('back_fill_file_hash script', function () {
|
|
},
|
|
},
|
|
])
|
|
- expect(
|
|
- (await backedUpBlobs.find({}, { sort: { _id: 1 } }).toArray()).map(
|
|
- entry => {
|
|
- // blobs are pushed unordered into mongo. Sort the list for consistency.
|
|
- entry.blobs.sort()
|
|
- return entry
|
|
- }
|
|
- )
|
|
- ).to.deep.equal([
|
|
- {
|
|
- _id: projectId0,
|
|
- blobs: [
|
|
- binaryForGitBlobHash(gitBlobHash(fileId0)),
|
|
- binaryForGitBlobHash(hashFile7),
|
|
- binaryForGitBlobHash(hashTextBlob0),
|
|
- ].sort(),
|
|
- },
|
|
- {
|
|
- _id: projectId1,
|
|
- blobs: [
|
|
- binaryForGitBlobHash(gitBlobHash(fileId1)),
|
|
- binaryForGitBlobHash(hashTextBlob1),
|
|
- ].sort(),
|
|
- },
|
|
- {
|
|
- _id: projectId2,
|
|
- blobs: [binaryForGitBlobHash(hashTextBlob2)]
|
|
- .concat(
|
|
- processHashedFiles
|
|
- ? [binaryForGitBlobHash(gitBlobHash(fileId2))]
|
|
- : []
|
|
- )
|
|
- .sort(),
|
|
- },
|
|
- {
|
|
- _id: projectIdDeleted0,
|
|
- blobs: [binaryForGitBlobHash(gitBlobHash(fileId4))].sort(),
|
|
- },
|
|
- {
|
|
- _id: projectId3,
|
|
- blobs: [binaryForGitBlobHash(gitBlobHash(fileId3))].sort(),
|
|
- },
|
|
- ...(processHashedFiles
|
|
- ? [
|
|
- {
|
|
- _id: projectIdDeleted1,
|
|
- blobs: [binaryForGitBlobHash(gitBlobHash(fileId5))].sort(),
|
|
- },
|
|
- ]
|
|
- : []),
|
|
- {
|
|
- _id: projectIdBadFileTree0,
|
|
- blobs: [binaryForGitBlobHash(hashTextBlob3)].sort(),
|
|
- },
|
|
- {
|
|
- _id: projectIdBadFileTree3,
|
|
- blobs: [binaryForGitBlobHash(gitBlobHash(fileId9))].sort(),
|
|
- },
|
|
- ])
|
|
- })
|
|
- it('should have backed up all the files', async function () {
|
|
- expect(tieringStorageClass).to.exist
|
|
- const blobs = await listS3Bucket(projectBlobsBucket, tieringStorageClass)
|
|
- expect(blobs.sort()).to.deep.equal(
|
|
- Array.from(
|
|
- new Set(
|
|
- writtenBlobs
|
|
- .map(({ historyId, fileId, hash }) =>
|
|
- makeProjectKey(historyId, hash || gitBlobHash(fileId))
|
|
- )
|
|
- .sort()
|
|
- )
|
|
- )
|
|
- )
|
|
- for (let { historyId, fileId, hash, content } of writtenBlobs) {
|
|
- hash = hash || gitBlobHash(fileId.toString())
|
|
- const s = await backupPersistor.getObjectStream(
|
|
- projectBlobsBucket,
|
|
- makeProjectKey(historyId, hash),
|
|
- { autoGunzip: true }
|
|
- )
|
|
- const buf = new WritableBuffer()
|
|
- await Stream.promises.pipeline(s, buf)
|
|
- expect(gitBlobHashBuffer(buf.getContents())).to.equal(hash)
|
|
- if (content) {
|
|
- expect(buf.getContents()).to.deep.equal(content)
|
|
- } else {
|
|
- const id = buf.getContents().toString('utf-8')
|
|
- expect(id).to.equal(fileId.toString())
|
|
- // double check we are not comparing 'undefined' or '[object Object]' above
|
|
- expect(id).to.match(/^[a-f0-9]{24}$/)
|
|
- }
|
|
- }
|
|
- const deks = await listS3Bucket(deksBucket, 'STANDARD')
|
|
- expect(deks.sort()).to.deep.equal(
|
|
- Array.from(
|
|
- new Set(
|
|
- writtenBlobs.map(
|
|
- ({ historyId }) => projectKey.format(historyId) + '/dek'
|
|
- )
|
|
- )
|
|
- ).sort()
|
|
- )
|
|
})
|
|
it('should have written the back filled files to history v1', async function () {
|
|
for (const { historyId, hash, fileId, content } of writtenBlobs) {
|
|
@@ -991,14 +863,13 @@ describe('back_fill_file_hash script', function () {
|
|
// We still need to iterate over all the projects and blobs.
|
|
projects: 10,
|
|
blobs: 10,
|
|
- backedUpBlobs: 10,
|
|
+
|
|
badFileTrees: 4,
|
|
}
|
|
if (processHashedFiles) {
|
|
stats = sumStats(stats, {
|
|
...STATS_ALL_ZERO,
|
|
blobs: 2,
|
|
- backedUpBlobs: 2,
|
|
})
|
|
}
|
|
expect(rerun.stats).deep.equal(stats)
|
|
@@ -1024,7 +895,6 @@ describe('back_fill_file_hash script', function () {
|
|
const STATS_ALL_ZERO = {
|
|
projects: 0,
|
|
blobs: 0,
|
|
- backedUpBlobs: 0,
|
|
filesWithHash: 0,
|
|
filesWithoutHash: 0,
|
|
filesDuplicated: 0,
|
|
@@ -1038,21 +908,14 @@ describe('back_fill_file_hash script', function () {
|
|
fileHardDeleted: 0,
|
|
badFileTrees: 0,
|
|
mongoUpdates: 0,
|
|
- deduplicatedWriteToAWSLocalCount: 0,
|
|
- deduplicatedWriteToAWSLocalEgress: 0,
|
|
- deduplicatedWriteToAWSRemoteCount: 0,
|
|
- deduplicatedWriteToAWSRemoteEgress: 0,
|
|
readFromGCSCount: 0,
|
|
readFromGCSIngress: 0,
|
|
- writeToAWSCount: 0,
|
|
- writeToAWSEgress: 0,
|
|
writeToGCSCount: 0,
|
|
writeToGCSEgress: 0,
|
|
}
|
|
const STATS_UP_TO_PROJECT1 = {
|
|
projects: 2,
|
|
blobs: 2,
|
|
- backedUpBlobs: 0,
|
|
filesWithHash: 0,
|
|
filesWithoutHash: 5,
|
|
filesDuplicated: 1,
|
|
@@ -1065,22 +928,15 @@ describe('back_fill_file_hash script', function () {
|
|
projectHardDeleted: 0,
|
|
fileHardDeleted: 0,
|
|
badFileTrees: 0,
|
|
- mongoUpdates: 4,
|
|
- deduplicatedWriteToAWSLocalCount: 0,
|
|
- deduplicatedWriteToAWSLocalEgress: 0,
|
|
- deduplicatedWriteToAWSRemoteCount: 0,
|
|
- deduplicatedWriteToAWSRemoteEgress: 0,
|
|
- readFromGCSCount: 6,
|
|
- readFromGCSIngress: 4000086,
|
|
- writeToAWSCount: 5,
|
|
- writeToAWSEgress: 4026,
|
|
+ mongoUpdates: 2, // was 4; the 2 writes to the backedUpBlobs collection are gone
|
|
+ readFromGCSCount: 4,
|
|
+ readFromGCSIngress: 4000072,
|
|
writeToGCSCount: 3,
|
|
writeToGCSEgress: 4000048,
|
|
}
|
|
const STATS_UP_FROM_PROJECT1_ONWARD = {
|
|
projects: 8,
|
|
blobs: 2,
|
|
- backedUpBlobs: 0,
|
|
filesWithHash: 0,
|
|
filesWithoutHash: 4,
|
|
filesDuplicated: 0,
|
|
@@ -1093,26 +949,18 @@ describe('back_fill_file_hash script', function () {
|
|
projectHardDeleted: 0,
|
|
fileHardDeleted: 0,
|
|
badFileTrees: 4,
|
|
- mongoUpdates: 8,
|
|
- deduplicatedWriteToAWSLocalCount: 1,
|
|
- deduplicatedWriteToAWSLocalEgress: 30,
|
|
- deduplicatedWriteToAWSRemoteCount: 0,
|
|
- deduplicatedWriteToAWSRemoteEgress: 0,
|
|
- readFromGCSCount: 6,
|
|
- readFromGCSIngress: 110,
|
|
- writeToAWSCount: 5,
|
|
- writeToAWSEgress: 143,
|
|
+ mongoUpdates: 3, // previously 5 blobs written to backedUpBlobs collection
|
|
+ readFromGCSCount: 4,
|
|
+ readFromGCSIngress: 96,
|
|
writeToGCSCount: 3,
|
|
writeToGCSEgress: 72,
|
|
}
|
|
const STATS_FILES_HASHED_EXTRA = {
|
|
...STATS_ALL_ZERO,
|
|
filesWithHash: 2,
|
|
- mongoUpdates: 2,
|
|
+ mongoUpdates: 0, // previously 2 blobs written to backedUpBlobs collection
|
|
readFromGCSCount: 2,
|
|
readFromGCSIngress: 48,
|
|
- writeToAWSCount: 2,
|
|
- writeToAWSEgress: 60,
|
|
writeToGCSCount: 2,
|
|
writeToGCSEgress: 48,
|
|
}
|
|
@@ -1144,8 +992,6 @@ describe('back_fill_file_hash script', function () {
|
|
...STATS_ALL_ZERO,
|
|
filesFailed: 1,
|
|
readFromGCSIngress: -24,
|
|
- writeToAWSCount: -1,
|
|
- writeToAWSEgress: -28,
|
|
writeToGCSCount: -1,
|
|
writeToGCSEgress: -24,
|
|
})
|
|
@@ -1269,13 +1115,14 @@ describe('back_fill_file_hash script', function () {
|
|
before('run script with hashed files', async function () {
|
|
output2 = await runScript(['--processHashedFiles=true'], {})
|
|
})
|
|
- it('should print stats', function () {
|
|
+ it('should print stats for the first run without hashed files', function () {
|
|
expect(output1.stats).deep.equal(STATS_ALL)
|
|
+ })
|
|
+ it('should print stats for the hashed files run', function () {
|
|
expect(output2.stats).deep.equal({
|
|
...STATS_FILES_HASHED_EXTRA,
|
|
projects: 10,
|
|
blobs: 10,
|
|
- backedUpBlobs: 10,
|
|
badFileTrees: 4,
|
|
})
|
|
})
|
|
@@ -1322,9 +1169,7 @@ describe('back_fill_file_hash script', function () {
|
|
...STATS_FILES_HASHED_EXTRA,
|
|
readFromGCSCount: 3,
|
|
readFromGCSIngress: 72,
|
|
- deduplicatedWriteToAWSLocalCount: 1,
|
|
- deduplicatedWriteToAWSLocalEgress: 30,
|
|
- mongoUpdates: 1,
|
|
+ mongoUpdates: 0,
|
|
filesWithHash: 3,
|
|
})
|
|
)
|
|
@@ -1354,48 +1199,6 @@ describe('back_fill_file_hash script', function () {
|
|
expect(output.stats).deep.equal(
|
|
sumStats(STATS_ALL, {
|
|
...STATS_ALL_ZERO,
|
|
- // one remote deduplicate
|
|
- deduplicatedWriteToAWSRemoteCount: 1,
|
|
- deduplicatedWriteToAWSRemoteEgress: 28,
|
|
- writeToAWSEgress: -28, // subtract skipped egress
|
|
- })
|
|
- )
|
|
- })
|
|
- commonAssertions()
|
|
- })
|
|
-
|
|
- describe('with something in the bucket and marked as processed', function () {
|
|
- before('prepare environment', prepareEnvironment)
|
|
- before('create a file in s3', async function () {
|
|
- await backupPersistor.sendStream(
|
|
- projectBlobsBucket,
|
|
- makeProjectKey(historyId0, hashTextBlob0),
|
|
- Stream.Readable.from([contentTextBlob0]),
|
|
- { contentLength: contentTextBlob0.byteLength }
|
|
- )
|
|
- await backedUpBlobs.insertMany([
|
|
- {
|
|
- _id: projectId0,
|
|
- blobs: [binaryForGitBlobHash(hashTextBlob0)],
|
|
- },
|
|
- ])
|
|
- })
|
|
- let output
|
|
- before('run script', async function () {
|
|
- output = await runScript([], {
|
|
- CONCURRENCY: '1',
|
|
- })
|
|
- })
|
|
-
|
|
- it('should print stats', function () {
|
|
- expect(output.stats).deep.equal(
|
|
- sumStats(STATS_ALL, {
|
|
- ...STATS_ALL_ZERO,
|
|
- backedUpBlobs: 1,
|
|
- writeToAWSCount: -1,
|
|
- writeToAWSEgress: -27,
|
|
- readFromGCSCount: -1,
|
|
- readFromGCSIngress: -7,
|
|
})
|
|
)
|
|
})
|
|
@@ -1418,8 +1221,10 @@ describe('back_fill_file_hash script', function () {
|
|
})
|
|
})
|
|
|
|
- it('should print stats', function () {
|
|
+ it('should print stats for part 0', function () {
|
|
expect(outputPart0.stats).to.deep.equal(STATS_UP_TO_PROJECT1)
|
|
+ })
|
|
+ it('should print stats for part 1', function () {
|
|
expect(outputPart1.stats).to.deep.equal(STATS_UP_FROM_PROJECT1_ONWARD)
|
|
})
|
|
commonAssertions()
|
|
|
|
|
|
|
|
diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
index 3be1c8a5407..c9ed13c6cb4 100644
|
|
--- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
+++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
@@ -388,12 +388,6 @@ async function processFileOnce(entry, filePath) {
|
|
fileId,
|
|
} = entry
|
|
const blobStore = new BlobStore(historyId)
|
|
- if (entry.blob) {
|
|
- const { blob } = entry
|
|
- const hash = blob.getHash()
|
|
- return hash
|
|
- }
|
|
-
|
|
STATS.readFromGCSCount++
|
|
// make a fetch request to filestore itself
|
|
const src = await fetchFromFilestore(projectId, fileId)
|
|
@@ -784,16 +778,6 @@ function* findFileInBatch(projects, prefix, blobs) {
|
|
const historyIdS = project.overleaf.history.id.toString()
|
|
const projectBlobs = blobs.get(historyIdS) || []
|
|
const ctx = new ProjectContext(project._id, historyIdS, projectBlobs)
|
|
- for (const blob of projectBlobs) {
|
|
- ctx.remainingQueueEntries++
|
|
- yield {
|
|
- ctx,
|
|
- cacheKey: blob.getHash(),
|
|
- path: MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE,
|
|
- blob,
|
|
- hash: blob.getHash(),
|
|
- }
|
|
- }
|
|
try {
|
|
yield* findFiles(ctx, project.rootFolder?.[0], prefix, true)
|
|
} catch (err) {
|
|
|
|
|
|
|
|
diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
index c9ed13c6cb4..f24ce4a6605 100644
|
|
--- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
+++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
@@ -387,6 +387,13 @@ async function processFileOnce(entry, filePath) {
|
|
ctx: { projectId, historyId },
|
|
fileId,
|
|
} = entry
|
|
+ if (entry.hash && entry.ctx.hasCompletedBlob(entry.hash)) {
|
|
+ // We can enter this case for two identical files in the same project,
|
|
+ // one with hash, the other without. When the one without hash gets
|
|
+ // processed first, we can skip downloading the other one, whose hash
|
|
+ // we already know.
|
|
+ return entry.hash
|
|
+ }
|
|
const blobStore = new BlobStore(historyId)
|
|
STATS.readFromGCSCount++
|
|
// make a fetch request to filestore itself
|
|
|
|
|
|
|
|
diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
index f24ce4a6605..0ccadaf5a95 100644
|
|
--- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
+++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs
|
|
@@ -559,8 +559,9 @@ async function processBatch(batch, prefix = 'rootFolder.0') {
|
|
blobs.clear()
|
|
|
|
// The files are currently ordered by project-id.
|
|
- // Order them by file-id ASC then blobs ASC to
|
|
- // - process files before blobs
|
|
+ // Order them by file-id ASC then hash ASC to
|
|
+ // increase the hit rate on the "already processed
|
|
+ // hash for project" checks.
|
|
files.sort(
|
|
/**
|
|
* @param {QueueEntry} a
|
|
|