mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
Merge pull request #27173 from overleaf/bg-filestore-migration-remove-backups
remove backup code from back_fill_file_hash script and tests GitOrigin-RevId: 364eefc47670e6e7f2314de810ea259b609ff976
This commit is contained in:
@@ -1,28 +1,20 @@
|
||||
// @ts-check
|
||||
import Crypto from 'node:crypto'
|
||||
import Events from 'node:events'
|
||||
import fs from 'node:fs'
|
||||
import Path from 'node:path'
|
||||
import { performance } from 'node:perf_hooks'
|
||||
import Stream from 'node:stream'
|
||||
import zLib from 'node:zlib'
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
import { Binary, ObjectId } from 'mongodb'
|
||||
import { ObjectId } from 'mongodb'
|
||||
import pLimit from 'p-limit'
|
||||
import logger from '@overleaf/logger'
|
||||
import {
|
||||
batchedUpdate,
|
||||
objectIdFromInput,
|
||||
renderObjectId,
|
||||
READ_PREFERENCE_SECONDARY,
|
||||
} from '@overleaf/mongo-utils/batchedUpdate.js'
|
||||
import OError from '@overleaf/o-error'
|
||||
import {
|
||||
AlreadyWrittenError,
|
||||
NoKEKMatchedError,
|
||||
NotFoundError,
|
||||
} from '@overleaf/object-persistor/src/Errors.js'
|
||||
import { backupPersistor, projectBlobsBucket } from '../lib/backupPersistor.mjs'
|
||||
import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js'
|
||||
import {
|
||||
BlobStore,
|
||||
GLOBAL_BLOBS,
|
||||
@@ -30,9 +22,8 @@ import {
|
||||
getProjectBlobsBatch,
|
||||
getStringLengthOfFile,
|
||||
makeBlobForFile,
|
||||
makeProjectKey,
|
||||
} from '../lib/blob_store/index.js'
|
||||
import { backedUpBlobs as backedUpBlobsCollection, db } from '../lib/mongodb.js'
|
||||
import { db } from '../lib/mongodb.js'
|
||||
import commandLineArgs from 'command-line-args'
|
||||
import readline from 'node:readline'
|
||||
|
||||
@@ -88,7 +79,7 @@ ObjectId.cacheHexString = true
|
||||
*/
|
||||
|
||||
/**
|
||||
* @return {{PROJECT_IDS_FROM: string, PROCESS_HASHED_FILES: boolean, LOGGING_IDENTIFIER: string, BATCH_RANGE_START: string, PROCESS_BLOBS: boolean, BATCH_RANGE_END: string, PROCESS_NON_DELETED_PROJECTS: boolean, PROCESS_DELETED_PROJECTS: boolean, COLLECT_BACKED_UP_BLOBS: boolean}}
|
||||
* @return {{PROJECT_IDS_FROM: string, PROCESS_HASHED_FILES: boolean, LOGGING_IDENTIFIER: string, BATCH_RANGE_START: string, PROCESS_BLOBS: boolean, BATCH_RANGE_END: string, PROCESS_NON_DELETED_PROJECTS: boolean, PROCESS_DELETED_PROJECTS: boolean}}
|
||||
*/
|
||||
function parseArgs() {
|
||||
const PUBLIC_LAUNCH_DATE = new Date('2012-01-01T00:00:00Z')
|
||||
@@ -98,7 +89,6 @@ function parseArgs() {
|
||||
{ name: 'processHashedFiles', type: String, defaultValue: 'false' },
|
||||
{ name: 'processBlobs', type: String, defaultValue: 'true' },
|
||||
{ name: 'projectIdsFrom', type: String, defaultValue: '' },
|
||||
{ name: 'collectBackedUpBlobs', type: String, defaultValue: 'true' },
|
||||
{
|
||||
name: 'BATCH_RANGE_START',
|
||||
type: String,
|
||||
@@ -130,7 +120,6 @@ function parseArgs() {
|
||||
PROCESS_DELETED_PROJECTS: boolVal('processDeletedProjects'),
|
||||
PROCESS_BLOBS: boolVal('processBlobs'),
|
||||
PROCESS_HASHED_FILES: boolVal('processHashedFiles'),
|
||||
COLLECT_BACKED_UP_BLOBS: boolVal('collectBackedUpBlobs'),
|
||||
BATCH_RANGE_START,
|
||||
BATCH_RANGE_END,
|
||||
LOGGING_IDENTIFIER: args['LOGGING_IDENTIFIER'] || BATCH_RANGE_START,
|
||||
@@ -143,7 +132,6 @@ const {
|
||||
PROCESS_DELETED_PROJECTS,
|
||||
PROCESS_BLOBS,
|
||||
PROCESS_HASHED_FILES,
|
||||
COLLECT_BACKED_UP_BLOBS,
|
||||
BATCH_RANGE_START,
|
||||
BATCH_RANGE_END,
|
||||
LOGGING_IDENTIFIER,
|
||||
@@ -232,7 +220,6 @@ async function processConcurrently(array, fn) {
|
||||
const STATS = {
|
||||
projects: 0,
|
||||
blobs: 0,
|
||||
backedUpBlobs: 0,
|
||||
filesWithHash: 0,
|
||||
filesWithoutHash: 0,
|
||||
filesDuplicated: 0,
|
||||
@@ -246,14 +233,8 @@ const STATS = {
|
||||
projectHardDeleted: 0,
|
||||
fileHardDeleted: 0,
|
||||
mongoUpdates: 0,
|
||||
deduplicatedWriteToAWSLocalCount: 0,
|
||||
deduplicatedWriteToAWSLocalEgress: 0,
|
||||
deduplicatedWriteToAWSRemoteCount: 0,
|
||||
deduplicatedWriteToAWSRemoteEgress: 0,
|
||||
readFromGCSCount: 0,
|
||||
readFromGCSIngress: 0,
|
||||
writeToAWSCount: 0,
|
||||
writeToAWSEgress: 0,
|
||||
writeToGCSCount: 0,
|
||||
writeToGCSEgress: 0,
|
||||
}
|
||||
@@ -275,7 +256,7 @@ function toMiBPerSecond(v, ms) {
|
||||
/**
|
||||
* @param {any} stats
|
||||
* @param {number} ms
|
||||
* @return {{writeToAWSThroughputMiBPerSecond: number, readFromGCSThroughputMiBPerSecond: number}}
|
||||
* @return {{readFromGCSThroughputMiBPerSecond: number}}
|
||||
*/
|
||||
function bandwidthStats(stats, ms) {
|
||||
return {
|
||||
@@ -283,10 +264,6 @@ function bandwidthStats(stats, ms) {
|
||||
stats.readFromGCSIngress,
|
||||
ms
|
||||
),
|
||||
writeToAWSThroughputMiBPerSecond: toMiBPerSecond(
|
||||
stats.writeToAWSEgress,
|
||||
ms
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -382,9 +359,6 @@ async function processFile(entry, filePath) {
|
||||
throw err // disable retries for not found in filestore bucket case
|
||||
}
|
||||
}
|
||||
if (err instanceof NoKEKMatchedError) {
|
||||
throw err // disable retries when upload to S3 will fail again
|
||||
}
|
||||
STATS.filesRetries++
|
||||
const {
|
||||
ctx: { projectId },
|
||||
@@ -413,37 +387,14 @@ async function processFileOnce(entry, filePath) {
|
||||
ctx: { projectId, historyId },
|
||||
fileId,
|
||||
} = entry
|
||||
const blobStore = new BlobStore(historyId)
|
||||
if (entry.blob) {
|
||||
const { blob } = entry
|
||||
const hash = blob.getHash()
|
||||
if (entry.ctx.hasBackedUpBlob(hash)) {
|
||||
STATS.deduplicatedWriteToAWSLocalCount++
|
||||
STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
|
||||
return hash
|
||||
}
|
||||
entry.ctx.recordPendingBlob(hash)
|
||||
STATS.readFromGCSCount++
|
||||
const src = await blobStore.getStream(hash)
|
||||
const dst = fs.createWriteStream(filePath, {
|
||||
highWaterMark: STREAM_HIGH_WATER_MARK,
|
||||
})
|
||||
try {
|
||||
await Stream.promises.pipeline(src, dst)
|
||||
} finally {
|
||||
STATS.readFromGCSIngress += dst.bytesWritten
|
||||
}
|
||||
await uploadBlobToAWS(entry, blob, filePath)
|
||||
return hash
|
||||
}
|
||||
if (entry.hash && entry.ctx.hasBackedUpBlob(entry.hash)) {
|
||||
STATS.deduplicatedWriteToAWSLocalCount++
|
||||
const blob = entry.ctx.getCachedHistoryBlob(entry.hash)
|
||||
// blob might not exist on re-run with --PROCESS_BLOBS=false
|
||||
if (blob) STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
|
||||
if (entry.hash && entry.ctx.hasCompletedBlob(entry.hash)) {
|
||||
// We can enter this case for two identical files in the same project,
|
||||
// one with hash, the other without. When the one without hash gets
|
||||
// processed first, we can skip downloading the other one we already
|
||||
// know the hash of.
|
||||
return entry.hash
|
||||
}
|
||||
|
||||
const blobStore = new BlobStore(historyId)
|
||||
STATS.readFromGCSCount++
|
||||
// make a fetch request to filestore itself
|
||||
const src = await fetchFromFilestore(projectId, fileId)
|
||||
@@ -469,16 +420,14 @@ async function processFileOnce(entry, filePath) {
|
||||
STATS.globalBlobsEgress += estimateBlobSize(blob)
|
||||
return hash
|
||||
}
|
||||
if (entry.ctx.hasBackedUpBlob(hash)) {
|
||||
STATS.deduplicatedWriteToAWSLocalCount++
|
||||
STATS.deduplicatedWriteToAWSLocalEgress += estimateBlobSize(blob)
|
||||
if (entry.ctx.hasCompletedBlob(hash)) {
|
||||
return hash
|
||||
}
|
||||
entry.ctx.recordPendingBlob(hash)
|
||||
|
||||
try {
|
||||
await uploadBlobToGCS(blobStore, entry, blob, hash, filePath)
|
||||
await uploadBlobToAWS(entry, blob, filePath)
|
||||
entry.ctx.recordCompletedBlob(hash) // mark upload as completed
|
||||
} catch (err) {
|
||||
entry.ctx.recordFailedBlob(hash)
|
||||
throw err
|
||||
@@ -515,76 +464,6 @@ async function uploadBlobToGCS(blobStore, entry, blob, hash, filePath) {
|
||||
|
||||
const GZ_SUFFIX = '.gz'
|
||||
|
||||
/**
|
||||
* @param {QueueEntry} entry
|
||||
* @param {Blob} blob
|
||||
* @param {string} filePath
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async function uploadBlobToAWS(entry, blob, filePath) {
|
||||
const { historyId } = entry.ctx
|
||||
let backupSource
|
||||
let contentEncoding
|
||||
const md5 = Crypto.createHash('md5')
|
||||
let size
|
||||
if (blob.getStringLength()) {
|
||||
const filePathCompressed = filePath + GZ_SUFFIX
|
||||
backupSource = filePathCompressed
|
||||
contentEncoding = 'gzip'
|
||||
size = 0
|
||||
await Stream.promises.pipeline(
|
||||
fs.createReadStream(filePath, { highWaterMark: STREAM_HIGH_WATER_MARK }),
|
||||
zLib.createGzip(),
|
||||
async function* (source) {
|
||||
for await (const chunk of source) {
|
||||
size += chunk.byteLength
|
||||
md5.update(chunk)
|
||||
yield chunk
|
||||
}
|
||||
},
|
||||
fs.createWriteStream(filePathCompressed, {
|
||||
highWaterMark: STREAM_HIGH_WATER_MARK,
|
||||
})
|
||||
)
|
||||
} else {
|
||||
backupSource = filePath
|
||||
size = blob.getByteLength()
|
||||
await Stream.promises.pipeline(
|
||||
fs.createReadStream(filePath, { highWaterMark: STREAM_HIGH_WATER_MARK }),
|
||||
md5
|
||||
)
|
||||
}
|
||||
const backendKeyPath = makeProjectKey(historyId, blob.getHash())
|
||||
const persistor = await entry.ctx.getCachedPersistor(backendKeyPath)
|
||||
try {
|
||||
STATS.writeToAWSCount++
|
||||
await persistor.sendStream(
|
||||
projectBlobsBucket,
|
||||
backendKeyPath,
|
||||
fs.createReadStream(backupSource, {
|
||||
highWaterMark: STREAM_HIGH_WATER_MARK,
|
||||
}),
|
||||
{
|
||||
contentEncoding,
|
||||
contentType: 'application/octet-stream',
|
||||
contentLength: size,
|
||||
sourceMd5: md5.digest('hex'),
|
||||
ifNoneMatch: '*', // de-duplicate write (we pay for the request, but avoid egress)
|
||||
}
|
||||
)
|
||||
STATS.writeToAWSEgress += size
|
||||
} catch (err) {
|
||||
if (err instanceof AlreadyWrittenError) {
|
||||
STATS.deduplicatedWriteToAWSRemoteCount++
|
||||
STATS.deduplicatedWriteToAWSRemoteEgress += size
|
||||
} else {
|
||||
STATS.writeToAWSEgress += size
|
||||
throw err
|
||||
}
|
||||
}
|
||||
entry.ctx.recordBackedUpBlob(blob.getHash())
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Array<QueueEntry>} files
|
||||
* @return {Promise<void>}
|
||||
@@ -670,23 +549,19 @@ async function queueNextBatch(batch, prefix = 'rootFolder.0') {
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async function processBatch(batch, prefix = 'rootFolder.0') {
|
||||
const [{ nBlobs, blobs }, { nBackedUpBlobs, backedUpBlobs }] =
|
||||
await Promise.all([collectProjectBlobs(batch), collectBackedUpBlobs(batch)])
|
||||
const files = Array.from(findFileInBatch(batch, prefix, blobs, backedUpBlobs))
|
||||
const { nBlobs, blobs } = await collectProjectBlobs(batch)
|
||||
const files = Array.from(findFileInBatch(batch, prefix, blobs))
|
||||
STATS.projects += batch.length
|
||||
STATS.blobs += nBlobs
|
||||
STATS.backedUpBlobs += nBackedUpBlobs
|
||||
|
||||
// GC
|
||||
batch.length = 0
|
||||
blobs.clear()
|
||||
backedUpBlobs.clear()
|
||||
|
||||
// The files are currently ordered by project-id.
|
||||
// Order them by file-id ASC then blobs ASC to
|
||||
// - process files before blobs
|
||||
// - avoid head-of-line blocking from many project-files waiting on the generation of the projects DEK (round trip to AWS)
|
||||
// - bonus: increase chance of de-duplicating write to AWS
|
||||
// Order them by file-id ASC then hash ASC to
|
||||
// increase the hit rate on the "already processed
|
||||
// hash for project" checks.
|
||||
files.sort(
|
||||
/**
|
||||
* @param {QueueEntry} a
|
||||
@@ -903,32 +778,14 @@ function* findFiles(ctx, folder, path, isInputLoop = false) {
|
||||
* @param {Array<Project>} projects
|
||||
* @param {string} prefix
|
||||
* @param {Map<string,Array<Blob>>} blobs
|
||||
* @param {Map<string,Array<string>>} backedUpBlobs
|
||||
* @return Generator<QueueEntry>
|
||||
*/
|
||||
function* findFileInBatch(projects, prefix, blobs, backedUpBlobs) {
|
||||
function* findFileInBatch(projects, prefix, blobs) {
|
||||
for (const project of projects) {
|
||||
const projectIdS = project._id.toString()
|
||||
const historyIdS = project.overleaf.history.id.toString()
|
||||
const projectBlobs = blobs.get(historyIdS) || []
|
||||
const projectBackedUpBlobs = new Set(backedUpBlobs.get(projectIdS) || [])
|
||||
const ctx = new ProjectContext(
|
||||
project._id,
|
||||
historyIdS,
|
||||
projectBlobs,
|
||||
projectBackedUpBlobs
|
||||
)
|
||||
for (const blob of projectBlobs) {
|
||||
if (projectBackedUpBlobs.has(blob.getHash())) continue
|
||||
ctx.remainingQueueEntries++
|
||||
yield {
|
||||
ctx,
|
||||
cacheKey: blob.getHash(),
|
||||
path: MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE,
|
||||
blob,
|
||||
hash: blob.getHash(),
|
||||
}
|
||||
}
|
||||
const ctx = new ProjectContext(project._id, historyIdS, projectBlobs)
|
||||
try {
|
||||
yield* findFiles(ctx, project.rootFolder?.[0], prefix, true)
|
||||
} catch (err) {
|
||||
@@ -951,42 +808,11 @@ async function collectProjectBlobs(batch) {
|
||||
return await getProjectBlobsBatch(batch.map(p => p.overleaf.history.id))
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Array<Project>} projects
|
||||
* @return {Promise<{nBackedUpBlobs:number,backedUpBlobs:Map<string,Array<string>>}>}
|
||||
*/
|
||||
async function collectBackedUpBlobs(projects) {
|
||||
let nBackedUpBlobs = 0
|
||||
const backedUpBlobs = new Map()
|
||||
if (!COLLECT_BACKED_UP_BLOBS) return { nBackedUpBlobs, backedUpBlobs }
|
||||
|
||||
const cursor = backedUpBlobsCollection.find(
|
||||
{ _id: { $in: projects.map(p => p._id) } },
|
||||
{
|
||||
readPreference: READ_PREFERENCE_SECONDARY,
|
||||
sort: { _id: 1 },
|
||||
}
|
||||
)
|
||||
for await (const record of cursor) {
|
||||
const blobs = record.blobs.map(b => b.toString('hex'))
|
||||
backedUpBlobs.set(record._id.toString(), blobs)
|
||||
nBackedUpBlobs += blobs.length
|
||||
}
|
||||
return { nBackedUpBlobs, backedUpBlobs }
|
||||
}
|
||||
|
||||
const BATCH_HASH_WRITES = 1_000
|
||||
const BATCH_FILE_UPDATES = 100
|
||||
|
||||
const MONGO_PATH_SKIP_WRITE_HASH_TO_FILE_TREE = 'skip-write-to-file-tree'
|
||||
|
||||
class ProjectContext {
|
||||
/** @type {Promise<CachedPerProjectEncryptedS3Persistor> | null} */
|
||||
#cachedPersistorPromise = null
|
||||
|
||||
/** @type {Set<string>} */
|
||||
#backedUpBlobs
|
||||
|
||||
/** @type {Map<string, Blob>} */
|
||||
#historyBlobs
|
||||
|
||||
@@ -1000,12 +826,10 @@ class ProjectContext {
|
||||
* @param {ObjectId} projectId
|
||||
* @param {string} historyId
|
||||
* @param {Array<Blob>} blobs
|
||||
* @param {Set<string>} backedUpBlobs
|
||||
*/
|
||||
constructor(projectId, historyId, blobs, backedUpBlobs) {
|
||||
constructor(projectId, historyId, blobs) {
|
||||
this.projectId = projectId
|
||||
this.historyId = historyId
|
||||
this.#backedUpBlobs = backedUpBlobs
|
||||
this.#historyBlobs = new Map(blobs.map(b => [b.getHash(), b]))
|
||||
}
|
||||
|
||||
@@ -1034,75 +858,17 @@ class ProjectContext {
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} key
|
||||
* @return {Promise<CachedPerProjectEncryptedS3Persistor>}
|
||||
*/
|
||||
getCachedPersistor(key) {
|
||||
if (!this.#cachedPersistorPromise) {
|
||||
// Fetch DEK once, but only if needed -- upon the first use
|
||||
this.#cachedPersistorPromise = this.#getCachedPersistorWithRetries(key)
|
||||
}
|
||||
return this.#cachedPersistorPromise
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} key
|
||||
* @return {Promise<CachedPerProjectEncryptedS3Persistor>}
|
||||
*/
|
||||
async #getCachedPersistorWithRetries(key) {
|
||||
// Optimization: Skip GET on DEK in case no blobs are marked as backed up yet.
|
||||
let tryGenerateDEKFirst = this.#backedUpBlobs.size === 0
|
||||
for (let attempt = 0; attempt < RETRIES; attempt++) {
|
||||
try {
|
||||
if (tryGenerateDEKFirst) {
|
||||
try {
|
||||
return await backupPersistor.generateDataEncryptionKey(
|
||||
projectBlobsBucket,
|
||||
key
|
||||
)
|
||||
} catch (err) {
|
||||
if (err instanceof AlreadyWrittenError) {
|
||||
tryGenerateDEKFirst = false
|
||||
// fall back to GET below
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
}
|
||||
return await backupPersistor.forProject(projectBlobsBucket, key)
|
||||
} catch (err) {
|
||||
if (gracefulShutdownInitiated) throw err
|
||||
if (err instanceof NoKEKMatchedError) {
|
||||
throw err
|
||||
} else {
|
||||
logger.warn(
|
||||
{ err, projectId: this.projectId, attempt },
|
||||
'failed to get DEK, trying again'
|
||||
)
|
||||
const jitter = Math.random() * RETRY_DELAY_MS
|
||||
await setTimeout(RETRY_DELAY_MS + jitter)
|
||||
}
|
||||
}
|
||||
}
|
||||
return await backupPersistor.forProject(projectBlobsBucket, key)
|
||||
}
|
||||
|
||||
async flushMongoQueuesIfNeeded() {
|
||||
if (this.remainingQueueEntries === 0) {
|
||||
await this.flushMongoQueues()
|
||||
}
|
||||
|
||||
if (this.#completedBlobs.size > BATCH_HASH_WRITES) {
|
||||
await this.#storeBackedUpBlobs()
|
||||
}
|
||||
if (this.#pendingFileWrites.length > BATCH_FILE_UPDATES) {
|
||||
await this.#storeFileHashes()
|
||||
}
|
||||
}
|
||||
|
||||
async flushMongoQueues() {
|
||||
await this.#storeBackedUpBlobs()
|
||||
await this.#storeFileHashes()
|
||||
}
|
||||
|
||||
@@ -1111,20 +877,6 @@ class ProjectContext {
|
||||
/** @type {Set<string>} */
|
||||
#completedBlobs = new Set()
|
||||
|
||||
async #storeBackedUpBlobs() {
|
||||
if (this.#completedBlobs.size === 0) return
|
||||
const blobs = Array.from(this.#completedBlobs).map(
|
||||
hash => new Binary(Buffer.from(hash, 'hex'))
|
||||
)
|
||||
this.#completedBlobs.clear()
|
||||
STATS.mongoUpdates++
|
||||
await backedUpBlobsCollection.updateOne(
|
||||
{ _id: this.projectId },
|
||||
{ $addToSet: { blobs: { $each: blobs } } },
|
||||
{ upsert: true }
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} hash
|
||||
*/
|
||||
@@ -1142,8 +894,7 @@ class ProjectContext {
|
||||
/**
|
||||
* @param {string} hash
|
||||
*/
|
||||
recordBackedUpBlob(hash) {
|
||||
this.#backedUpBlobs.add(hash)
|
||||
recordCompletedBlob(hash) {
|
||||
this.#completedBlobs.add(hash)
|
||||
this.#pendingBlobs.delete(hash)
|
||||
}
|
||||
@@ -1152,12 +903,8 @@ class ProjectContext {
|
||||
* @param {string} hash
|
||||
* @return {boolean}
|
||||
*/
|
||||
hasBackedUpBlob(hash) {
|
||||
return (
|
||||
this.#pendingBlobs.has(hash) ||
|
||||
this.#completedBlobs.has(hash) ||
|
||||
this.#backedUpBlobs.has(hash)
|
||||
)
|
||||
hasCompletedBlob(hash) {
|
||||
return this.#pendingBlobs.has(hash) || this.#completedBlobs.has(hash)
|
||||
}
|
||||
|
||||
/** @type {Array<QueueEntry>} */
|
||||
|
||||
@@ -4,23 +4,17 @@ import Stream from 'node:stream'
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
import { promisify } from 'node:util'
|
||||
import { ObjectId, Binary } from 'mongodb'
|
||||
import {
|
||||
db,
|
||||
backedUpBlobs,
|
||||
globalBlobs,
|
||||
} from '../../../../storage/lib/mongodb.js'
|
||||
import { db, globalBlobs } from '../../../../storage/lib/mongodb.js'
|
||||
import cleanup from './support/cleanup.js'
|
||||
import testProjects from '../api/support/test_projects.js'
|
||||
import { execFile } from 'node:child_process'
|
||||
import chai, { expect } from 'chai'
|
||||
import chaiExclude from 'chai-exclude'
|
||||
import config from 'config'
|
||||
import { WritableBuffer } from '@overleaf/stream-utils'
|
||||
import {
|
||||
backupPersistor,
|
||||
projectBlobsBucket,
|
||||
} from '../../../../storage/lib/backupPersistor.mjs'
|
||||
import projectKey from '../../../../storage/lib/project_key.js'
|
||||
import {
|
||||
BlobStore,
|
||||
makeProjectKey,
|
||||
@@ -31,9 +25,6 @@ import express from 'express'
|
||||
chai.use(chaiExclude)
|
||||
const TIMEOUT = 20 * 1_000
|
||||
|
||||
const { deksBucket } = config.get('backupStore')
|
||||
const { tieringStorageClass } = config.get('backupPersistor')
|
||||
|
||||
const projectsCollection = db.collection('projects')
|
||||
const deletedProjectsCollection = db.collection('deletedProjects')
|
||||
|
||||
@@ -117,17 +108,6 @@ function binaryForGitBlobHash(gitBlobHash) {
|
||||
return new Binary(Buffer.from(gitBlobHash, 'hex'))
|
||||
}
|
||||
|
||||
async function listS3Bucket(bucket, wantStorageClass) {
|
||||
const client = backupPersistor._getClientForBucket(bucket)
|
||||
const response = await client.listObjectsV2({ Bucket: bucket }).promise()
|
||||
|
||||
for (const object of response.Contents || []) {
|
||||
expect(object).to.have.property('StorageClass', wantStorageClass)
|
||||
}
|
||||
|
||||
return (response.Contents || []).map(item => item.Key || '')
|
||||
}
|
||||
|
||||
function objectIdFromTime(timestamp) {
|
||||
return ObjectId.createFromTime(new Date(timestamp).getTime() / 1000)
|
||||
}
|
||||
@@ -591,11 +571,7 @@ describe('back_fill_file_hash script', function () {
|
||||
expect((await fs.promises.readdir('/tmp')).join(';')).to.not.match(
|
||||
/back_fill_file_hash/
|
||||
)
|
||||
const extraStatsKeys = [
|
||||
'eventLoop',
|
||||
'readFromGCSThroughputMiBPerSecond',
|
||||
'writeToAWSThroughputMiBPerSecond',
|
||||
]
|
||||
const extraStatsKeys = ['eventLoop', 'readFromGCSThroughputMiBPerSecond']
|
||||
const stats = JSON.parse(
|
||||
result.stderr
|
||||
.split('\n')
|
||||
@@ -610,7 +586,6 @@ describe('back_fill_file_hash script', function () {
|
||||
delete stats.time
|
||||
if (shouldHaveWritten) {
|
||||
expect(stats.readFromGCSThroughputMiBPerSecond).to.be.greaterThan(0)
|
||||
expect(stats.writeToAWSThroughputMiBPerSecond).to.be.greaterThan(0)
|
||||
}
|
||||
for (const key of extraStatsKeys) {
|
||||
delete stats[key]
|
||||
@@ -856,109 +831,6 @@ describe('back_fill_file_hash script', function () {
|
||||
},
|
||||
},
|
||||
])
|
||||
expect(
|
||||
(await backedUpBlobs.find({}, { sort: { _id: 1 } }).toArray()).map(
|
||||
entry => {
|
||||
// blobs are pushed unordered into mongo. Sort the list for consistency.
|
||||
entry.blobs.sort()
|
||||
return entry
|
||||
}
|
||||
)
|
||||
).to.deep.equal([
|
||||
{
|
||||
_id: projectId0,
|
||||
blobs: [
|
||||
binaryForGitBlobHash(gitBlobHash(fileId0)),
|
||||
binaryForGitBlobHash(hashFile7),
|
||||
binaryForGitBlobHash(hashTextBlob0),
|
||||
].sort(),
|
||||
},
|
||||
{
|
||||
_id: projectId1,
|
||||
blobs: [
|
||||
binaryForGitBlobHash(gitBlobHash(fileId1)),
|
||||
binaryForGitBlobHash(hashTextBlob1),
|
||||
].sort(),
|
||||
},
|
||||
{
|
||||
_id: projectId2,
|
||||
blobs: [binaryForGitBlobHash(hashTextBlob2)]
|
||||
.concat(
|
||||
processHashedFiles
|
||||
? [binaryForGitBlobHash(gitBlobHash(fileId2))]
|
||||
: []
|
||||
)
|
||||
.sort(),
|
||||
},
|
||||
{
|
||||
_id: projectIdDeleted0,
|
||||
blobs: [binaryForGitBlobHash(gitBlobHash(fileId4))].sort(),
|
||||
},
|
||||
{
|
||||
_id: projectId3,
|
||||
blobs: [binaryForGitBlobHash(gitBlobHash(fileId3))].sort(),
|
||||
},
|
||||
...(processHashedFiles
|
||||
? [
|
||||
{
|
||||
_id: projectIdDeleted1,
|
||||
blobs: [binaryForGitBlobHash(gitBlobHash(fileId5))].sort(),
|
||||
},
|
||||
]
|
||||
: []),
|
||||
{
|
||||
_id: projectIdBadFileTree0,
|
||||
blobs: [binaryForGitBlobHash(hashTextBlob3)].sort(),
|
||||
},
|
||||
{
|
||||
_id: projectIdBadFileTree3,
|
||||
blobs: [binaryForGitBlobHash(gitBlobHash(fileId9))].sort(),
|
||||
},
|
||||
])
|
||||
})
|
||||
it('should have backed up all the files', async function () {
|
||||
expect(tieringStorageClass).to.exist
|
||||
const blobs = await listS3Bucket(projectBlobsBucket, tieringStorageClass)
|
||||
expect(blobs.sort()).to.deep.equal(
|
||||
Array.from(
|
||||
new Set(
|
||||
writtenBlobs
|
||||
.map(({ historyId, fileId, hash }) =>
|
||||
makeProjectKey(historyId, hash || gitBlobHash(fileId))
|
||||
)
|
||||
.sort()
|
||||
)
|
||||
)
|
||||
)
|
||||
for (let { historyId, fileId, hash, content } of writtenBlobs) {
|
||||
hash = hash || gitBlobHash(fileId.toString())
|
||||
const s = await backupPersistor.getObjectStream(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, hash),
|
||||
{ autoGunzip: true }
|
||||
)
|
||||
const buf = new WritableBuffer()
|
||||
await Stream.promises.pipeline(s, buf)
|
||||
expect(gitBlobHashBuffer(buf.getContents())).to.equal(hash)
|
||||
if (content) {
|
||||
expect(buf.getContents()).to.deep.equal(content)
|
||||
} else {
|
||||
const id = buf.getContents().toString('utf-8')
|
||||
expect(id).to.equal(fileId.toString())
|
||||
// double check we are not comparing 'undefined' or '[object Object]' above
|
||||
expect(id).to.match(/^[a-f0-9]{24}$/)
|
||||
}
|
||||
}
|
||||
const deks = await listS3Bucket(deksBucket, 'STANDARD')
|
||||
expect(deks.sort()).to.deep.equal(
|
||||
Array.from(
|
||||
new Set(
|
||||
writtenBlobs.map(
|
||||
({ historyId }) => projectKey.format(historyId) + '/dek'
|
||||
)
|
||||
)
|
||||
).sort()
|
||||
)
|
||||
})
|
||||
it('should have written the back filled files to history v1', async function () {
|
||||
for (const { historyId, hash, fileId, content } of writtenBlobs) {
|
||||
@@ -991,14 +863,13 @@ describe('back_fill_file_hash script', function () {
|
||||
// We still need to iterate over all the projects and blobs.
|
||||
projects: 10,
|
||||
blobs: 10,
|
||||
backedUpBlobs: 10,
|
||||
|
||||
badFileTrees: 4,
|
||||
}
|
||||
if (processHashedFiles) {
|
||||
stats = sumStats(stats, {
|
||||
...STATS_ALL_ZERO,
|
||||
blobs: 2,
|
||||
backedUpBlobs: 2,
|
||||
})
|
||||
}
|
||||
expect(rerun.stats).deep.equal(stats)
|
||||
@@ -1024,7 +895,6 @@ describe('back_fill_file_hash script', function () {
|
||||
const STATS_ALL_ZERO = {
|
||||
projects: 0,
|
||||
blobs: 0,
|
||||
backedUpBlobs: 0,
|
||||
filesWithHash: 0,
|
||||
filesWithoutHash: 0,
|
||||
filesDuplicated: 0,
|
||||
@@ -1038,21 +908,14 @@ describe('back_fill_file_hash script', function () {
|
||||
fileHardDeleted: 0,
|
||||
badFileTrees: 0,
|
||||
mongoUpdates: 0,
|
||||
deduplicatedWriteToAWSLocalCount: 0,
|
||||
deduplicatedWriteToAWSLocalEgress: 0,
|
||||
deduplicatedWriteToAWSRemoteCount: 0,
|
||||
deduplicatedWriteToAWSRemoteEgress: 0,
|
||||
readFromGCSCount: 0,
|
||||
readFromGCSIngress: 0,
|
||||
writeToAWSCount: 0,
|
||||
writeToAWSEgress: 0,
|
||||
writeToGCSCount: 0,
|
||||
writeToGCSEgress: 0,
|
||||
}
|
||||
const STATS_UP_TO_PROJECT1 = {
|
||||
projects: 2,
|
||||
blobs: 2,
|
||||
backedUpBlobs: 0,
|
||||
filesWithHash: 0,
|
||||
filesWithoutHash: 5,
|
||||
filesDuplicated: 1,
|
||||
@@ -1065,22 +928,15 @@ describe('back_fill_file_hash script', function () {
|
||||
projectHardDeleted: 0,
|
||||
fileHardDeleted: 0,
|
||||
badFileTrees: 0,
|
||||
mongoUpdates: 4,
|
||||
deduplicatedWriteToAWSLocalCount: 0,
|
||||
deduplicatedWriteToAWSLocalEgress: 0,
|
||||
deduplicatedWriteToAWSRemoteCount: 0,
|
||||
deduplicatedWriteToAWSRemoteEgress: 0,
|
||||
readFromGCSCount: 6,
|
||||
readFromGCSIngress: 4000086,
|
||||
writeToAWSCount: 5,
|
||||
writeToAWSEgress: 4026,
|
||||
mongoUpdates: 2, // 4-2 blobs written to backedUpBlobs collection
|
||||
readFromGCSCount: 4,
|
||||
readFromGCSIngress: 4000072,
|
||||
writeToGCSCount: 3,
|
||||
writeToGCSEgress: 4000048,
|
||||
}
|
||||
const STATS_UP_FROM_PROJECT1_ONWARD = {
|
||||
projects: 8,
|
||||
blobs: 2,
|
||||
backedUpBlobs: 0,
|
||||
filesWithHash: 0,
|
||||
filesWithoutHash: 4,
|
||||
filesDuplicated: 0,
|
||||
@@ -1093,26 +949,18 @@ describe('back_fill_file_hash script', function () {
|
||||
projectHardDeleted: 0,
|
||||
fileHardDeleted: 0,
|
||||
badFileTrees: 4,
|
||||
mongoUpdates: 8,
|
||||
deduplicatedWriteToAWSLocalCount: 1,
|
||||
deduplicatedWriteToAWSLocalEgress: 30,
|
||||
deduplicatedWriteToAWSRemoteCount: 0,
|
||||
deduplicatedWriteToAWSRemoteEgress: 0,
|
||||
readFromGCSCount: 6,
|
||||
readFromGCSIngress: 110,
|
||||
writeToAWSCount: 5,
|
||||
writeToAWSEgress: 143,
|
||||
mongoUpdates: 3, // previously 5 blobs written to backedUpBlobs collection
|
||||
readFromGCSCount: 4,
|
||||
readFromGCSIngress: 96,
|
||||
writeToGCSCount: 3,
|
||||
writeToGCSEgress: 72,
|
||||
}
|
||||
const STATS_FILES_HASHED_EXTRA = {
|
||||
...STATS_ALL_ZERO,
|
||||
filesWithHash: 2,
|
||||
mongoUpdates: 2,
|
||||
mongoUpdates: 0, // previously 2 blobs written to backedUpBlobs collection
|
||||
readFromGCSCount: 2,
|
||||
readFromGCSIngress: 48,
|
||||
writeToAWSCount: 2,
|
||||
writeToAWSEgress: 60,
|
||||
writeToGCSCount: 2,
|
||||
writeToGCSEgress: 48,
|
||||
}
|
||||
@@ -1144,8 +992,6 @@ describe('back_fill_file_hash script', function () {
|
||||
...STATS_ALL_ZERO,
|
||||
filesFailed: 1,
|
||||
readFromGCSIngress: -24,
|
||||
writeToAWSCount: -1,
|
||||
writeToAWSEgress: -28,
|
||||
writeToGCSCount: -1,
|
||||
writeToGCSEgress: -24,
|
||||
})
|
||||
@@ -1269,13 +1115,14 @@ describe('back_fill_file_hash script', function () {
|
||||
before('run script with hashed files', async function () {
|
||||
output2 = await runScript(['--processHashedFiles=true'], {})
|
||||
})
|
||||
it('should print stats', function () {
|
||||
it('should print stats for the first run without hashed files', function () {
|
||||
expect(output1.stats).deep.equal(STATS_ALL)
|
||||
})
|
||||
it('should print stats for the hashed files run', function () {
|
||||
expect(output2.stats).deep.equal({
|
||||
...STATS_FILES_HASHED_EXTRA,
|
||||
projects: 10,
|
||||
blobs: 10,
|
||||
backedUpBlobs: 10,
|
||||
badFileTrees: 4,
|
||||
})
|
||||
})
|
||||
@@ -1322,9 +1169,7 @@ describe('back_fill_file_hash script', function () {
|
||||
...STATS_FILES_HASHED_EXTRA,
|
||||
readFromGCSCount: 3,
|
||||
readFromGCSIngress: 72,
|
||||
deduplicatedWriteToAWSLocalCount: 1,
|
||||
deduplicatedWriteToAWSLocalEgress: 30,
|
||||
mongoUpdates: 1,
|
||||
mongoUpdates: 0,
|
||||
filesWithHash: 3,
|
||||
})
|
||||
)
|
||||
@@ -1354,48 +1199,6 @@ describe('back_fill_file_hash script', function () {
|
||||
expect(output.stats).deep.equal(
|
||||
sumStats(STATS_ALL, {
|
||||
...STATS_ALL_ZERO,
|
||||
// one remote deduplicate
|
||||
deduplicatedWriteToAWSRemoteCount: 1,
|
||||
deduplicatedWriteToAWSRemoteEgress: 28,
|
||||
writeToAWSEgress: -28, // subtract skipped egress
|
||||
})
|
||||
)
|
||||
})
|
||||
commonAssertions()
|
||||
})
|
||||
|
||||
describe('with something in the bucket and marked as processed', function () {
|
||||
before('prepare environment', prepareEnvironment)
|
||||
before('create a file in s3', async function () {
|
||||
await backupPersistor.sendStream(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId0, hashTextBlob0),
|
||||
Stream.Readable.from([contentTextBlob0]),
|
||||
{ contentLength: contentTextBlob0.byteLength }
|
||||
)
|
||||
await backedUpBlobs.insertMany([
|
||||
{
|
||||
_id: projectId0,
|
||||
blobs: [binaryForGitBlobHash(hashTextBlob0)],
|
||||
},
|
||||
])
|
||||
})
|
||||
let output
|
||||
before('run script', async function () {
|
||||
output = await runScript([], {
|
||||
CONCURRENCY: '1',
|
||||
})
|
||||
})
|
||||
|
||||
it('should print stats', function () {
|
||||
expect(output.stats).deep.equal(
|
||||
sumStats(STATS_ALL, {
|
||||
...STATS_ALL_ZERO,
|
||||
backedUpBlobs: 1,
|
||||
writeToAWSCount: -1,
|
||||
writeToAWSEgress: -27,
|
||||
readFromGCSCount: -1,
|
||||
readFromGCSIngress: -7,
|
||||
})
|
||||
)
|
||||
})
|
||||
@@ -1418,8 +1221,10 @@ describe('back_fill_file_hash script', function () {
|
||||
})
|
||||
})
|
||||
|
||||
it('should print stats', function () {
|
||||
it('should print stats for part 0', function () {
|
||||
expect(outputPart0.stats).to.deep.equal(STATS_UP_TO_PROJECT1)
|
||||
})
|
||||
it('should print stats for part 1', function () {
|
||||
expect(outputPart1.stats).to.deep.equal(STATS_UP_FROM_PROJECT1_ONWARD)
|
||||
})
|
||||
commonAssertions()
|
||||
|
||||
Reference in New Issue
Block a user