Files
overleaf-cep/libraries/object-persistor/src/S3Persistor.js
T
Eric Mc Sween 523ff9c4cd Support metadata when uploading objects
Add contentType and contentEncoding options to sendStream(). These
options will set the corresponding metadata on the object.

This changes the API. The fourth argument to sendStream() used to be the
source md5. Now, it's an options object and the source md5 is a property
of that object.
2020-07-08 08:13:53 -04:00

360 lines
8.9 KiB
JavaScript

const http = require('http')
const https = require('https')
if (http.globalAgent.maxSockets < 300) {
http.globalAgent.maxSockets = 300
}
if (https.globalAgent.maxSockets < 300) {
https.globalAgent.maxSockets = 300
}
const AbstractPersistor = require('./AbstractPersistor')
const PersistorHelper = require('./PersistorHelper')
const fs = require('fs')
const S3 = require('aws-sdk/clients/s3')
const { URL } = require('url')
const {
WriteError,
ReadError,
NotFoundError,
SettingsError
} = require('./Errors')
module.exports = class S3Persistor extends AbstractPersistor {
constructor(settings = {}) {
super()
this.settings = settings
}
async sendFile(bucketName, key, fsPath) {
return this.sendStream(bucketName, key, fs.createReadStream(fsPath))
}
async sendStream(bucketName, key, readStream, opts = {}) {
try {
// egress from us to S3
const observeOptions = {
metric: 's3.egress',
Metrics: this.settings.Metrics
}
const observer = new PersistorHelper.ObserverStream(observeOptions)
// observer will catch errors, clean up and log a warning
readStream.pipe(observer)
// if we have an md5 hash, pass this to S3 to verify the upload
const uploadOptions = {
Bucket: bucketName,
Key: key,
Body: observer
}
if (opts.contentType) {
uploadOptions.ContentType = opts.contentType
}
if (opts.contentEncoding) {
uploadOptions.ContentEncoding = opts.contentEncoding
}
// if we have an md5 hash, pass this to S3 to verify the upload - otherwise
// we rely on the S3 client's checksum calculation to validate the upload
const clientOptions = {}
if (opts.sourceMd5) {
uploadOptions.ContentMD5 = PersistorHelper.hexToBase64(opts.sourceMd5)
} else {
clientOptions.computeChecksums = true
}
await this._getClientForBucket(bucketName, clientOptions)
.upload(uploadOptions, { partSize: this.settings.partSize })
.promise()
} catch (err) {
throw PersistorHelper.wrapError(
err,
'upload to S3 failed',
{ bucketName, key },
WriteError
)
}
}
async getObjectStream(bucketName, key, opts) {
opts = opts || {}
const params = {
Bucket: bucketName,
Key: key
}
if (opts.start != null && opts.end != null) {
params.Range = `bytes=${opts.start}-${opts.end}`
}
const stream = this._getClientForBucket(bucketName)
.getObject(params)
.createReadStream()
// ingress from S3 to us
const observer = new PersistorHelper.ObserverStream({
metric: 's3.ingress',
Metrics: this.settings.Metrics
})
try {
// wait for the pipeline to be ready, to catch non-200s
await PersistorHelper.getReadyPipeline(stream, observer)
return observer
} catch (err) {
throw PersistorHelper.wrapError(
err,
'error reading file from S3',
{ bucketName, key, opts },
ReadError
)
}
}
async getRedirectUrl() {
// not implemented
return null
}
async deleteDirectory(bucketName, key, continuationToken) {
let response
const options = { Bucket: bucketName, Prefix: key }
if (continuationToken) {
options.ContinuationToken = continuationToken
}
try {
response = await this._getClientForBucket(bucketName)
.listObjectsV2(options)
.promise()
} catch (err) {
throw PersistorHelper.wrapError(
err,
'failed to list objects in S3',
{ bucketName, key },
ReadError
)
}
const objects = response.Contents.map((item) => ({ Key: item.Key }))
if (objects.length) {
try {
await this._getClientForBucket(bucketName)
.deleteObjects({
Bucket: bucketName,
Delete: {
Objects: objects,
Quiet: true
}
})
.promise()
} catch (err) {
throw PersistorHelper.wrapError(
err,
'failed to delete objects in S3',
{ bucketName, key },
WriteError
)
}
}
if (response.IsTruncated) {
await this.deleteDirectory(
bucketName,
key,
response.NextContinuationToken
)
}
}
async getObjectSize(bucketName, key) {
try {
const response = await this._getClientForBucket(bucketName)
.headObject({ Bucket: bucketName, Key: key })
.promise()
return response.ContentLength
} catch (err) {
throw PersistorHelper.wrapError(
err,
'error getting size of s3 object',
{ bucketName, key },
ReadError
)
}
}
async getObjectMd5Hash(bucketName, key) {
try {
const response = await this._getClientForBucket(bucketName)
.headObject({ Bucket: bucketName, Key: key })
.promise()
const md5 = S3Persistor._md5FromResponse(response)
if (md5) {
return md5
}
// etag is not in md5 format
if (this.settings.Metrics) {
this.settings.Metrics.inc('s3.md5Download')
}
return PersistorHelper.calculateStreamMd5(
await this.getObjectStream(bucketName, key)
)
} catch (err) {
throw PersistorHelper.wrapError(
err,
'error getting hash of s3 object',
{ bucketName, key },
ReadError
)
}
}
async deleteObject(bucketName, key) {
try {
await this._getClientForBucket(bucketName)
.deleteObject({ Bucket: bucketName, Key: key })
.promise()
} catch (err) {
// s3 does not give us a NotFoundError here
throw PersistorHelper.wrapError(
err,
'failed to delete file in S3',
{ bucketName, key },
WriteError
)
}
}
async copyObject(bucketName, sourceKey, destKey) {
const params = {
Bucket: bucketName,
Key: destKey,
CopySource: `${bucketName}/${sourceKey}`
}
try {
await this._getClientForBucket(bucketName).copyObject(params).promise()
} catch (err) {
throw PersistorHelper.wrapError(
err,
'failed to copy file in S3',
params,
WriteError
)
}
}
async checkIfObjectExists(bucketName, key) {
try {
await this.getObjectSize(bucketName, key)
return true
} catch (err) {
if (err instanceof NotFoundError) {
return false
}
throw PersistorHelper.wrapError(
err,
'error checking whether S3 object exists',
{ bucketName, key },
ReadError
)
}
}
async directorySize(bucketName, key, continuationToken) {
try {
const options = {
Bucket: bucketName,
Prefix: key
}
if (continuationToken) {
options.ContinuationToken = continuationToken
}
const response = await this._getClientForBucket(bucketName)
.listObjectsV2(options)
.promise()
const size = response.Contents.reduce((acc, item) => item.Size + acc, 0)
if (response.IsTruncated) {
return (
size +
(await this.directorySize(
bucketName,
key,
response.NextContinuationToken
))
)
}
return size
} catch (err) {
throw PersistorHelper.wrapError(
err,
'error getting directory size in S3',
{ bucketName, key },
ReadError
)
}
}
_getClientForBucket(bucket, clientOptions) {
if (this.settings.bucketCreds && this.settings.bucketCreds[bucket]) {
return new S3(
this._buildClientOptions(
this.settings.bucketCreds[bucket],
clientOptions
)
)
}
// no specific credentials for the bucket
if (this.settings.key) {
return new S3(this._buildClientOptions(null, clientOptions))
}
throw new SettingsError(
'no bucket-specific or default credentials provided',
{ bucket }
)
}
_buildClientOptions(bucketCredentials, clientOptions) {
const options = clientOptions || {}
if (bucketCredentials) {
options.credentials = {
accessKeyId: bucketCredentials.auth_key,
secretAccessKey: bucketCredentials.auth_secret
}
} else {
options.credentials = {
accessKeyId: this.settings.key,
secretAccessKey: this.settings.secret
}
}
if (this.settings.endpoint) {
const endpoint = new URL(this.settings.endpoint)
options.endpoint = this.settings.endpoint
options.sslEnabled = endpoint.protocol === 'https'
}
// path-style access is only used for acceptance tests
if (this.settings.pathStyle) {
options.s3ForcePathStyle = true
}
return options
}
static _md5FromResponse(response) {
const md5 = (response.ETag || '').replace(/[ "]/g, '')
if (!md5.match(/^[a-f0-9]{32}$/)) {
return null
}
return md5
}
}