mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
Merge pull request #21380 from overleaf/jpa-s3-ssec-backend
[object-persistor] add backend for SSE-C with S3 using KEK and DEK GitOrigin-RevId: 9676f5cd5e08107c8c284b68b8d450a1c05bf1b1
This commit is contained in:
@@ -24,6 +24,7 @@
|
||||
"@overleaf/logger": "*",
|
||||
"@overleaf/metrics": "*",
|
||||
"@overleaf/o-error": "*",
|
||||
"@overleaf/stream-utils": "*",
|
||||
"aws-sdk": "^2.1691.0",
|
||||
"fast-crc32c": "overleaf/node-fast-crc32c#aae6b2a4c7a7a159395df9cc6c38dfde702d6f51",
|
||||
"glob": "^7.1.6",
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
const { NotImplementedError } = require('./Errors')
|
||||
|
||||
module.exports = class AbstractPersistor {
|
||||
/**
|
||||
* @param location
|
||||
* @param target
|
||||
* @param {string} source
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async sendFile(location, target, source) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'sendFile',
|
||||
@@ -10,6 +16,13 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param location
|
||||
* @param target
|
||||
* @param {NodeJS.ReadableStream} sourceStream
|
||||
* @param {Object} opts
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async sendStream(location, target, sourceStream, opts = {}) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'sendStream',
|
||||
@@ -25,7 +38,7 @@ module.exports = class AbstractPersistor {
|
||||
* @param {Object} opts
|
||||
* @param {Number} opts.start
|
||||
* @param {Number} opts.end
|
||||
* @return {Promise<Readable>}
|
||||
* @return {Promise<NodeJS.ReadableStream>}
|
||||
*/
|
||||
async getObjectStream(location, name, opts) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
@@ -36,6 +49,11 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} location
|
||||
* @param {string} name
|
||||
* @return {Promise<string>}
|
||||
*/
|
||||
async getRedirectUrl(location, name) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'getRedirectUrl',
|
||||
@@ -44,7 +62,13 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
async getObjectSize(location, name) {
|
||||
/**
|
||||
* @param {string} location
|
||||
* @param {string} name
|
||||
* @param {Object} opts
|
||||
* @return {Promise<number>}
|
||||
*/
|
||||
async getObjectSize(location, name, opts) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'getObjectSize',
|
||||
location,
|
||||
@@ -52,7 +76,13 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
async getObjectMd5Hash(location, name) {
|
||||
/**
|
||||
* @param {string} location
|
||||
* @param {string} name
|
||||
* @param {Object} opts
|
||||
* @return {Promise<string>}
|
||||
*/
|
||||
async getObjectMd5Hash(location, name, opts) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'getObjectMd5Hash',
|
||||
location,
|
||||
@@ -60,7 +90,14 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
async copyObject(location, fromName, toName) {
|
||||
/**
|
||||
* @param {string} location
|
||||
* @param {string} fromName
|
||||
* @param {string} toName
|
||||
* @param {Object} opts
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async copyObject(location, fromName, toName, opts) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'copyObject',
|
||||
location,
|
||||
@@ -69,6 +106,11 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} location
|
||||
* @param {string} name
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async deleteObject(location, name) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'deleteObject',
|
||||
@@ -77,7 +119,13 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
async deleteDirectory(location, name) {
|
||||
/**
|
||||
* @param {string} location
|
||||
* @param {string} name
|
||||
* @param {string} [continuationToken]
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async deleteDirectory(location, name, continuationToken) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'deleteDirectory',
|
||||
location,
|
||||
@@ -85,7 +133,13 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
async checkIfObjectExists(location, name) {
|
||||
/**
|
||||
* @param {string} location
|
||||
* @param {string} name
|
||||
* @param {Object} opts
|
||||
* @return {Promise<boolean>}
|
||||
*/
|
||||
async checkIfObjectExists(location, name, opts) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'checkIfObjectExists',
|
||||
location,
|
||||
@@ -93,7 +147,13 @@ module.exports = class AbstractPersistor {
|
||||
})
|
||||
}
|
||||
|
||||
async directorySize(location, name) {
|
||||
/**
|
||||
* @param {string} location
|
||||
* @param {string} name
|
||||
* @param {string} [continuationToken]
|
||||
* @return {Promise<number>}
|
||||
*/
|
||||
async directorySize(location, name, continuationToken) {
|
||||
throw new NotImplementedError('method not implemented in persistor', {
|
||||
method: 'directorySize',
|
||||
location,
|
||||
|
||||
282
libraries/object-persistor/src/PerProjectEncryptedS3Persistor.js
Normal file
282
libraries/object-persistor/src/PerProjectEncryptedS3Persistor.js
Normal file
@@ -0,0 +1,282 @@
|
||||
// @ts-check
|
||||
const Stream = require('stream')
|
||||
const { promisify } = require('util')
|
||||
const Crypto = require('crypto')
|
||||
const { WritableBuffer } = require('@overleaf/stream-utils')
|
||||
const { S3Persistor, SSECOptions } = require('./S3Persistor.js')
|
||||
|
||||
const generateKey = promisify(Crypto.generateKey)
|
||||
|
||||
/**
|
||||
* @typedef {Object} Settings
|
||||
* @property {(bucketName: string, path: string) => {bucketName: string, path: string}} pathToDataEncryptionKeyPath
|
||||
* @property {(bucketName: string, path: string) => boolean} pathIsProjectFolder
|
||||
* @property {() => Promise<Buffer>} getKeyEncryptionKey
|
||||
*/
|
||||
|
||||
const {
|
||||
NotFoundError,
|
||||
NotImplementedError,
|
||||
AlreadyWrittenError,
|
||||
} = require('./Errors')
|
||||
const fs = require('fs')
|
||||
|
||||
/**
 * S3 persistor that attaches SSE-C options to object reads and writes.
 *
 * Each path maps (via `settings.pathToDataEncryptionKeyPath`) to the location
 * of a data encryption key (DEK). The DEK is itself stored as an S3 object
 * that is written and read using the key encryption key (KEK) obtained from
 * `settings.getKeyEncryptionKey`.
 */
class PerProjectEncryptedS3Persistor extends S3Persistor {
  /** @type Settings */
  #settings
  /** @type Promise<SSECOptions> */
  #keyEncryptionKeyOptions

  /**
   * Starts loading the KEK right away; the pending result is kept as a promise
   * and awaited wherever the KEK is needed.
   *
   * @param {Settings} settings
   */
  constructor(settings) {
    super(settings)
    this.#settings = settings
    const pendingKey = settings.getKeyEncryptionKey()
    this.#keyEncryptionKeyOptions = pendingKey.then(
      keyAsBuffer => new SSECOptions(keyAsBuffer)
    )
  }

  /**
   * Resolves once the KEK has been loaded, rejects if loading it failed.
   */
  async ensureKeyEncryptionKeyLoaded() {
    await this.#keyEncryptionKeyOptions
  }

  /**
   * Size of the stored DEK object that belongs to the given path.
   *
   * @param {string} bucketName
   * @param {string} path
   */
  async getDataEncryptionKeySize(bucketName, path) {
    const dekPath = this.#settings.pathToDataEncryptionKeyPath(bucketName, path)
    const readOptions = { ssecOptions: await this.#keyEncryptionKeyOptions }
    return await super.getObjectSize(
      dekPath.bucketName,
      dekPath.path,
      readOptions
    )
  }

  /**
   * Build a helper that re-uses one DEK (loaded or created here) for a batch
   * of operations.
   *
   * @param {string} bucketName
   * @param {string} path
   * @return {Promise<CachedPerProjectEncryptedS3Persistor>}
   */
  async forProject(bucketName, path) {
    const projectKeyOptions = await this.#getDataEncryptionKeyOptions(
      bucketName,
      path
    )
    return new CachedPerProjectEncryptedS3Persistor(this, projectKeyOptions)
  }

  /**
   * Create and store a new DEK for the given path, discarding the result.
   *
   * @param {string} bucketName
   * @param {string} path
   * @return {Promise<void>}
   */
  async generateDataEncryptionKey(bucketName, path) {
    await this.#generateDataEncryptionKeyOptions(bucketName, path)
  }

  /**
   * Generate a fresh 256 bit AES key and upload it as the DEK object.
   *
   * @param {string} bucketName
   * @param {string} path
   * @return {Promise<SSECOptions>}
   */
  async #generateDataEncryptionKeyOptions(bucketName, path) {
    const keyObject = await generateKey('aes', { length: 256 })
    const dataEncryptionKey = keyObject.export()
    const dekPath = this.#settings.pathToDataEncryptionKeyPath(bucketName, path)
    const dekStream = Stream.Readable.from([dataEncryptionKey])
    const writeOptions = {
      // Do not overwrite any objects if already created
      ifNoneMatch: '*',
      ssecOptions: await this.#keyEncryptionKeyOptions,
    }
    await super.sendStream(
      dekPath.bucketName,
      dekPath.path,
      dekStream,
      writeOptions
    )
    return new SSECOptions(dataEncryptionKey)
  }

  /**
   * Download the already stored DEK object and wrap its contents.
   *
   * @param {string} bucketName
   * @param {string} path
   * @return {Promise<SSECOptions>}
   */
  async #getExistingDataEncryptionKeyOptions(bucketName, path) {
    const dekPath = this.#settings.pathToDataEncryptionKeyPath(bucketName, path)
    const readOptions = { ssecOptions: await this.#keyEncryptionKeyOptions }
    const dekStream = await super.getObjectStream(
      dekPath.bucketName,
      dekPath.path,
      readOptions
    )
    const sink = new WritableBuffer()
    await Stream.promises.pipeline(dekStream, sink)
    return new SSECOptions(sink.getContents())
  }

  /**
   * Load the DEK for the path, creating it when it does not exist yet.
   *
   * @param {string} bucketName
   * @param {string} path
   * @return {Promise<SSECOptions>}
   */
  async #getDataEncryptionKeyOptions(bucketName, path) {
    try {
      return await this.#getExistingDataEncryptionKeyOptions(bucketName, path)
    } catch (err) {
      // Only a missing DEK leads to key generation, anything else is fatal.
      if (!(err instanceof NotFoundError)) {
        throw err
      }
    }

    try {
      return await this.#generateDataEncryptionKeyOptions(bucketName, path)
    } catch (err) {
      if (!(err instanceof AlreadyWrittenError)) {
        throw err
      }
    }

    // Concurrent initial write
    return await this.#getExistingDataEncryptionKeyOptions(bucketName, path)
  }

  /**
   * Upload a stream, using `opts.ssecOptions` when given and otherwise the
   * (possibly newly created) DEK for the path.
   */
  async sendStream(bucketName, path, sourceStream, opts = {}) {
    let ssecOptions = opts.ssecOptions
    if (!ssecOptions) {
      ssecOptions = await this.#getDataEncryptionKeyOptions(bucketName, path)
    }
    const uploadOptions = { ...opts, ssecOptions }
    return await super.sendStream(bucketName, path, sourceStream, uploadOptions)
  }

  /**
   * Download an object, using `opts.ssecOptions` when given and otherwise the
   * existing DEK for the path (no key is created on reads).
   */
  async getObjectStream(bucketName, path, opts = {}) {
    let ssecOptions = opts.ssecOptions
    if (!ssecOptions) {
      ssecOptions = await this.#getExistingDataEncryptionKeyOptions(
        bucketName,
        path
      )
    }
    const downloadOptions = { ...opts, ssecOptions }
    return await super.getObjectStream(bucketName, path, downloadOptions)
  }

  /**
   * Object size lookup, using `opts.ssecOptions` when given and otherwise the
   * existing DEK for the path.
   */
  async getObjectSize(bucketName, path, opts = {}) {
    let ssecOptions = opts.ssecOptions
    if (!ssecOptions) {
      ssecOptions = await this.#getExistingDataEncryptionKeyOptions(
        bucketName,
        path
      )
    }
    const headOptions = { ...opts, ssecOptions }
    return await super.getObjectSize(bucketName, path, headOptions)
  }

  async directorySize(bucketName, path, continuationToken) {
    // Note: Listing a bucket does not require SSE-C credentials.
    const totalSize = await super.directorySize(
      bucketName,
      path,
      continuationToken
    )
    return totalSize
  }

  /**
   * Delete everything below the prefix; for a project folder the DEK object
   * is deleted afterwards as well.
   */
  async deleteDirectory(bucketName, path, continuationToken) {
    // Note: Listing/Deleting a prefix does not require SSE-C credentials.
    await super.deleteDirectory(bucketName, path, continuationToken)

    if (!this.#settings.pathIsProjectFolder(bucketName, path)) {
      return
    }
    const dekPath = this.#settings.pathToDataEncryptionKeyPath(bucketName, path)
    await super.deleteObject(dekPath.bucketName, dekPath.path)
  }

  async getObjectMd5Hash(bucketName, path, opts = {}) {
    // The ETag in object metadata is not the MD5 content hash, skip the HEAD request.
    const hashOptions = { ...opts, etagIsNotMD5: true }
    return await super.getObjectMd5Hash(bucketName, path, hashOptions)
  }

  /**
   * Copy an object. The destination key falls back to the (possibly newly
   * created) DEK of the destination path, the source key to the existing DEK
   * of the source path. The destination key is resolved first.
   */
  async copyObject(bucketName, sourcePath, destinationPath, opts = {}) {
    let ssecOptions = opts.ssecOptions
    if (!ssecOptions) {
      ssecOptions = await this.#getDataEncryptionKeyOptions(
        bucketName,
        destinationPath
      )
    }

    let ssecSrcOptions = opts.ssecSrcOptions
    if (!ssecSrcOptions) {
      ssecSrcOptions = await this.#getExistingDataEncryptionKeyOptions(
        bucketName,
        sourcePath
      )
    }

    const copyOptions = { ...opts, ssecOptions, ssecSrcOptions }
    return await super.copyObject(
      bucketName,
      sourcePath,
      destinationPath,
      copyOptions
    )
  }

  /**
   * Always rejects: this backend does not hand out signed links.
   *
   * @param {string} bucketName
   * @param {string} path
   * @return {Promise<string>}
   */
  async getRedirectUrl(bucketName, path) {
    throw new NotImplementedError('signed links are not supported with SSE-C')
  }
}
|
||||
|
||||
/**
 * Helper for batch operations on one project: it keeps the project key
 * options it was constructed with and injects them into every call, so the
 * parent persistor does not have to fetch the project key again each time.
 *
 * A general "cache" for project keys is another alternative. For now, use a helper class.
 */
