diff --git a/services/filestore/app/js/MigrationPersistor.js b/services/filestore/app/js/MigrationPersistor.js index 3a4789693b..3ddc762922 100644 --- a/services/filestore/app/js/MigrationPersistor.js +++ b/services/filestore/app/js/MigrationPersistor.js @@ -1,10 +1,12 @@ const metrics = require('metrics-sharelatex') const Settings = require('settings-sharelatex') const logger = require('logger-sharelatex') -const Minipass = require('minipass') -const { callbackify } = require('util') +const Stream = require('stream') +const { callbackify, promisify } = require('util') const { NotFoundError, WriteError } = require('./Errors') +const pipeline = promisify(Stream.pipeline) + // Persistor that wraps two other persistors. Talks to the 'primary' by default, // but will fall back to an older persistor in the case of a not-found error. // If `Settings.filestore.fallback.copyOnMiss` is set, this will copy files from the fallback @@ -29,14 +31,86 @@ module.exports = function(primary, fallback) { } } + async function getFileStreamWithFallback(bucket, key, opts) { + const shouldCopy = + Settings.filestore.fallback.copyOnMiss && !opts.start && !opts.end + + try { + return await primary.promises.getFileStream(bucket, key, opts) + } catch (err) { + if (err instanceof NotFoundError) { + const fallbackBucket = _getFallbackBucket(bucket) + const fallbackStream = await fallback.promises.getFileStream( + fallbackBucket, + key, + opts + ) + // tee the stream to the client, and as a copy to the primary (if necessary) + // start listening on both straight away so that we don't consume bytes + // in one place before the other + const returnStream = new Stream.PassThrough() + pipeline(fallbackStream, returnStream) + + if (shouldCopy) { + const copyStream = new Stream.PassThrough() + pipeline(fallbackStream, copyStream) + + _copyStreamFromFallbackAndVerify( + copyStream, + fallbackBucket, + bucket, + key, + key + ).catch(() => { + // swallow errors, as this runs in the background and will log a warning + }) + } + return returnStream + } + throw err + } + } + async function copyFileWithFallback(bucket, sourceKey, destKey) { try { return await primary.promises.copyFile(bucket, sourceKey, destKey) } catch (err) { if (err instanceof NotFoundError) { const fallbackBucket = _getFallbackBucket(bucket) - return _copyFileFromFallback(fallbackBucket, bucket, sourceKey, destKey) + const fallbackStream = await fallback.promises.getFileStream( + fallbackBucket, + sourceKey, + {} + ) + + const copyStream = new Stream.PassThrough() + pipeline(fallbackStream, copyStream) + + if (Settings.filestore.fallback.copyOnMiss) { + const missStream = new Stream.PassThrough() + pipeline(fallbackStream, missStream) + + // copy from sourceKey -> sourceKey + _copyStreamFromFallbackAndVerify( + missStream, + fallbackBucket, + bucket, + sourceKey, + sourceKey + ).then(() => { + // swallow errors, as this runs in the background and will log a warning + }) + } + // copy from sourceKey -> destKey + return _copyStreamFromFallbackAndVerify( + copyStream, + fallbackBucket, + bucket, + sourceKey, + destKey + ) } + throw err } } @@ -44,20 +118,29 @@ module.exports = function(primary, fallback) { return Settings.filestore.fallback.buckets[bucket] } - function _wrapFallbackMethod(method, enableCopy = true) { + function _wrapFallbackMethod(method) { return async function(bucket, key, ...moreArgs) { try { return await primary.promises[method](bucket, key, ...moreArgs) } catch (err) { if (err instanceof NotFoundError) { const fallbackBucket = _getFallbackBucket(bucket) - if (Settings.filestore.fallback.copyOnMiss && enableCopy) { - // run in background - _copyFileFromFallback(fallbackBucket, bucket, key, key).catch( - err => { - logger.warn({ err }, 'failed to copy file from fallback') - } + if (Settings.filestore.fallback.copyOnMiss) { + const fallbackStream = await fallback.promises.getFileStream( + fallbackBucket, + key, + {} ) + // run in background + _copyStreamFromFallbackAndVerify( + fallbackStream, + fallbackBucket, + bucket, + key, + key + ).catch(err => { + logger.warn({ err }, 'failed to copy file from fallback') + }) } return fallback.promises[method](fallbackBucket, key, ...moreArgs) } @@ -66,32 +149,7 @@ module.exports = function(primary, fallback) { } } - async function _getFileStreamAndCopyIfRequired(bucketName, key, opts) { - const shouldCopy = - Settings.filestore.fallback.copyOnMiss && !opts.start && !opts.end - - try { - return await primary.promises.getFileStream(bucketName, key, opts) - } catch (err) { - if (err instanceof NotFoundError) { - const fallbackBucket = _getFallbackBucket(bucketName) - if (shouldCopy) { - return _copyFileFromFallback( - fallbackBucket, - bucketName, - key, - key, - true - ) - } else { - return fallback.promises.getFileStream(fallbackBucket, key, opts) - } - } - throw err - } - } - - async function _copyFromFallbackStreamAndVerify( + async function _copyStreamFromFallbackAndVerify( stream, sourceBucket, destBucket, @@ -139,63 +197,12 @@ module.exports = function(primary, fallback) { } } - async function _copyFileFromFallback( - sourceBucket, - destBucket, - sourceKey, - destKey, - returnStream = false - ) { - metrics.inc('fallback.copy') - const sourceStream = await fallback.promises.getFileStream( - sourceBucket, - sourceKey, - {} - ) - - if (!returnStream) { - return _copyFromFallbackStreamAndVerify( - sourceStream, - sourceBucket, - destBucket, - sourceKey, - destKey - ) - } - - const tee = new Minipass() - const clientStream = new Minipass() - const copyStream = new Minipass() - - tee.pipe(clientStream) - tee.pipe(copyStream) - - // copy the file in the background - _copyFromFallbackStreamAndVerify( - copyStream, - sourceBucket, - destBucket, - sourceKey, - destKey - ).catch( - // the error handler in this method will log a metric and a warning, so - // we don't need to do anything extra here, but catching it will prevent - // unhandled promise rejection warnings - () => {} - ) - - // start piping the source stream into the tee after everything is set up, - // otherwise one stream may consume bytes that don't arrive at the other - sourceStream.pipe(tee) - return clientStream - } - return { primaryPersistor: primary, fallbackPersistor: fallback, sendFile: primary.sendFile, sendStream: primary.sendStream, - getFileStream: callbackify(_getFileStreamAndCopyIfRequired), + getFileStream: callbackify(getFileStreamWithFallback), getFileMd5Hash: callbackify(_wrapFallbackMethod('getFileMd5Hash')), deleteDirectory: callbackify( _wrapMethodOnBothPersistors('deleteDirectory') @@ -208,7 +215,7 @@ module.exports = function(primary, fallback) { promises: { sendFile: primary.promises.sendFile, sendStream: primary.promises.sendStream, - getFileStream: _getFileStreamAndCopyIfRequired, + getFileStream: getFileStreamWithFallback, getFileMd5Hash: _wrapFallbackMethod('getFileMd5Hash'), deleteDirectory: _wrapMethodOnBothPersistors('deleteDirectory'), getFileSize: _wrapFallbackMethod('getFileSize'), diff --git a/services/filestore/app/js/S3Persistor.js b/services/filestore/app/js/S3Persistor.js index ef465da25c..a10251a642 100644 --- a/services/filestore/app/js/S3Persistor.js +++ b/services/filestore/app/js/S3Persistor.js @@ -7,13 +7,13 @@ const settings = require('settings-sharelatex') const metrics = require('metrics-sharelatex') const logger = require('logger-sharelatex') -const Minipass = require('minipass') const meter = require('stream-meter') +const Stream = require('stream') const crypto = require('crypto') const fs = require('fs') const S3 = require('aws-sdk/clients/s3') const { URL } = require('url') -const { callbackify } = require('util') +const { callbackify, promisify } = require('util') const { WriteError, ReadError, @@ -46,6 +46,8 @@ module.exports = { } } +const pipeline = promisify(Stream.pipeline) + function hexToBase64(hex) { return Buffer.from(hex, 'hex').toString('base64') } @@ -68,7 +70,6 @@ async function sendFile(bucketName, key, fsPath) { async function sendStream(bucketName, key, readStream, sourceMd5) { try { // if there is no supplied md5 hash, we calculate the hash as the data passes through - const passthroughStream = new Minipass() let hashPromise let b64Hash @@ -77,29 +78,24 @@ async function sendStream(bucketName, key, readStream, sourceMd5) { } else { const hash = crypto.createHash('md5') hash.setEncoding('hex') - passthroughStream.pipe(hash) + pipeline(readStream, hash) hashPromise = new Promise((resolve, reject) => { - passthroughStream.on('end', () => { + readStream.on('end', () => { hash.end() resolve(hash.read()) }) - passthroughStream.on('error', err => { + readStream.on('error', err => { reject(err) }) }) } const meteredStream = meter() - passthroughStream.pipe(meteredStream) meteredStream.on('finish', () => { metrics.count('s3.egress', meteredStream.bytes) }) - // pipe the readstream through minipass, which can write to both the metered - // stream (which goes on to S3) and the md5 generator if necessary - // - we do this last so that a listener streams does not consume data meant - // for both destinations - readStream.pipe(passthroughStream) + pipeline(readStream, meteredStream) // if we have an md5 hash, pass this to S3 to verify the upload const uploadOptions = { diff --git a/services/filestore/test/acceptance/js/FilestoreTests.js b/services/filestore/test/acceptance/js/FilestoreTests.js index 1c96445a3a..1d927618e5 100644 --- a/services/filestore/test/acceptance/js/FilestoreTests.js +++ b/services/filestore/test/acceptance/js/FilestoreTests.js @@ -497,16 +497,16 @@ describe('Filestore', function() { ) }) - it('should fetch the file', async function() { - const res = await rp.get(fileUrl) - expect(res.body).to.equal(constantFileContent) - }) - describe('when copyOnMiss is disabled', function() { beforeEach(function() { Settings.filestore.fallback.copyOnMiss = false }) + it('should fetch the file', async function() { + const res = await rp.get(fileUrl) + expect(res.body).to.equal(constantFileContent) + }) + it('should not copy the file to the primary', async function() { await rp.get(fileUrl) @@ -523,6 +523,11 @@ describe('Filestore', function() { Settings.filestore.fallback.copyOnMiss = true }) + it('should fetch the file', async function() { + const res = await rp.get(fileUrl) + expect(res.body).to.equal(constantFileContent) + }) + it('copies the file to the primary', async function() { await rp.get(fileUrl) // wait for the file to copy in the background @@ -578,21 +583,51 @@ describe('Filestore', function() { ) }) - it('should not copy the old file to the new bucket', async function() { - await expectPersistorNotToHaveFile( - app.persistor.primaryPersistor, - bucket, - fileKey - ) + describe('when copyOnMiss is false', function() { + beforeEach(function() { + Settings.filestore.fallback.copyOnMiss = false + }) + + it('should create a new file in the new bucket', async function() { + await expectPersistorToHaveFile( + app.persistor.primaryPersistor, + bucket, + newFileKey, + constantFileContent + ) + }) + + it('should not copy the old file to the primary with the old key', async function() { + await expectPersistorNotToHaveFile( + app.persistor.primaryPersistor, + bucket, + fileKey + ) + }) }) - it('should create a new file in the new bucket', async function() { - await expectPersistorToHaveFile( - app.persistor.primaryPersistor, - bucket, - newFileKey, - constantFileContent - ) + describe('when copyOnMiss is true', function() { + beforeEach(function() { + Settings.filestore.fallback.copyOnMiss = true + }) + + it('should create a new file in the new bucket', async function() { + await expectPersistorToHaveFile( + app.persistor.primaryPersistor, + bucket, + newFileKey, + constantFileContent + ) + }) + + it('should copy the old file to the primary with the old key', async function() { + await expectPersistorToHaveFile( + app.persistor.primaryPersistor, + bucket, + fileKey, + constantFileContent + ) + }) }) }) }) diff --git a/services/filestore/test/unit/js/MigrationPersistorTests.js b/services/filestore/test/unit/js/MigrationPersistorTests.js index 83159f38ad..db8401c78c 100644 --- a/services/filestore/test/unit/js/MigrationPersistorTests.js +++ b/services/filestore/test/unit/js/MigrationPersistorTests.js @@ -26,8 +26,8 @@ describe('MigrationPersistorTests', function() { let Metrics, Settings, Logger, + Stream, MigrationPersistor, - Minipass, fileStream, newPersistor @@ -82,24 +82,22 @@ describe('MigrationPersistorTests', function() { inc: sinon.stub() } + Stream = { + pipeline: sinon.stub().yields(), + PassThrough: sinon.stub() + } + Logger = { warn: sinon.stub() } - Minipass = sinon.stub() - Minipass.prototype.on = sinon - .stub() - .withArgs('end') - .yields() - Minipass.prototype.pipe = sinon.stub() - MigrationPersistor = SandboxedModule.require(modulePath, { requires: { 'settings-sharelatex': Settings, + stream: Stream, './Errors': Errors, 'metrics-sharelatex': Metrics, - 'logger-sharelatex': Logger, - minipass: Minipass + 'logger-sharelatex': Logger }, globals: { console } }) @@ -155,7 +153,7 @@ describe('MigrationPersistorTests', function() { }) it('should return the file stream', function() { - expect(response).to.equal(fileStream) + expect(response).to.be.an.instanceOf(Stream.PassThrough) }) it('should fetch the file from the primary persistor with the correct options', function() { @@ -215,13 +213,13 @@ describe('MigrationPersistorTests', function() { ).to.have.been.calledWithExactly( bucket, key, - sinon.match.instanceOf(Minipass), + sinon.match.instanceOf(Stream.PassThrough), md5 ) }) it('should send a stream to the client', function() { - expect(returnedStream).to.be.an.instanceOf(Minipass) + expect(returnedStream).to.be.an.instanceOf(Stream.PassThrough) }) }) @@ -476,7 +474,12 @@ describe('MigrationPersistorTests', function() { it('should send the file to the primary', function() { expect( primaryPersistor.promises.sendStream - ).to.have.been.calledWithExactly(bucket, destKey, fileStream, md5) + ).to.have.been.calledWithExactly( + bucket, + destKey, + sinon.match.instanceOf(Stream.PassThrough), + md5 + ) }) }) diff --git a/services/filestore/test/unit/js/S3PersistorTests.js b/services/filestore/test/unit/js/S3PersistorTests.js index 4f700c8797..b9711572c2 100644 --- a/services/filestore/test/unit/js/S3PersistorTests.js +++ b/services/filestore/test/unit/js/S3PersistorTests.js @@ -35,6 +35,7 @@ describe('S3PersistorTests', function() { Meter, MeteredStream, ReadStream, + Stream, S3Persistor, S3Client, S3ReadStream, @@ -43,7 +44,6 @@ describe('S3PersistorTests', function() { FileNotFoundError, EmptyPromise, settings, - Minipass, Hash, crypto @@ -61,6 +61,10 @@ describe('S3PersistorTests', function() { } } + Stream = { + pipeline: sinon.stub().yields() + } + EmptyPromise = { promise: sinon.stub().resolves() } @@ -70,7 +74,11 @@ describe('S3PersistorTests', function() { } ReadStream = { - pipe: sinon.stub().returns('readStream') + pipe: sinon.stub().returns('readStream'), + on: sinon + .stub() + .withArgs('end') + .yields() } FileNotFoundError = new Error('File not found') @@ -132,13 +140,6 @@ describe('S3PersistorTests', function() { createHash: sinon.stub().returns(Hash) } - Minipass = sinon.stub() - Minipass.prototype.on = sinon - .stub() - .withArgs('end') - .yields() - Minipass.prototype.pipe = sinon.stub() - Logger = { warn: sinon.stub() } @@ -151,8 +152,8 @@ describe('S3PersistorTests', function() { './Errors': Errors, fs: Fs, 'stream-meter': Meter, + stream: Stream, 'metrics-sharelatex': Metrics, - minipass: Minipass, crypto }, globals: { console } @@ -456,7 +457,10 @@ describe('S3PersistorTests', function() { }) it('should meter the stream', function() { - expect(Minipass.prototype.pipe).to.have.been.calledWith(MeteredStream) + expect(Stream.pipeline).to.have.been.calledWith( + ReadStream, + MeteredStream + ) }) it('should record an egress metric', function() { @@ -464,7 +468,7 @@ describe('S3PersistorTests', function() { }) it('calculates the md5 hash of the file', function() { - expect(Minipass.prototype.pipe).to.have.been.calledWith(Hash) + expect(Stream.pipeline).to.have.been.calledWith(ReadStream, Hash) }) }) @@ -479,7 +483,10 @@ describe('S3PersistorTests', function() { }) it('should not calculate the md5 hash of the file', function() { - expect(Minipass.prototype.pipe).not.to.have.been.calledWith(Hash) + expect(Stream.pipeline).not.to.have.been.calledWith( + sinon.match.any, + Hash + ) }) it('sends the hash in base64', function() {