From 36056e75d77b8341819e8918758c8df6964c66f7 Mon Sep 17 00:00:00 2001 From: Andrew Rumble Date: Wed, 5 Mar 2025 17:02:55 +0000 Subject: [PATCH] Improve chunk loading in backupVerifier Brings the process closer to history_store. We can't use the backup history_store because the keys are generated differently for chunks than the standard history_store way of doing it. GitOrigin-RevId: 07adfc0531f6ec0f38bb70ea0fe8ae0d41f508cc --- .../history-v1/storage/lib/backupVerifier.mjs | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/services/history-v1/storage/lib/backupVerifier.mjs b/services/history-v1/storage/lib/backupVerifier.mjs index 6e1bc3ee7b..503aa07cc8 100644 --- a/services/history-v1/storage/lib/backupVerifier.mjs +++ b/services/history-v1/storage/lib/backupVerifier.mjs @@ -12,10 +12,10 @@ import { BlobStore, GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js' import blobHash from './blob_hash.js' import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js' import logger from '@overleaf/logger' -import { text } from 'node:stream/consumers' -import { createGunzip } from 'node:zlib' import path from 'node:path' import projectKey from './project_key.js' +import streams from './streams.js' +import objectPersistor from '@overleaf/object-persistor' const RPO = parseInt(config.get('backupRPOInMS'), 10) @@ -123,12 +123,20 @@ async function loadChunk(historyId, startVersion, backupPersistorForProject) { projectKey.format(historyId), projectKey.pad(startVersion) ) - const backupChunkStream = await backupPersistorForProject.getObjectStream( - chunksBucket, - key - ) - const raw = await text(backupChunkStream.pipe(createGunzip())) - return JSON.parse(raw) + try { + const buf = await streams.gunzipStreamToBuffer( + await backupPersistorForProject.getObjectStream(chunksBucket, key) + ) + return JSON.parse(buf.toString('utf-8')) + } catch (err) { + if (err instanceof objectPersistor.Errors.NotFoundError) { + throw new Chunk.NotPersistedError(historyId) + } + if (err instanceof Error) { + throw OError.tag(err, 'Failed to load chunk', { historyId, startVersion }) + } + throw err + } } /** @@ -165,7 +173,10 @@ export async function verifyProject(historyId, endTimestamp) { ) } catch (err) { if (err instanceof Chunk.NotPersistedError) { - throw new BackupRPOViolationError('backup RPO violation', chunk) + throw new BackupRPOViolationError( + 'BackupRPOviolation: chunk not backed up', + chunk + ) } throw err }