From a512de5a59be863943328bc4c2d591eae2dd3bde Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 9 May 2023 10:10:28 +0100 Subject: [PATCH] Merge pull request #12904 from overleaf/bg-history-v1-streams-for-node-18 use pipeline for stream operations in history-v1 GitOrigin-RevId: 301a78c2c264d4951ab23054067d6be381778fcf --- services/history-v1/storage/lib/blob_hash.js | 13 ++-- .../history-v1/storage/lib/project_archive.js | 13 ++-- services/history-v1/storage/lib/streams.js | 73 +++++++++++-------- 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/services/history-v1/storage/lib/blob_hash.js b/services/history-v1/storage/lib/blob_hash.js index 334824c1e8..52dc8cb3e2 100644 --- a/services/history-v1/storage/lib/blob_hash.js +++ b/services/history-v1/storage/lib/blob_hash.js @@ -4,6 +4,7 @@ const BPromise = require('bluebird') const fs = BPromise.promisifyAll(require('fs')) const crypto = require('crypto') +const { pipeline } = require('stream') const assert = require('./assert') function getGitBlobHeader(byteLength) { @@ -34,12 +35,14 @@ exports.fromStream = BPromise.method(function blobHashFromStream( const hash = getBlobHash(byteLength) return new BPromise(function (resolve, reject) { - stream.on('end', function () { - hash.end() - resolve(hash.read()) + pipeline(stream, hash, function (err) { + if (err) { + reject(err) + } else { + hash.end() + resolve(hash.read()) + } }) - stream.on('error', reject) - stream.pipe(hash) }) }) diff --git a/services/history-v1/storage/lib/project_archive.js b/services/history-v1/storage/lib/project_archive.js index d6c51be0bd..6e0cd2b083 100644 --- a/services/history-v1/storage/lib/project_archive.js +++ b/services/history-v1/storage/lib/project_archive.js @@ -3,6 +3,7 @@ const Archive = require('archiver') const BPromise = require('bluebird') const fs = require('fs') +const { pipeline } = require('stream') const core = require('overleaf-editor-core') const Snapshot = core.Snapshot @@ -104,12 +105,14 @@ ProjectArchive.prototype.writeZip = function projectArchiveToZip( }) const streamArchiveToFile = new BPromise(function (resolve, reject) { - archive.on('error', reject) - const stream = fs.createWriteStream(zipFilePath) - stream.on('error', reject) - stream.on('finish', resolve) - archive.pipe(stream) + pipeline(archive, stream, function (err) { + if (err) { + reject(err) + } else { + resolve() + } + }) }) return BPromise.join(streamArchiveToFile, addFilesToArchiveAndFinalize) diff --git a/services/history-v1/storage/lib/streams.js b/services/history-v1/storage/lib/streams.js index e6adff7ba2..76c553c27f 100644 --- a/services/history-v1/storage/lib/streams.js +++ b/services/history-v1/storage/lib/streams.js @@ -8,17 +8,18 @@ const BPromise = require('bluebird') const zlib = require('zlib') const stringToStream = require('string-to-stream') - -function promiseWriteStreamFinish(writeStream) { - return new BPromise(function (resolve, reject) { - writeStream.on('finish', resolve) - writeStream.on('error', reject) - }) -} +const { pipeline, Writable } = require('stream') function promisePipe(readStream, writeStream) { - readStream.pipe(writeStream) - return promiseWriteStreamFinish(writeStream) + return new BPromise(function (resolve, reject) { + pipeline(readStream, writeStream, function (err) { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) } /** @@ -32,22 +33,36 @@ function promisePipe(readStream, writeStream) { */ exports.promisePipe = promisePipe +class WritableBuffer extends Writable { + constructor(options) { + super(options) + this.buffers = [] + } + + _write(chunk, encoding, callback) { + this.buffers.push(chunk) + callback() + } + + _final(callback) { + callback() + } + + contents() { + return Buffer.concat(this.buffers) + } +} + function readStreamToBuffer(readStream) { return new BPromise(function (resolve, reject) { - const buffers = [] - readStream.on('readable', function () { - while (true) { - const buffer = this.read() - if (!buffer) { - break - } - buffers.push(buffer) + const bufferStream = new WritableBuffer() + pipeline(readStream, bufferStream, function (err) { + if (err) { + reject(err) + } else { + resolve(bufferStream.contents()) } }) - readStream.on('end', function () { - resolve(Buffer.concat(buffers)) - }) - readStream.on('error', reject) }) } @@ -62,17 +77,15 @@ exports.readStreamToBuffer = readStreamToBuffer function gunzipStreamToBuffer(readStream) { const gunzip = zlib.createGunzip() - const gunzipStream = readStream.pipe(gunzip) + const bufferStream = new WritableBuffer() return new BPromise(function (resolve, reject) { - const buffers = [] - gunzipStream.on('data', function (buffer) { - buffers.push(buffer) + pipeline(readStream, gunzip, bufferStream, function (err) { + if (err) { + reject(err) + } else { + resolve(bufferStream.contents()) + } }) - gunzipStream.on('end', function () { - resolve(Buffer.concat(buffers)) - }) - readStream.on('error', reject) - gunzipStream.on('error', reject) }) }