Extend script to allow multiple blobs to be processed from csv

GitOrigin-RevId: ad47eb754436ddc7f56b27ceda627268c3a030a1
This commit is contained in:
Andrew Rumble
2025-03-13 15:01:02 +00:00
committed by Copybot
parent b5d6484991
commit f134746c9c

View File

@@ -11,6 +11,9 @@ import assert from '../lib/assert.js'
import knex from '../lib/knex.js'
import { client } from '../lib/mongodb.js'
import { setTimeout } from 'node:timers/promises'
import fs from 'node:fs'
await loadGlobalBlobs()
/**
* Gracefully shutdown the process
@@ -26,13 +29,72 @@ async function gracefulShutdown() {
/**
*
* @return {Promise<{hash: string, historyId: string}>}
* @param {string} row
* @return {BackupBlobJob}
*/
async function fetchOptions() {
const { historyId, hash } = commandLineArgs([
{ name: 'historyId', type: String },
{ name: 'hash', type: String },
])
function parseCSVRow(row) {
const [historyId, hash] = row.split(',')
validateBackedUpBlobJob({ historyId, hash })
return { historyId, hash }
}
/**
*
* @param {BackupBlobJob} job
*/
function validateBackedUpBlobJob(job) {
assert.projectId(job.historyId)
assert.blobHash(job.hash)
}
/**
*
* @param {string} path
* @return {Promise<Array<BackupBlobJob>>}
*/
async function readCSV(path) {
let fh
/** @type {Array<BackupBlobJob>} */
const rows = []
try {
fh = await fs.promises.open(path, 'r')
} catch (error) {
console.error(`Could not open file: ${error}`)
throw error
}
for await (const line of fh.readLines()) {
try {
const row = parseCSVRow(line)
if (GLOBAL_BLOBS.has(row.hash)) {
console.log(`Skipping global blob: ${line}`)
continue
}
rows.push(row)
} catch (error) {
console.error(error instanceof Error ? error.message : error)
console.log(`Skipping invalid row: ${line}`)
}
}
return rows
}
/**
* @typedef {Object} BackupBlobJob
* @property {string} hash
* @property {string} historyId
*/
/**
* @param {Object} options
* @property {string} [options.historyId]
* @property {string} [options.hash]
* @property {string} [options.input]
* @return {Promise<Array<BackupBlobJob>>}
*/
async function initialiseJobs({ historyId, hash, input }) {
if (input) {
return await readCSV(input)
}
if (!historyId) {
console.error('historyId is required')
@@ -40,24 +102,20 @@ async function fetchOptions() {
await gracefulShutdown()
}
assert.projectId(historyId)
if (!hash) {
console.error('hash is required')
process.exitCode = 1
await gracefulShutdown()
}
assert.blobHash(hash)
await loadGlobalBlobs()
validateBackedUpBlobJob({ historyId, hash })
if (GLOBAL_BLOBS.has(hash)) {
console.error(`Blob ${hash} is a global blob; not backing up`)
process.exitCode = 1
await gracefulShutdown()
}
return { hash, historyId }
return [{ hash, historyId }]
}
/**
@@ -80,28 +138,34 @@ export async function downloadAndBackupBlob(historyId, hash) {
})
}
let options
let jobs
const options = commandLineArgs([
{ name: 'historyId', type: String },
{ name: 'hash', type: String },
{ name: 'input', type: String },
])
try {
options = await fetchOptions()
jobs = await initialiseJobs(options)
} catch (error) {
console.error(error)
await gracefulShutdown()
}
if (!options) {
if (!Array.isArray(jobs)) {
// This is mostly to satisfy typescript
process.exitCode = 1
await gracefulShutdown()
process.exit(1)
}
try {
const { hash, historyId } = options
await downloadAndBackupBlob(historyId, hash)
} catch (error) {
console.error(error)
process.exitCode = 1
} finally {
await gracefulShutdown()
for (const { historyId, hash } of jobs) {
try {
await downloadAndBackupBlob(historyId, hash)
} catch (error) {
console.error(error)
process.exitCode = 1
}
}
await gracefulShutdown()