class CachedPerProjectEncryptedS3Persistor {
  /** @type SSECOptions */
  #projectKeyOptions
  /** @type PerProjectEncryptedS3Persistor */
  #parent

  /**
   * @param {PerProjectEncryptedS3Persistor} parent persistor that performs the actual requests
   * @param {SSECOptions} projectKeyOptions key options applied to every request
   */
  constructor(parent, projectKeyOptions) {
    this.#projectKeyOptions = projectKeyOptions
    this.#parent = parent
  }

  /**
   * Copy of the caller's options with `ssecOptions` replaced by the project
   * key options; a caller-supplied `ssecOptions` is overridden.
   *
   * @param {Object} opts
   * @return {Object}
   */
  #withProjectKey(opts) {
    return Object.assign({}, opts, { ssecOptions: this.#projectKeyOptions })
  }

  /**
   * Upload a local file by streaming it through `sendStream`.
   *
   * @param {string} bucketName
   * @param {string} path
   * @param {string} fsPath
   */
  async sendFile(bucketName, path, fsPath) {
    const fileStream = fs.createReadStream(fsPath)
    return await this.sendStream(bucketName, path, fileStream)
  }

  /**
   * Upload a stream via the parent persistor using the project key options.
   *
   * @param {string} bucketName
   * @param {string} path
   * @param {NodeJS.ReadableStream} sourceStream
   * @param {Object} opts
   * @param {string} [opts.contentType]
   * @param {string} [opts.contentEncoding]
   * @param {'*'} [opts.ifNoneMatch]
   * @param {SSECOptions} [opts.ssecOptions]
   * @param {string} [opts.sourceMd5]
   * @return {Promise<void>}
   */
  async sendStream(bucketName, path, sourceStream, opts = {}) {
    const uploadOptions = this.#withProjectKey(opts)
    return await this.#parent.sendStream(
      bucketName,
      path,
      sourceStream,
      uploadOptions
    )
  }

