From 366ce97169addea6cc65b34a7cb24cf8886897b8 Mon Sep 17 00:00:00 2001 From: Simon Detheridge Date: Wed, 12 Feb 2020 11:00:45 +0000 Subject: [PATCH] Add GCS Persistor --- services/filestore/app/js/GcsPersistor.js | 295 ++++++++++++++++++ services/filestore/app/js/PersistorManager.js | 2 + 2 files changed, 297 insertions(+) create mode 100644 services/filestore/app/js/GcsPersistor.js diff --git a/services/filestore/app/js/GcsPersistor.js b/services/filestore/app/js/GcsPersistor.js new file mode 100644 index 0000000000..350b0d451c --- /dev/null +++ b/services/filestore/app/js/GcsPersistor.js @@ -0,0 +1,295 @@ +const settings = require('settings-sharelatex') +const metrics = require('metrics-sharelatex') +const fs = require('fs') +const { promisify } = require('util') +const Stream = require('stream') +const { Storage } = require('@google-cloud/storage') +const { callbackify } = require('util') +const { WriteError, ReadError, NotFoundError } = require('./Errors') +const PersistorHelper = require('./PersistorHelper') + +const pipeline = promisify(Stream.pipeline) + +function base64ToHex(base64) { + return Buffer.from(base64, 'base64').toString('hex') +} + +// both of these settings will be null by default except for tests +// that's OK - GCS uses the locally-configured service account by default +const storage = new Storage(settings.filestore.gcs) +// workaround for broken uploads with custom endpoints: +// https://github.com/googleapis/nodejs-storage/issues/898 +if (settings.filestore.gcs.apiEndpoint) { + storage.interceptors.push({ + request: function(reqOpts) { + const url = new URL(reqOpts.uri) + url.host = settings.filestore.gcs.apiEndpoint + reqOpts.uri = url.toString() + return reqOpts + } + }) +} + +const GcsPersistor = { + sendFile: callbackify(sendFile), + sendStream: callbackify(sendStream), + getFileStream: callbackify(getFileStream), + getFileMd5Hash: callbackify(getFileMd5Hash), + deleteDirectory: callbackify(deleteDirectory), + getFileSize: callbackify(getFileSize), + deleteFile: callbackify(deleteFile), + copyFile: callbackify(copyFile), + checkIfFileExists: callbackify(checkIfFileExists), + directorySize: callbackify(directorySize), + promises: { + sendFile, + sendStream, + getFileStream, + getFileMd5Hash, + deleteDirectory, + getFileSize, + deleteFile, + copyFile, + checkIfFileExists, + directorySize + } +} + +module.exports = GcsPersistor + +async function sendFile(bucket, key, fsPath) { + let readStream + try { + readStream = fs.createReadStream(fsPath) + } catch (err) { + throw PersistorHelper.wrapError( + err, + 'error reading file from disk', + { bucketName: bucket, key, fsPath }, + ReadError + ) + } + return sendStream(bucket, key, readStream) +} + +async function sendStream(bucket, key, readStream, sourceMd5) { + try { + let hashPromise + + // if there is no supplied md5 hash, we calculate the hash as the data passes through + if (!sourceMd5) { + hashPromise = PersistorHelper.calculateStreamMd5(readStream) + } + + const meteredStream = PersistorHelper.getMeteredStream( + readStream, + (_, byteCount) => { + metrics.count('gcs.egress', byteCount) + } + ) + + const writeOptions = { + resumable: false // recommended by Google + } + + if (sourceMd5) { + writeOptions.validation = 'md5' + writeOptions.metadata = { + md5Hash: sourceMd5 + } + } + + const uploadStream = storage + .bucket(bucket) + .file(key) + .createWriteStream(writeOptions) + + await pipeline(meteredStream, uploadStream) + + // if we didn't have an md5 hash, we should compare our computed one with Google's + // as we couldn't tell GCS about it beforehand + if (hashPromise) { + sourceMd5 = await hashPromise + // throws on mismatch + await PersistorHelper.verifyMd5(GcsPersistor, bucket, key, sourceMd5) + } + } catch (err) { + throw PersistorHelper.wrapError( + err, + 'upload to GCS failed', + { bucket, key }, + WriteError + ) + } +} + +async function getFileStream(bucket, key, opts) { + if (opts.end) { + // S3 (and http range headers) treat 'end' as inclusive, so increase this by 1 + opts.end++ + } + const stream = storage + .bucket(bucket) + .file(key) + .createReadStream(opts) + + const meteredStream = PersistorHelper.getMeteredStream(stream, (_, bytes) => { + // ignore the error parameter and just log the byte count + metrics.count('gcs.ingress', bytes) + }) + + try { + await PersistorHelper.waitForStreamReady(stream) + return meteredStream + } catch (err) { + throw PersistorHelper.wrapError( + err, + 'error reading file from GCS', + { bucket, key, opts }, + ReadError + ) + } +} + +async function getFileSize(bucket, key) { + try { + const metadata = await storage + .bucket(bucket) + .file(key) + .getMetadata() + return metadata[0].size + } catch (err) { + throw PersistorHelper.wrapError( + err, + 'error getting size of GCS object', + { bucket, key }, + ReadError + ) + } +} + +async function getFileMd5Hash(bucket, key) { + try { + const metadata = await storage + .bucket(bucket) + .file(key) + .getMetadata() + return base64ToHex(metadata[0].md5Hash) + } catch (err) { + throw PersistorHelper.wrapError( + err, + 'error getting hash of GCS object', + { bucket, key }, + ReadError + ) + } +} + +async function deleteFile(bucket, key) { + try { + await storage + .bucket(bucket) + .file(key) + .delete() + } catch (err) { + const error = PersistorHelper.wrapError( + err, + 'error deleting GCS object', + { bucket, key }, + WriteError + ) + if (!(error instanceof NotFoundError)) { + throw error + } + } +} + +async function deleteDirectory(bucket, key) { + let files + + try { + const response = await storage.bucket(bucket).getFiles({ directory: key }) + files = response[0] + } catch (err) { + const error = PersistorHelper.wrapError( + err, + 'failed to list objects in GCS', + { bucket, key }, + ReadError + ) + if (error instanceof NotFoundError) { + return + } + throw error + } + + for (const index in files) { + try { + await files[index].delete() + } catch (err) { + const error = PersistorHelper.wrapError( + err, + 'failed to delete object in GCS', + { bucket, key }, + WriteError + ) + if (!(error instanceof NotFoundError)) { + throw error + } + } + } +} + +async function directorySize(bucket, key) { + let files + + try { + const response = await storage.bucket(bucket).getFiles({ directory: key }) + files = response[0] + } catch (err) { + throw PersistorHelper.wrapError( + err, + 'failed to list objects in GCS', + { bucket, key }, + ReadError + ) + } + + return files.reduce((acc, file) => Number(file.metadata.size) + acc, 0) +} + +async function checkIfFileExists(bucket, key) { + try { + const response = await storage + .bucket(bucket) + .file(key) + .exists() + return response[0] + } catch (err) { + throw PersistorHelper.wrapError( + err, + 'error checking if file exists in GCS', + { bucket, key }, + ReadError + ) + } +} + +async function copyFile(bucket, sourceKey, destKey) { + try { + const src = storage.bucket(bucket).file(sourceKey) + const dest = storage.bucket(bucket).file(destKey) + await src.copy(dest) + } catch (err) { + // fake-gcs-server has a bug that returns an invalid response when the file does not exist + if (err.message === 'Cannot parse response as JSON: not found\n') { + err.code = 404 + } + throw PersistorHelper.wrapError( + err, + 'failed to copy file in GCS', + { bucket, sourceKey, destKey }, + WriteError + ) + } +} diff --git a/services/filestore/app/js/PersistorManager.js b/services/filestore/app/js/PersistorManager.js index 32f6cd41f8..d26ab77a92 100644 --- a/services/filestore/app/js/PersistorManager.js +++ b/services/filestore/app/js/PersistorManager.js @@ -19,6 +19,8 @@ function getPersistor(backend) { return require('./S3Persistor') case 'fs': return require('./FSPersistor') + case 'gcs': + return require('./GcsPersistor') default: throw new Error(`unknown filestore backend: ${backend}`) }