mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-24 17:51:51 +02:00
[object-persistor] depend on @overleaf/metrics directly GitOrigin-RevId: eb0c07af8101d44def14154abb552bc77254e074
268 lines
7.6 KiB
JavaScript
268 lines
7.6 KiB
JavaScript
const AbstractPersistor = require('./AbstractPersistor')
|
|
const Logger = require('@overleaf/logger')
|
|
const Metrics = require('@overleaf/metrics')
|
|
const Stream = require('stream')
|
|
const { pipeline } = require('stream/promises')
|
|
const { NotFoundError, WriteError } = require('./Errors')
|
|
|
|
// Persistor that wraps two other persistors. Talks to the 'primary' by default,
|
|
// but will fall back to an older persistor in the case of a not-found error.
|
|
// If `Settings.fallback.copyOnMiss` is set, this will copy files from the fallback
|
|
// to the primary, in the event that they are missing.
|
|
//
|
|
// It is unlikely that the bucket/location name will be the same on the fallback
|
|
// as the primary. The bucket names should be overridden in `Settings.fallback.buckets`
|
|
// e.g.
|
|
// Settings.fallback.buckets = {
|
|
// myBucketOnS3: 'myBucketOnGCS'
|
|
// }
|
|
|
|
module.exports = class MigrationPersistor extends AbstractPersistor {
|
|
/**
|
|
* @param {AbstractPersistor} primaryPersistor
|
|
* @param {AbstractPersistor} fallbackPersistor
|
|
* @param settings
|
|
*/
|
|
constructor(primaryPersistor, fallbackPersistor, settings) {
|
|
super()
|
|
|
|
/**
|
|
* @type {AbstractPersistor}
|
|
*/
|
|
this.primaryPersistor = primaryPersistor
|
|
/**
|
|
* @type {AbstractPersistor}
|
|
*/
|
|
this.fallbackPersistor = fallbackPersistor
|
|
this.settings = settings
|
|
}
|
|
|
|
async sendFile(...args) {
|
|
return await this.primaryPersistor.sendFile(...args)
|
|
}
|
|
|
|
async sendStream(...args) {
|
|
return await this.primaryPersistor.sendStream(...args)
|
|
}
|
|
|
|
async getRedirectUrl(...args) {
|
|
return await this.primaryPersistor.getRedirectUrl(...args)
|
|
}
|
|
|
|
async getObjectMd5Hash(...args) {
|
|
return await this._runWithFallback('getObjectMd5Hash', ...args)
|
|
}
|
|
|
|
async checkIfObjectExists(...args) {
|
|
return await this._runWithFallback('checkIfObjectExists', ...args)
|
|
}
|
|
|
|
async getObjectSize(...args) {
|
|
return await this._runWithFallback('getObjectSize', ...args)
|
|
}
|
|
|
|
async directorySize(...args) {
|
|
return await this._runWithFallback('directorySize', ...args)
|
|
}
|
|
|
|
async deleteObject(...args) {
|
|
return await this._runOnBoth('deleteObject', ...args)
|
|
}
|
|
|
|
async deleteDirectory(...args) {
|
|
return await this._runOnBoth('deleteDirectory', ...args)
|
|
}
|
|
|
|
async getObjectStream(bucket, key, opts = {}) {
|
|
const shouldCopy = this.settings.copyOnMiss && !opts.start && !opts.end
|
|
|
|
try {
|
|
return await this.primaryPersistor.getObjectStream(bucket, key, opts)
|
|
} catch (err) {
|
|
if (err instanceof NotFoundError) {
|
|
const fallbackBucket = this._getFallbackBucket(bucket)
|
|
const fallbackStream = await this.fallbackPersistor.getObjectStream(
|
|
fallbackBucket,
|
|
key,
|
|
opts
|
|
)
|
|
// tee the stream to the client, and as a copy to the primary (if necessary)
|
|
// start listening on both straight away so that we don't consume bytes
|
|
// in one place before the other
|
|
const returnStream = new Stream.PassThrough()
|
|
pipeline(fallbackStream, returnStream).catch(error => {
|
|
Logger.warn({ error }, 'failed to copy object from fallback')
|
|
})
|
|
|
|
if (shouldCopy) {
|
|
const copyStream = new Stream.PassThrough()
|
|
pipeline(fallbackStream, copyStream).catch(error => {
|
|
Logger.warn({ error }, 'failed to copy object from fallback')
|
|
})
|
|
|
|
this._copyStreamFromFallbackAndVerify(
|
|
copyStream,
|
|
fallbackBucket,
|
|
bucket,
|
|
key,
|
|
key
|
|
).catch(error => {
|
|
Logger.warn({ error }, 'failed to copy file from fallback')
|
|
})
|
|
}
|
|
return returnStream
|
|
}
|
|
throw err
|
|
}
|
|
}
|
|
|
|
async copyObject(bucket, sourceKey, destKey) {
|
|
try {
|
|
return await this.primaryPersistor.copyObject(bucket, sourceKey, destKey)
|
|
} catch (err) {
|
|
if (err instanceof NotFoundError) {
|
|
const fallbackBucket = this._getFallbackBucket(bucket)
|
|
const fallbackStream = await this.fallbackPersistor.getObjectStream(
|
|
fallbackBucket,
|
|
sourceKey,
|
|
{}
|
|
)
|
|
|
|
const copyStream = new Stream.PassThrough()
|
|
pipeline(fallbackStream, copyStream).catch(error => {
|
|
Logger.warn({ error }, 'failed to copy object from fallback')
|
|
})
|
|
|
|
if (this.settings.copyOnMiss) {
|
|
const missStream = new Stream.PassThrough()
|
|
pipeline(fallbackStream, missStream).catch(error => {
|
|
Logger.warn({ error }, 'failed to copy object from fallback')
|
|
})
|
|
|
|
// copy from sourceKey -> sourceKey
|
|
this._copyStreamFromFallbackAndVerify(
|
|
missStream,
|
|
fallbackBucket,
|
|
bucket,
|
|
sourceKey,
|
|
sourceKey
|
|
).catch(() => {
|
|
// swallow errors, as this runs in the background and will log a warning
|
|
})
|
|
}
|
|
// copy from sourceKey -> destKey
|
|
return await this._copyStreamFromFallbackAndVerify(
|
|
copyStream,
|
|
fallbackBucket,
|
|
bucket,
|
|
sourceKey,
|
|
destKey
|
|
)
|
|
}
|
|
throw err
|
|
}
|
|
}
|
|
|
|
async _copyStreamFromFallbackAndVerify(
|
|
stream,
|
|
sourceBucket,
|
|
destBucket,
|
|
sourceKey,
|
|
destKey
|
|
) {
|
|
try {
|
|
let sourceMd5
|
|
try {
|
|
sourceMd5 = await this.fallbackPersistor.getObjectMd5Hash(
|
|
sourceBucket,
|
|
sourceKey
|
|
)
|
|
} catch (err) {
|
|
Logger.warn(err, 'error getting md5 hash from fallback persistor')
|
|
}
|
|
|
|
await this.primaryPersistor.sendStream(destBucket, destKey, stream, {
|
|
sourceMd5,
|
|
})
|
|
} catch (err) {
|
|
const error = new WriteError(
|
|
'unable to copy file to destination persistor',
|
|
{
|
|
sourceBucket,
|
|
destBucket,
|
|
sourceKey,
|
|
destKey,
|
|
},
|
|
err
|
|
)
|
|
Metrics.inc('fallback.copy.failure')
|
|
|
|
try {
|
|
await this.primaryPersistor.deleteObject(destBucket, destKey)
|
|
} catch (err) {
|
|
error.info.cleanupError = new WriteError(
|
|
'unable to clean up destination copy artifact',
|
|
{
|
|
destBucket,
|
|
destKey,
|
|
},
|
|
err
|
|
)
|
|
}
|
|
throw error
|
|
}
|
|
}
|
|
|
|
_getFallbackBucket(bucket) {
|
|
return (this.settings.buckets && this.settings.buckets[bucket]) || bucket
|
|
}
|
|
|
|
async _runOnBoth(methodName, bucket, ...moreArgs) {
|
|
const fallbackBucket = this._getFallbackBucket(bucket)
|
|
|
|
await Promise.all([
|
|
this.primaryPersistor[methodName](bucket, ...moreArgs),
|
|
this.fallbackPersistor[methodName](fallbackBucket, ...moreArgs),
|
|
])
|
|
}
|
|
|
|
/**
|
|
* @param {keyof AbstractPersistor} methodName
|
|
* @param bucket
|
|
* @param key
|
|
* @param moreArgs
|
|
*/
|
|
async _runWithFallback(methodName, bucket, key, ...moreArgs) {
|
|
try {
|
|
return await this.primaryPersistor[methodName](bucket, key, ...moreArgs)
|
|
} catch (err) {
|
|
if (err instanceof NotFoundError) {
|
|
const fallbackBucket = this._getFallbackBucket(bucket)
|
|
if (this.settings.copyOnMiss) {
|
|
const fallbackStream = await this.fallbackPersistor.getObjectStream(
|
|
fallbackBucket,
|
|
key,
|
|
{}
|
|
)
|
|
// run in background
|
|
this._copyStreamFromFallbackAndVerify(
|
|
fallbackStream,
|
|
fallbackBucket,
|
|
bucket,
|
|
key,
|
|
key
|
|
).catch(err => {
|
|
Logger.warn({ err }, 'failed to copy file from fallback')
|
|
})
|
|
}
|
|
return await this.fallbackPersistor[methodName](
|
|
fallbackBucket,
|
|
key,
|
|
...moreArgs
|
|
)
|
|
}
|
|
throw err
|
|
}
|
|
}
|
|
}
|