  /**
   * Download an object via the parent persistor using the project key options.
   *
   * @param {string} bucketName
   * @param {string} path
   * @param {Object} opts
   * @param {number} [opts.start]
   * @param {number} [opts.end]
   * @param {string} [opts.contentEncoding]
   * @param {SSECOptions} [opts.ssecOptions]
   * @return {Promise<NodeJS.ReadableStream>}
   */
  async getObjectStream(bucketName, path, opts = {}) {
    const downloadOptions = this.#withProjectKey(opts)
    return await this.#parent.getObjectStream(bucketName, path, downloadOptions)
  }
}
|
||||
|
||||
module.exports = PerProjectEncryptedS3Persistor
|
||||
@@ -1,15 +1,18 @@
|
||||
const Logger = require('@overleaf/logger')
|
||||
const { SettingsError } = require('./Errors')
|
||||
const GcsPersistor = require('./GcsPersistor')
|
||||
const S3Persistor = require('./S3Persistor')
|
||||
const { S3Persistor } = require('./S3Persistor')
|
||||
const FSPersistor = require('./FSPersistor')
|
||||
const MigrationPersistor = require('./MigrationPersistor')
|
||||
const PerProjectEncryptedS3Persistor = require('./PerProjectEncryptedS3Persistor')
|
||||
|
||||
function getPersistor(backend, settings) {
|
||||
switch (backend) {
|
||||
case 'aws-sdk':
|
||||
case 's3':
|
||||
return new S3Persistor(settings.s3)
|
||||
case 's3SSEC':
|
||||
return new PerProjectEncryptedS3Persistor(settings.s3SSEC)
|
||||
case 'fs':
|
||||
return new FSPersistor({
|
||||
useSubdirectories: settings.useSubdirectories,
|
||||
|
||||
@@ -26,12 +26,14 @@ const SIZE_BUCKETS = [
|
||||
*/
|
||||
class ObserverStream extends Stream.Transform {
|
||||
/**
|
||||
* @param {string} metric prefix for metrics
|
||||
* @param {string} bucket name of source/target bucket
|
||||
* @param {string} hash optional hash algorithm, e.g. 'md5'
|
||||
* @param {Object} opts
|
||||
* @param {string} opts.metric prefix for metrics
|
||||
* @param {string} opts.bucket name of source/target bucket
|
||||
* @param {string} [opts.hash] optional hash algorithm, e.g. 'md5'
|
||||
*/
|
||||
constructor({ metric, bucket, hash = '' }) {
|
||||
constructor(opts) {
|
||||
super({ autoDestroy: true })
|
||||
const { metric, bucket, hash = '' } = opts
|
||||
|
||||
this.bytes = 0
|
||||
this.start = performance.now()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
// @ts-check
|
||||
const http = require('http')
|
||||
const https = require('https')
|
||||
if (http.globalAgent.maxSockets < 300) {
|
||||
@@ -7,6 +8,7 @@ if (https.globalAgent.maxSockets < 300) {
|
||||
https.globalAgent.maxSockets = 300
|
||||
}
|
||||
|
||||
const Crypto = require('crypto')
|
||||
const Metrics = require('@overleaf/metrics')
|
||||
const AbstractPersistor = require('./AbstractPersistor')
|
||||
const PersistorHelper = require('./PersistorHelper')
|
||||
@@ -17,17 +19,75 @@ const S3 = require('aws-sdk/clients/s3')
|
||||
const { URL } = require('url')
|
||||
const { WriteError, ReadError, NotFoundError } = require('./Errors')
|
||||
|
||||
module.exports = class S3Persistor extends AbstractPersistor {
|
||||
/**
 * Customer-provided S3 encryption key (SSE-C) plus its base64 MD5 digest,
 * with builders for the request parameters that carry them.
 *
 * Wrapper with private fields to avoid revealing them on console,
 * JSON.stringify or similar.
 */
class SSECOptions {
  /** @type {Buffer} */
  #keyAsBuffer
  /** @type {string} */
  #keyMD5

  /**
   * @param {Buffer} keyAsBuffer raw key; must be 32 bytes, matching the
   *   'AES256' algorithm sent with every request
   * @throws {TypeError} when the key cannot be converted to a Buffer
   * @throws {RangeError} when the key is not exactly 32 bytes long
   */
  constructor(keyAsBuffer) {
    // Take a private copy: the digest below is computed once, so the key must
    // not change afterwards even if the caller re-uses or wipes its buffer.
    const key = Buffer.from(keyAsBuffer)
    if (key.length !== 32) {
      // Fail here with a clear message instead of on the first S3 request.
      // Only the length is reported, never key material.
      throw new RangeError(
        `SSE-C key must be 32 bytes (256 bits) long, got ${key.length} bytes`
      )
    }
    this.#keyAsBuffer = key
    this.#keyMD5 = Crypto.createHash('md5').update(key).digest('base64')
  }

  /**
   * Parameters shared by requests that read or write an object with this key.
   * A new object is returned on every call.
   */
  #requestOptions() {
    return {
      SSECustomerKey: this.#keyAsBuffer,
      SSECustomerKeyMD5: this.#keyMD5,
      SSECustomerAlgorithm: 'AES256',
    }
  }

  /**
   * Parameters for writing an object (also used for the destination of a copy).
   */
  getPutOptions() {
    return this.#requestOptions()
  }

  /**
   * Parameters for reading an object or its metadata.
   */
  getGetOptions() {
    return this.#requestOptions()
  }

  /**
   * Parameters describing the key of the source object of a copy.
   */
  getCopyOptions() {
    return {
      CopySourceSSECustomerKey: this.#keyAsBuffer,
      CopySourceSSECustomerKeyMD5: this.#keyMD5,
      CopySourceSSECustomerAlgorithm: 'AES256',
    }
  }
}
|
||||
|
||||
class S3Persistor extends AbstractPersistor {
|
||||
constructor(settings = {}) {
|
||||
super()
|
||||
|
||||
this.settings = settings
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {string} fsPath
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async sendFile(bucketName, key, fsPath) {
|
||||
return await this.sendStream(bucketName, key, fs.createReadStream(fsPath))
|
||||
await this.sendStream(bucketName, key, fs.createReadStream(fsPath))
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {NodeJS.ReadableStream} readStream
|
||||
* @param {Object} opts
|
||||
* @param {string} [opts.contentType]
|
||||
* @param {string} [opts.contentEncoding]
|
||||
* @param {'*'} [opts.ifNoneMatch]
|
||||
* @param {SSECOptions} [opts.ssecOptions]
|
||||
* @param {string} [opts.sourceMd5]
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async sendStream(bucketName, key, readStream, opts = {}) {
|
||||
try {
|
||||
const observeOptions = {
|
||||
@@ -55,6 +115,9 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
if (opts.ifNoneMatch === '*') {
|
||||
uploadOptions.IfNoneMatch = '*'
|
||||
}
|
||||
if (opts.ssecOptions) {
|
||||
Object.assign(uploadOptions, opts.ssecOptions.getPutOptions())
|
||||
}
|
||||
|
||||
// if we have an md5 hash, pass this to S3 to verify the upload - otherwise
|
||||
// we rely on the S3 client's checksum calculation to validate the upload
|
||||
@@ -78,6 +141,16 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {Object} opts
|
||||
* @param {number} [opts.start]
|
||||
* @param {number} [opts.end]
|
||||
* @param {string} [opts.contentEncoding]
|
||||
* @param {SSECOptions} [opts.ssecOptions]
|
||||
* @return {Promise<NodeJS.ReadableStream>}
|
||||
*/
|
||||
async getObjectStream(bucketName, key, opts) {
|
||||
opts = opts || {}
|
||||
|
||||
@@ -88,6 +161,9 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
if (opts.start != null && opts.end != null) {
|
||||
params.Range = `bytes=${opts.start}-${opts.end}`
|
||||
}
|
||||
if (opts.ssecOptions) {
|
||||
Object.assign(params, opts.ssecOptions.getGetOptions())
|
||||
}
|
||||
const observer = new PersistorHelper.ObserverStream({
|
||||
metric: 's3.ingress', // ingress from S3 to us
|
||||
bucket: bucketName,
|
||||
@@ -102,10 +178,10 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
switch (statusCode) {
|
||||
case 200: // full response
|
||||
case 206: // partial response
|
||||
return resolve()
|
||||
return resolve(undefined)
|
||||
case 403: // AccessDenied is handled the same as NoSuchKey
|
||||
case 404: // NoSuchKey
|
||||
return reject(new NotFoundError())
|
||||
return reject(new NotFoundError('not found'))
|
||||
default:
|
||||
return reject(new Error('non success status: ' + statusCode))
|
||||
}
|
||||
@@ -131,17 +207,22 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
return pass
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @return {Promise<string>}
|
||||
*/
|
||||
async getRedirectUrl(bucketName, key) {
|
||||
const expiresSeconds = Math.round(this.settings.signedUrlExpiryInMs / 1000)
|
||||
try {
|
||||
const url = await this._getClientForBucket(
|
||||
bucketName
|
||||
).getSignedUrlPromise('getObject', {
|
||||
Bucket: bucketName,
|
||||
Key: key,
|
||||
Expires: expiresSeconds,
|
||||
})
|
||||
return url
|
||||
return await this._getClientForBucket(bucketName).getSignedUrlPromise(
|
||||
'getObject',
|
||||
{
|
||||
Bucket: bucketName,
|
||||
Key: key,
|
||||
Expires: expiresSeconds,
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
throw PersistorHelper.wrapError(
|
||||
err,
|
||||
@@ -152,6 +233,12 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {string} [continuationToken]
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async deleteDirectory(bucketName, key, continuationToken) {
|
||||
let response
|
||||
const options = { Bucket: bucketName, Prefix: key }
|
||||
@@ -172,8 +259,8 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
)
|
||||
}
|
||||
|
||||
const objects = response.Contents.map(item => ({ Key: item.Key }))
|
||||
if (objects.length) {
|
||||
const objects = response.Contents?.map(item => ({ Key: item.Key || '' }))
|
||||
if (objects?.length) {
|
||||
try {
|
||||
await this._getClientForBucket(bucketName)
|
||||
.deleteObjects({
|
||||
@@ -203,12 +290,22 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
async getObjectSize(bucketName, key) {
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {Object} opts
|
||||
* @param {SSECOptions} [opts.ssecOptions]
|
||||
* @return {Promise<S3.HeadObjectOutput>}
|
||||
*/
|
||||
async #headObject(bucketName, key, opts = {}) {
|
||||
const params = { Bucket: bucketName, Key: key }
|
||||
if (opts.ssecOptions) {
|
||||
Object.assign(params, opts.ssecOptions.getGetOptions())
|
||||
}
|
||||
try {
|
||||
const response = await this._getClientForBucket(bucketName)
|
||||
.headObject({ Bucket: bucketName, Key: key })
|
||||
return await this._getClientForBucket(bucketName)
|
||||
.headObject(params)
|
||||
.promise()
|
||||
return response.ContentLength
|
||||
} catch (err) {
|
||||
throw PersistorHelper.wrapError(
|
||||
err,
|
||||
@@ -219,19 +316,39 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
async getObjectMd5Hash(bucketName, key) {
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {Object} opts
|
||||
* @param {SSECOptions} [opts.ssecOptions]
|
||||
* @return {Promise<number>}
|
||||
*/
|
||||
async getObjectSize(bucketName, key, opts = {}) {
|
||||
const response = await this.#headObject(bucketName, key, opts)
|
||||
return response.ContentLength || 0
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {Object} opts
|
||||
* @param {SSECOptions} [opts.ssecOptions]
|
||||
* @param {boolean} [opts.etagIsNotMD5]
|
||||
* @return {Promise<string>}
|
||||
*/
|
||||
async getObjectMd5Hash(bucketName, key, opts = {}) {
|
||||
try {
|
||||
const response = await this._getClientForBucket(bucketName)
|
||||
.headObject({ Bucket: bucketName, Key: key })
|
||||
.promise()
|
||||
const md5 = S3Persistor._md5FromResponse(response)
|
||||
if (md5) {
|
||||
return md5
|
||||
if (!opts.etagIsNotMD5) {
|
||||
const response = await this.#headObject(bucketName, key, opts)
|
||||
const md5 = S3Persistor._md5FromResponse(response)
|
||||
if (md5) {
|
||||
return md5
|
||||
}
|
||||
}
|
||||
// etag is not in md5 format
|
||||
Metrics.inc('s3.md5Download')
|
||||
return await PersistorHelper.calculateStreamMd5(
|
||||
await this.getObjectStream(bucketName, key)
|
||||
await this.getObjectStream(bucketName, key, opts)
|
||||
)
|
||||
} catch (err) {
|
||||
throw PersistorHelper.wrapError(
|
||||
@@ -243,6 +360,11 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async deleteObject(bucketName, key) {
|
||||
try {
|
||||
await this._getClientForBucket(bucketName)
|
||||
@@ -259,12 +381,27 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
async copyObject(bucketName, sourceKey, destKey) {
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} sourceKey
|
||||
* @param {string} destKey
|
||||
* @param {Object} opts
|
||||
* @param {SSECOptions} [opts.ssecSrcOptions]
|
||||
* @param {SSECOptions} [opts.ssecOptions]
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async copyObject(bucketName, sourceKey, destKey, opts = {}) {
|
||||
const params = {
|
||||
Bucket: bucketName,
|
||||
Key: destKey,
|
||||
CopySource: `${bucketName}/${sourceKey}`,
|
||||
}
|
||||
if (opts.ssecSrcOptions) {
|
||||
Object.assign(params, opts.ssecSrcOptions.getCopyOptions())
|
||||
}
|
||||
if (opts.ssecOptions) {
|
||||
Object.assign(params, opts.ssecOptions.getPutOptions())
|
||||
}
|
||||
try {
|
||||
await this._getClientForBucket(bucketName).copyObject(params).promise()
|
||||
} catch (err) {
|
||||
@@ -277,9 +414,16 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
async checkIfObjectExists(bucketName, key) {
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {Object} opts
|
||||
* @param {SSECOptions} [opts.ssecOptions]
|
||||
* @return {Promise<boolean>}
|
||||
*/
|
||||
async checkIfObjectExists(bucketName, key, opts) {
|
||||
try {
|
||||
await this.getObjectSize(bucketName, key)
|
||||
await this.getObjectSize(bucketName, key, opts)
|
||||
return true
|
||||
} catch (err) {
|
||||
if (err instanceof NotFoundError) {
|
||||
@@ -294,6 +438,12 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucketName
|
||||
* @param {string} key
|
||||
* @param {string} [continuationToken]
|
||||
* @return {Promise<number>}
|
||||
*/
|
||||
async directorySize(bucketName, key, continuationToken) {
|
||||
try {
|
||||
const options = {
|
||||
@@ -307,7 +457,8 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
.listObjectsV2(options)
|
||||
.promise()
|
||||
|
||||
const size = response.Contents.reduce((acc, item) => item.Size + acc, 0)
|
||||
const size =
|
||||
response.Contents?.reduce((acc, item) => (item.Size || 0) + acc, 0) || 0
|
||||
if (response.IsTruncated) {
|
||||
return (
|
||||
size +
|
||||
@@ -329,6 +480,12 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} bucket
|
||||
* @param {Object} [clientOptions]
|
||||
* @return {S3}
|
||||
* @private
|
||||
*/
|
||||
_getClientForBucket(bucket, clientOptions) {
|
||||
return new S3(
|
||||
this._buildClientOptions(
|
||||
@@ -338,6 +495,12 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Object} bucketCredentials
|
||||
* @param {Object} clientOptions
|
||||
* @return {Object}
|
||||
* @private
|
||||
*/
|
||||
_buildClientOptions(bucketCredentials, clientOptions) {
|
||||
const options = clientOptions || {}
|
||||
|
||||
@@ -376,6 +539,11 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
return options
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {S3.HeadObjectOutput} response
|
||||
* @return {string|null}
|
||||
* @private
|
||||
*/
|
||||
static _md5FromResponse(response) {
|
||||
const md5 = (response.ETag || '').replace(/[ "]/g, '')
|
||||
if (!md5.match(/^[a-f0-9]{32}$/)) {
|
||||
@@ -385,3 +553,8 @@ module.exports = class S3Persistor extends AbstractPersistor {
|
||||
return md5
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
S3Persistor,
|
||||
SSECOptions,
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ describe('PersistorManager', function () {
|
||||
Settings = {}
|
||||
const requires = {
|
||||
'./GcsPersistor': GcsPersistor,
|
||||
'./S3Persistor': S3Persistor,
|
||||
'./S3Persistor': { S3Persistor },
|
||||
'./FSPersistor': FSPersistor,
|
||||
'@overleaf/logger': {
|
||||
info() {},
|
||||
|
||||
@@ -159,7 +159,7 @@ describe('S3PersistorTests', function () {
|
||||
crypto,
|
||||
},
|
||||
globals: { console, Buffer },
|
||||
}))(settings)
|
||||
}).S3Persistor)(settings)
|
||||
})
|
||||
|
||||
describe('getObjectStream', function () {
|
||||
|
||||
@@ -2,7 +2,7 @@ const { Writable, Readable, PassThrough, Transform } = require('stream')
|
||||
|
||||
/**
|
||||
* A writable stream that stores all data written to it in a node Buffer.
|
||||
* @extends stream.Writable
|
||||
* @extends Writable
|
||||
* @example
|
||||
* const { WritableBuffer } = require('@overleaf/stream-utils')
|
||||
* const bufferStream = new WritableBuffer()
|
||||
@@ -43,7 +43,7 @@ class WritableBuffer extends Writable {
|
||||
|
||||
/**
|
||||
* A readable stream created from a string.
|
||||
* @extends stream.Readable
|
||||
* @extends Readable
|
||||
* @example
|
||||
* const { ReadableString } = require('@overleaf/stream-utils')
|
||||
* const stringStream = new ReadableString('hello world')
|
||||
@@ -66,7 +66,7 @@ class SizeExceededError extends Error {}
|
||||
|
||||
/**
|
||||
* Limited size stream which will emit a SizeExceededError if the size is exceeded
|
||||
* @extends stream.Transform
|
||||
* @extends Transform
|
||||
*/
|
||||
class LimitedStream extends Transform {
|
||||
constructor(maxSize) {
|
||||
@@ -93,7 +93,7 @@ class AbortError extends Error {}
|
||||
|
||||
/**
|
||||
* TimeoutStream which will emit an AbortError if it exceeds a user specified timeout
|
||||
* @extends stream.PassThrough
|
||||
* @extends PassThrough
|
||||
*/
|
||||
class TimeoutStream extends PassThrough {
|
||||
constructor(timeout) {
|
||||
@@ -111,7 +111,7 @@ class TimeoutStream extends PassThrough {
|
||||
|
||||
/**
|
||||
* LoggerStream which will call the provided logger function when the stream exceeds a user specified limit. It will call the provided function again when flushing the stream and it exceeded the user specified limit before.
|
||||
* @extends stream.Transform
|
||||
* @extends Transform
|
||||
*/
|
||||
class LoggerStream extends Transform {
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user