Tidy up 'getReadyPipeline'

This commit is contained in:
Simon Detheridge
2020-03-26 16:44:46 +00:00
parent 1f037ef653
commit a1ae68f6b5
3 changed files with 16 additions and 21 deletions

View File

@@ -94,33 +94,28 @@ async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) {
// resolves when a stream is 'readable', or rejects if the stream throws an error
// before that happens - this lets us handle protocol-level errors before trying
// to read them - these can come from the call to pipeline or the stream itself
// to read them
function getReadyPipeline(...streams) {
return new Promise((resolve, reject) => {
const lastStream = streams.slice(-1)[0]
let resolvedOrErrored = false
const onError = function(err) {
const handler = function(err) {
if (!resolvedOrErrored) {
resolvedOrErrored = true
reject(
wrapError(err, 'error before stream became ready', {}, ReadError)
)
}
}
const onStreamReady = function() {
if (!resolvedOrErrored) {
resolvedOrErrored = true
lastStream.removeListener('readable', onStreamReady)
lastStream.removeListener('error', onError)
lastStream.removeListener('readable', handler)
if (err) {
return reject(
wrapError(err, 'error before stream became ready', {}, ReadError)
)
}
resolve(lastStream)
}
}
pipeline(...streams).catch(onError)
lastStream.on('readable', onStreamReady)
lastStream.on('error', onError)
pipeline(...streams).catch(handler)
lastStream.on('readable', handler)
})
}

View File

@@ -202,7 +202,7 @@ describe('GcsPersistorTests', function() {
beforeEach(async function() {
Transform.prototype.on = sinon.stub()
Transform.prototype.on.withArgs('error').yields(GcsNotFoundError)
Stream.pipeline.yields(GcsNotFoundError)
try {
stream = await GcsPersistor.promises.getFileStream(bucket, key)
} catch (err) {
@@ -232,7 +232,7 @@ describe('GcsPersistorTests', function() {
beforeEach(async function() {
Transform.prototype.on = sinon.stub()
Transform.prototype.on.withArgs('error').yields(genericError)
Stream.pipeline.yields(genericError)
try {
stream = await GcsPersistor.promises.getFileStream(bucket, key)
} catch (err) {

View File

@@ -292,7 +292,7 @@ describe('S3PersistorTests', function() {
beforeEach(async function() {
Transform.prototype.on = sinon.stub()
Transform.prototype.on.withArgs('error').yields(S3NotFoundError)
Stream.pipeline.yields(S3NotFoundError)
try {
stream = await S3Persistor.promises.getFileStream(bucket, key)
} catch (err) {
@@ -322,7 +322,7 @@ describe('S3PersistorTests', function() {
beforeEach(async function() {
Transform.prototype.on = sinon.stub()
Transform.prototype.on.withArgs('error').yields(S3AccessDeniedError)
Stream.pipeline.yields(S3AccessDeniedError)
try {
stream = await S3Persistor.promises.getFileStream(bucket, key)
} catch (err) {
@@ -352,7 +352,7 @@ describe('S3PersistorTests', function() {
beforeEach(async function() {
Transform.prototype.on = sinon.stub()
Transform.prototype.on.withArgs('error').yields(genericError)
Stream.pipeline.yields(genericError)
try {
stream = await S3Persistor.promises.getFileStream(bucket, key)
} catch (err) {