Merge pull request #17378 from overleaf/td-stream-buffer-consolidation

Stream and buffer consolidation

GitOrigin-RevId: 284f411e6403e190d2dda3d9ebf806b1935b3949
This commit is contained in:
Tim Down
2025-02-24 15:27:48 +00:00
committed by Copybot
parent 58a098c67a
commit e080d39e50
4 changed files with 55 additions and 33 deletions

View File

@@ -74,9 +74,7 @@ module.exports = OutputFileOptimiser = {
logger.debug({ args }, 'running qpdf command')
const timer = new Metrics.Timer('qpdf')
const proc = spawn('qpdf', args)
let stdout = ''
proc.stdout.setEncoding('utf8').on('data', chunk => (stdout += chunk))
const proc = spawn('qpdf', args, { stdio: 'ignore' })
callback = _.once(callback) // avoid double call back for error and close event
proc.on('error', function (err) {
logger.warn({ err, args }, 'qpdf failed')

View File

@@ -8,6 +8,7 @@ const { ReadableString } = require('@overleaf/stream-utils')
const RangeManager = require('./RangeManager')
const PersistorManager = require('./PersistorManager')
const pMap = require('p-map')
const { streamToBuffer } = require('./StreamToBuffer').promises
const { BSON } = require('mongodb-legacy')
const PARALLEL_JOBS = Settings.parallelArchiveJobs
@@ -136,7 +137,7 @@ async function getDoc(projectId, docId) {
key
)
stream.resume()
const buffer = await _streamToBuffer(projectId, docId, stream)
const buffer = await streamToBuffer(projectId, docId, stream)
const md5 = crypto.createHash('md5').update(buffer).digest('hex')
if (sourceMd5 !== md5) {
throw new Errors.Md5MismatchError('md5 mismatch when downloading doc', {
@@ -187,34 +188,6 @@ async function destroyProject(projectId) {
await Promise.all(tasks)
}
async function _streamToBuffer(projectId, docId, stream) {
const chunks = []
let size = 0
let logged = false
const logIfTooLarge = finishedReading => {
if (size <= Settings.max_doc_length) return
// Log progress once and then again at the end.
if (logged && !finishedReading) return
logger.warn(
{ projectId, docId, size, finishedReading },
'potentially large doc pulled down from gcs'
)
logged = true
}
return await new Promise((resolve, reject) => {
stream.on('data', chunk => {
size += chunk.byteLength
logIfTooLarge(false)
chunks.push(chunk)
})
stream.on('error', reject)
stream.on('end', () => {
logIfTooLarge(true)
resolve(Buffer.concat(chunks))
})
})
}
function _deserializeArchivedDoc(buffer) {
const doc = JSON.parse(buffer)

View File

@@ -0,0 +1,28 @@
const { LoggerStream, WritableBuffer } = require('@overleaf/stream-utils')
const Settings = require('@overleaf/settings')
const logger = require('@overleaf/logger/logging-manager')
const { pipeline } = require('node:stream/promises')
const { callbackify } = require('node:util')
module.exports = {
streamToBuffer: callbackify(streamToBuffer),
promises: {
streamToBuffer,
},
}
async function streamToBuffer(projectId, docId, stream) {
const loggerTransform = new LoggerStream(
Settings.max_doc_length,
(size, isFlush) => {
logger.warn(
{ projectId, docId, size, finishedReading: isFlush },
'potentially large doc pulled down from gcs'
)
}
)
const buffer = new WritableBuffer()
await pipeline(stream, loggerTransform, buffer)
return buffer.contents()
}

View File

@@ -4,6 +4,7 @@ const modulePath = '../../../app/js/DocArchiveManager.js'
const SandboxedModule = require('sandboxed-module')
const { ObjectId } = require('mongodb-legacy')
const Errors = require('../../../app/js/Errors')
const StreamToBuffer = require('../../../app/js/StreamToBuffer').promises
describe('DocArchiveManager', function () {
let DocArchiveManager,
@@ -22,7 +23,8 @@ describe('DocArchiveManager', function () {
md5Sum,
projectId,
readStream,
stream
stream,
streamToBuffer
beforeEach(function () {
md5Sum = 'decafbad'
@@ -154,6 +156,26 @@ describe('DocArchiveManager', function () {
},
}
// Wrap streamToBuffer so that we can pass in something that it expects (in
// this case, a Promise) rather than a stubbed stream object
streamToBuffer = {
promises: {
streamToBuffer: async () => {
const inputStream = new Promise(resolve => {
stream.on('data', data => resolve(data))
})
const value = await StreamToBuffer.streamToBuffer(
'testProjectId',
'testDocId',
inputStream
)
return value
},
},
}
DocArchiveManager = SandboxedModule.require(modulePath, {
requires: {
'@overleaf/settings': Settings,
@@ -163,6 +185,7 @@ describe('DocArchiveManager', function () {
'./RangeManager': RangeManager,
'./PersistorManager': PersistorManager,
'./Errors': Errors,
'./StreamToBuffer': streamToBuffer,
},
})
})