Remove minipass as dependency and refactor to make things clearer

This commit is contained in:
Simon Detheridge
2020-02-03 15:55:17 +00:00
parent 5d5d325691
commit 9e0b378948
5 changed files with 194 additions and 146 deletions

View File

@@ -1,10 +1,12 @@
const metrics = require('metrics-sharelatex')
const Settings = require('settings-sharelatex')
const logger = require('logger-sharelatex')
const Minipass = require('minipass')
const { callbackify } = require('util')
const Stream = require('stream')
const { callbackify, promisify } = require('util')
const { NotFoundError, WriteError } = require('./Errors')
const pipeline = promisify(Stream.pipeline)
// Persistor that wraps two other persistors. Talks to the 'primary' by default,
// but will fall back to an older persistor in the case of a not-found error.
// If `Settings.filestore.fallback.copyOnMiss` is set, this will copy files from the fallback
@@ -29,14 +31,86 @@ module.exports = function(primary, fallback) {
}
}
async function getFileStreamWithFallback(bucket, key, opts) {
const shouldCopy =
Settings.filestore.fallback.copyOnMiss && !opts.start && !opts.end
try {
return await primary.promises.getFileStream(bucket, key, opts)
} catch (err) {
if (err instanceof NotFoundError) {
const fallbackBucket = _getFallbackBucket(bucket)
const fallbackStream = await fallback.promises.getFileStream(
fallbackBucket,
key,
opts
)
// tee the stream to the client, and as a copy to the primary (if necessary)
// start listening on both straight away so that we don't consume bytes
// in one place before the other
const returnStream = new Stream.PassThrough()
pipeline(fallbackStream, returnStream)
if (shouldCopy) {
const copyStream = new Stream.PassThrough()
pipeline(fallbackStream, copyStream)
_copyStreamFromFallbackAndVerify(
copyStream,
fallbackBucket,
bucket,
key,
key
).catch(() => {
// swallow errors, as this runs in the background and will log a warning
})
}
return returnStream
}
throw err
}
}
async function copyFileWithFallback(bucket, sourceKey, destKey) {
try {
return await primary.promises.copyFile(bucket, sourceKey, destKey)
} catch (err) {
if (err instanceof NotFoundError) {
const fallbackBucket = _getFallbackBucket(bucket)
return _copyFileFromFallback(fallbackBucket, bucket, sourceKey, destKey)
const fallbackStream = await fallback.promises.getFileStream(
fallbackBucket,
sourceKey,
{}
)
const copyStream = new Stream.PassThrough()
pipeline(fallbackStream, copyStream)
if (Settings.filestore.fallback.copyOnMiss) {
const missStream = new Stream.PassThrough()
pipeline(fallbackStream, missStream)
// copy from sourceKey -> sourceKey
_copyStreamFromFallbackAndVerify(
missStream,
fallbackBucket,
bucket,
sourceKey,
sourceKey
).then(() => {
// swallow errors, as this runs in the background and will log a warning
})
}
// copy from sourceKey -> destKey
return _copyStreamFromFallbackAndVerify(
copyStream,
fallbackBucket,
bucket,
sourceKey,
destKey
)
}
throw err
}
}
@@ -44,20 +118,29 @@ module.exports = function(primary, fallback) {
return Settings.filestore.fallback.buckets[bucket]
}
function _wrapFallbackMethod(method, enableCopy = true) {
function _wrapFallbackMethod(method) {
return async function(bucket, key, ...moreArgs) {
try {
return await primary.promises[method](bucket, key, ...moreArgs)
} catch (err) {
if (err instanceof NotFoundError) {
const fallbackBucket = _getFallbackBucket(bucket)
if (Settings.filestore.fallback.copyOnMiss && enableCopy) {
// run in background
_copyFileFromFallback(fallbackBucket, bucket, key, key).catch(
err => {
logger.warn({ err }, 'failed to copy file from fallback')
}
if (Settings.filestore.fallback.copyOnMiss) {
const fallbackStream = await fallback.promises.getFileStream(
fallbackBucket,
key,
{}
)
// run in background
_copyStreamFromFallbackAndVerify(
fallbackStream,
fallbackBucket,
bucket,
key,
key
).catch(err => {
logger.warn({ err }, 'failed to copy file from fallback')
})
}
return fallback.promises[method](fallbackBucket, key, ...moreArgs)
}
@@ -66,32 +149,7 @@ module.exports = function(primary, fallback) {
}
}
async function _getFileStreamAndCopyIfRequired(bucketName, key, opts) {
const shouldCopy =
Settings.filestore.fallback.copyOnMiss && !opts.start && !opts.end
try {
return await primary.promises.getFileStream(bucketName, key, opts)
} catch (err) {
if (err instanceof NotFoundError) {
const fallbackBucket = _getFallbackBucket(bucketName)
if (shouldCopy) {
return _copyFileFromFallback(
fallbackBucket,
bucketName,
key,
key,
true
)
} else {
return fallback.promises.getFileStream(fallbackBucket, key, opts)
}
}
throw err
}
}
async function _copyFromFallbackStreamAndVerify(
async function _copyStreamFromFallbackAndVerify(
stream,
sourceBucket,
destBucket,
@@ -139,63 +197,12 @@ module.exports = function(primary, fallback) {
}
}
async function _copyFileFromFallback(
sourceBucket,
destBucket,
sourceKey,
destKey,
returnStream = false
) {
metrics.inc('fallback.copy')
const sourceStream = await fallback.promises.getFileStream(
sourceBucket,
sourceKey,
{}
)
if (!returnStream) {
return _copyFromFallbackStreamAndVerify(
sourceStream,
sourceBucket,
destBucket,
sourceKey,
destKey
)
}
const tee = new Minipass()
const clientStream = new Minipass()
const copyStream = new Minipass()
tee.pipe(clientStream)
tee.pipe(copyStream)
// copy the file in the background
_copyFromFallbackStreamAndVerify(
copyStream,
sourceBucket,
destBucket,
sourceKey,
destKey
).catch(
// the error handler in this method will log a metric and a warning, so
// we don't need to do anything extra here, but catching it will prevent
// unhandled promise rejection warnings
() => {}
)
// start piping the source stream into the tee after everything is set up,
// otherwise one stream may consume bytes that don't arrive at the other
sourceStream.pipe(tee)
return clientStream
}
return {
primaryPersistor: primary,
fallbackPersistor: fallback,
sendFile: primary.sendFile,
sendStream: primary.sendStream,
getFileStream: callbackify(_getFileStreamAndCopyIfRequired),
getFileStream: callbackify(getFileStreamWithFallback),
getFileMd5Hash: callbackify(_wrapFallbackMethod('getFileMd5Hash')),
deleteDirectory: callbackify(
_wrapMethodOnBothPersistors('deleteDirectory')
@@ -208,7 +215,7 @@ module.exports = function(primary, fallback) {
promises: {
sendFile: primary.promises.sendFile,
sendStream: primary.promises.sendStream,
getFileStream: _getFileStreamAndCopyIfRequired,
getFileStream: getFileStreamWithFallback,
getFileMd5Hash: _wrapFallbackMethod('getFileMd5Hash'),
deleteDirectory: _wrapMethodOnBothPersistors('deleteDirectory'),
getFileSize: _wrapFallbackMethod('getFileSize'),

View File

@@ -7,13 +7,13 @@ const settings = require('settings-sharelatex')
const metrics = require('metrics-sharelatex')
const logger = require('logger-sharelatex')
const Minipass = require('minipass')
const meter = require('stream-meter')
const Stream = require('stream')
const crypto = require('crypto')
const fs = require('fs')
const S3 = require('aws-sdk/clients/s3')
const { URL } = require('url')
const { callbackify } = require('util')
const { callbackify, promisify } = require('util')
const {
WriteError,
ReadError,
@@ -46,6 +46,8 @@ module.exports = {
}
}
const pipeline = promisify(Stream.pipeline)
function hexToBase64(hex) {
return Buffer.from(hex, 'hex').toString('base64')
}
@@ -68,7 +70,6 @@ async function sendFile(bucketName, key, fsPath) {
async function sendStream(bucketName, key, readStream, sourceMd5) {
try {
// if there is no supplied md5 hash, we calculate the hash as the data passes through
const passthroughStream = new Minipass()
let hashPromise
let b64Hash
@@ -77,29 +78,24 @@ async function sendStream(bucketName, key, readStream, sourceMd5) {
} else {
const hash = crypto.createHash('md5')
hash.setEncoding('hex')
passthroughStream.pipe(hash)
pipeline(readStream, hash)
hashPromise = new Promise((resolve, reject) => {
passthroughStream.on('end', () => {
readStream.on('end', () => {
hash.end()
resolve(hash.read())
})
passthroughStream.on('error', err => {
readStream.on('error', err => {
reject(err)
})
})
}
const meteredStream = meter()
passthroughStream.pipe(meteredStream)
meteredStream.on('finish', () => {
metrics.count('s3.egress', meteredStream.bytes)
})
// pipe the readstream through minipass, which can write to both the metered
// stream (which goes on to S3) and the md5 generator if necessary
// - we do this last so that a listener streams does not consume data meant
// for both destinations
readStream.pipe(passthroughStream)
pipeline(readStream, meteredStream)
// if we have an md5 hash, pass this to S3 to verify the upload
const uploadOptions = {

View File

@@ -497,16 +497,16 @@ describe('Filestore', function() {
)
})
it('should fetch the file', async function() {
const res = await rp.get(fileUrl)
expect(res.body).to.equal(constantFileContent)
})
describe('when copyOnMiss is disabled', function() {
beforeEach(function() {
Settings.filestore.fallback.copyOnMiss = false
})
it('should fetch the file', async function() {
const res = await rp.get(fileUrl)
expect(res.body).to.equal(constantFileContent)
})
it('should not copy the file to the primary', async function() {
await rp.get(fileUrl)
@@ -523,6 +523,11 @@ describe('Filestore', function() {
Settings.filestore.fallback.copyOnMiss = true
})
it('should fetch the file', async function() {
const res = await rp.get(fileUrl)
expect(res.body).to.equal(constantFileContent)
})
it('copies the file to the primary', async function() {
await rp.get(fileUrl)
// wait for the file to copy in the background
@@ -578,21 +583,51 @@ describe('Filestore', function() {
)
})
it('should not copy the old file to the new bucket', async function() {
await expectPersistorNotToHaveFile(
app.persistor.primaryPersistor,
bucket,
fileKey
)
describe('when copyOnMiss is false', function() {
beforeEach(function() {
Settings.filestore.fallback.copyOnMiss = false
})
it('should create a new file in the new bucket', async function() {
await expectPersistorToHaveFile(
app.persistor.primaryPersistor,
bucket,
newFileKey,
constantFileContent
)
})
it('should not copy the old file to the primary with the old key', async function() {
await expectPersistorNotToHaveFile(
app.persistor.primaryPersistor,
bucket,
fileKey
)
})
})
it('should create a new file in the new bucket', async function() {
await expectPersistorToHaveFile(
app.persistor.primaryPersistor,
bucket,
newFileKey,
constantFileContent
)
describe('when copyOnMiss is true', function() {
beforeEach(function() {
Settings.filestore.fallback.copyOnMiss = true
})
it('should create a new file in the new bucket', async function() {
await expectPersistorToHaveFile(
app.persistor.primaryPersistor,
bucket,
newFileKey,
constantFileContent
)
})
it('should copy the old file to the primary with the old key', async function() {
await expectPersistorToHaveFile(
app.persistor.primaryPersistor,
bucket,
fileKey,
constantFileContent
)
})
})
})
})

View File

@@ -26,8 +26,8 @@ describe('MigrationPersistorTests', function() {
let Metrics,
Settings,
Logger,
Stream,
MigrationPersistor,
Minipass,
fileStream,
newPersistor
@@ -82,24 +82,22 @@ describe('MigrationPersistorTests', function() {
inc: sinon.stub()
}
Stream = {
pipeline: sinon.stub().yields(),
PassThrough: sinon.stub()
}
Logger = {
warn: sinon.stub()
}
Minipass = sinon.stub()
Minipass.prototype.on = sinon
.stub()
.withArgs('end')
.yields()
Minipass.prototype.pipe = sinon.stub()
MigrationPersistor = SandboxedModule.require(modulePath, {
requires: {
'settings-sharelatex': Settings,
stream: Stream,
'./Errors': Errors,
'metrics-sharelatex': Metrics,
'logger-sharelatex': Logger,
minipass: Minipass
'logger-sharelatex': Logger
},
globals: { console }
})
@@ -155,7 +153,7 @@ describe('MigrationPersistorTests', function() {
})
it('should return the file stream', function() {
expect(response).to.equal(fileStream)
expect(response).to.be.an.instanceOf(Stream.PassThrough)
})
it('should fetch the file from the primary persistor with the correct options', function() {
@@ -215,13 +213,13 @@ describe('MigrationPersistorTests', function() {
).to.have.been.calledWithExactly(
bucket,
key,
sinon.match.instanceOf(Minipass),
sinon.match.instanceOf(Stream.PassThrough),
md5
)
})
it('should send a stream to the client', function() {
expect(returnedStream).to.be.an.instanceOf(Minipass)
expect(returnedStream).to.be.an.instanceOf(Stream.PassThrough)
})
})
@@ -476,7 +474,12 @@ describe('MigrationPersistorTests', function() {
it('should send the file to the primary', function() {
expect(
primaryPersistor.promises.sendStream
).to.have.been.calledWithExactly(bucket, destKey, fileStream, md5)
).to.have.been.calledWithExactly(
bucket,
destKey,
sinon.match.instanceOf(Stream.PassThrough),
md5
)
})
})

View File

@@ -35,6 +35,7 @@ describe('S3PersistorTests', function() {
Meter,
MeteredStream,
ReadStream,
Stream,
S3Persistor,
S3Client,
S3ReadStream,
@@ -43,7 +44,6 @@ describe('S3PersistorTests', function() {
FileNotFoundError,
EmptyPromise,
settings,
Minipass,
Hash,
crypto
@@ -61,6 +61,10 @@ describe('S3PersistorTests', function() {
}
}
Stream = {
pipeline: sinon.stub().yields()
}
EmptyPromise = {
promise: sinon.stub().resolves()
}
@@ -70,7 +74,11 @@ describe('S3PersistorTests', function() {
}
ReadStream = {
pipe: sinon.stub().returns('readStream')
pipe: sinon.stub().returns('readStream'),
on: sinon
.stub()
.withArgs('end')
.yields()
}
FileNotFoundError = new Error('File not found')
@@ -132,13 +140,6 @@ describe('S3PersistorTests', function() {
createHash: sinon.stub().returns(Hash)
}
Minipass = sinon.stub()
Minipass.prototype.on = sinon
.stub()
.withArgs('end')
.yields()
Minipass.prototype.pipe = sinon.stub()
Logger = {
warn: sinon.stub()
}
@@ -151,8 +152,8 @@ describe('S3PersistorTests', function() {
'./Errors': Errors,
fs: Fs,
'stream-meter': Meter,
stream: Stream,
'metrics-sharelatex': Metrics,
minipass: Minipass,
crypto
},
globals: { console }
@@ -456,7 +457,10 @@ describe('S3PersistorTests', function() {
})
it('should meter the stream', function() {
expect(Minipass.prototype.pipe).to.have.been.calledWith(MeteredStream)
expect(Stream.pipeline).to.have.been.calledWith(
ReadStream,
MeteredStream
)
})
it('should record an egress metric', function() {
@@ -464,7 +468,7 @@ describe('S3PersistorTests', function() {
})
it('calculates the md5 hash of the file', function() {
expect(Minipass.prototype.pipe).to.have.been.calledWith(Hash)
expect(Stream.pipeline).to.have.been.calledWith(ReadStream, Hash)
})
})
@@ -479,7 +483,10 @@ describe('S3PersistorTests', function() {
})
it('should not calculate the md5 hash of the file', function() {
expect(Minipass.prototype.pipe).not.to.have.been.calledWith(Hash)
expect(Stream.pipeline).not.to.have.been.calledWith(
sinon.match.any,
Hash
)
})
it('sends the hash in base64', function() {