From e080d39e5063a62012f41bdbcbc9a5fefc4c7999 Mon Sep 17 00:00:00 2001 From: Tim Down <158919+timdown@users.noreply.github.com> Date: Mon, 24 Feb 2025 15:27:48 +0000 Subject: [PATCH] Merge pull request #17378 from overleaf/td-stream-buffer-consolidation Stream and buffer consolidation GitOrigin-RevId: 284f411e6403e190d2dda3d9ebf806b1935b3949 --- services/clsi/app/js/OutputFileOptimiser.js | 4 +-- services/docstore/app/js/DocArchiveManager.js | 31 ++----------------- services/docstore/app/js/StreamToBuffer.js | 28 +++++++++++++++++ .../test/unit/js/DocArchiveManagerTests.js | 25 ++++++++++++++- 4 files changed, 55 insertions(+), 33 deletions(-) create mode 100644 services/docstore/app/js/StreamToBuffer.js diff --git a/services/clsi/app/js/OutputFileOptimiser.js b/services/clsi/app/js/OutputFileOptimiser.js index 0548defc21..09ca98672d 100644 --- a/services/clsi/app/js/OutputFileOptimiser.js +++ b/services/clsi/app/js/OutputFileOptimiser.js @@ -74,9 +74,7 @@ module.exports = OutputFileOptimiser = { logger.debug({ args }, 'running qpdf command') const timer = new Metrics.Timer('qpdf') - const proc = spawn('qpdf', args) - let stdout = '' - proc.stdout.setEncoding('utf8').on('data', chunk => (stdout += chunk)) + const proc = spawn('qpdf', args, { stdio: 'ignore' }) callback = _.once(callback) // avoid double call back for error and close event proc.on('error', function (err) { logger.warn({ err, args }, 'qpdf failed') diff --git a/services/docstore/app/js/DocArchiveManager.js b/services/docstore/app/js/DocArchiveManager.js index 238672e711..4390afe18f 100644 --- a/services/docstore/app/js/DocArchiveManager.js +++ b/services/docstore/app/js/DocArchiveManager.js @@ -8,6 +8,7 @@ const { ReadableString } = require('@overleaf/stream-utils') const RangeManager = require('./RangeManager') const PersistorManager = require('./PersistorManager') const pMap = require('p-map') +const { streamToBuffer } = require('./StreamToBuffer').promises const { BSON } = require('mongodb-legacy') const PARALLEL_JOBS = Settings.parallelArchiveJobs @@ -136,7 +137,7 @@ async function getDoc(projectId, docId) { key ) stream.resume() - const buffer = await _streamToBuffer(projectId, docId, stream) + const buffer = await streamToBuffer(projectId, docId, stream) const md5 = crypto.createHash('md5').update(buffer).digest('hex') if (sourceMd5 !== md5) { throw new Errors.Md5MismatchError('md5 mismatch when downloading doc', { @@ -187,34 +188,6 @@ async function destroyProject(projectId) { await Promise.all(tasks) } -async function _streamToBuffer(projectId, docId, stream) { - const chunks = [] - let size = 0 - let logged = false - const logIfTooLarge = finishedReading => { - if (size <= Settings.max_doc_length) return - // Log progress once and then again at the end. - if (logged && !finishedReading) return - logger.warn( - { projectId, docId, size, finishedReading }, - 'potentially large doc pulled down from gcs' - ) - logged = true - } - return await new Promise((resolve, reject) => { - stream.on('data', chunk => { - size += chunk.byteLength - logIfTooLarge(false) - chunks.push(chunk) - }) - stream.on('error', reject) - stream.on('end', () => { - logIfTooLarge(true) - resolve(Buffer.concat(chunks)) - }) - }) -} - function _deserializeArchivedDoc(buffer) { const doc = JSON.parse(buffer) diff --git a/services/docstore/app/js/StreamToBuffer.js b/services/docstore/app/js/StreamToBuffer.js new file mode 100644 index 0000000000..7de146cd11 --- /dev/null +++ b/services/docstore/app/js/StreamToBuffer.js @@ -0,0 +1,28 @@ +const { LoggerStream, WritableBuffer } = require('@overleaf/stream-utils') +const Settings = require('@overleaf/settings') +const logger = require('@overleaf/logger/logging-manager') +const { pipeline } = require('node:stream/promises') +const { callbackify } = require('node:util') + +module.exports = { + streamToBuffer: callbackify(streamToBuffer), + promises: { + streamToBuffer, + }, +} + +async function streamToBuffer(projectId, docId, stream) { + const loggerTransform = new LoggerStream( + Settings.max_doc_length, + (size, isFlush) => { + logger.warn( + { projectId, docId, size, finishedReading: isFlush }, + 'potentially large doc pulled down from gcs' + ) + } + ) + + const buffer = new WritableBuffer() + await pipeline(stream, loggerTransform, buffer) + return buffer.contents() +} diff --git a/services/docstore/test/unit/js/DocArchiveManagerTests.js b/services/docstore/test/unit/js/DocArchiveManagerTests.js index 13046d86fc..a57f9806c8 100644 --- a/services/docstore/test/unit/js/DocArchiveManagerTests.js +++ b/services/docstore/test/unit/js/DocArchiveManagerTests.js @@ -4,6 +4,7 @@ const modulePath = '../../../app/js/DocArchiveManager.js' const SandboxedModule = require('sandboxed-module') const { ObjectId } = require('mongodb-legacy') const Errors = require('../../../app/js/Errors') +const StreamToBuffer = require('../../../app/js/StreamToBuffer').promises describe('DocArchiveManager', function () { let DocArchiveManager, @@ -22,7 +23,8 @@ describe('DocArchiveManager', function () { md5Sum, projectId, readStream, - stream + stream, + streamToBuffer beforeEach(function () { md5Sum = 'decafbad' @@ -154,6 +156,26 @@ describe('DocArchiveManager', function () { }, } + // Wrap streamToBuffer so that we can pass in something that it expects (in + // this case, a Promise) rather than a stubbed stream object + streamToBuffer = { + promises: { + streamToBuffer: async () => { + const inputStream = new Promise(resolve => { + stream.on('data', data => resolve(data)) + }) + + const value = await StreamToBuffer.streamToBuffer( + 'testProjectId', + 'testDocId', + inputStream + ) + + return value + }, + }, + } + DocArchiveManager = SandboxedModule.require(modulePath, { requires: { '@overleaf/settings': Settings, @@ -163,6 +185,7 @@ describe('DocArchiveManager', function () { './RangeManager': RangeManager, './PersistorManager': PersistorManager, './Errors': Errors, + './StreamToBuffer': streamToBuffer, }, }) })