Files
overleaf-cep/libraries/stream-utils/index.js
Jakob Ackermann 5a6c066847 [web] allow admins to clone projects with ranges and entire history (#32739)
* [web] add consistent aria-label to editing/reviewing toggle

* [docstore] add endpoint for getting all docs with ranges

* [history-v1] fix schema of chunkId when deleting old history chunk

* [web] skip duplicate project lookup for resolving rootDocPath

* [web] ignore new limits for root doc path when making debug copy

* [web] allow admins to clone projects with ranges and entire history

* [web] fix tests

* [history-v1] re-order params for cloning project

* [web] fix duplicate import of logger after merge

* [project-history] re-order params for cloning project history metadata

GitOrigin-RevId: 7fa35b4f90885dd453150a348d491ba0ec8de412
2026-04-15 08:05:49 +00:00

253 lines
5.4 KiB
JavaScript

const { Writable, Readable, PassThrough, Transform } = require('node:stream')
/**
* A writable stream that stores all data written to it in a node Buffer.
* @extends Writable
* @example
* const { WritableBuffer } = require('@overleaf/stream-utils')
* const bufferStream = new WritableBuffer()
* bufferStream.write('hello')
* bufferStream.write('world')
* bufferStream.end()
* bufferStream.contents().toString() // 'helloworld'
*/
class WritableBuffer extends Writable {
constructor(options) {
super(options)
this._buffers = []
this._size = 0
}
_write(chunk, encoding, callback) {
this._buffers.push(chunk)
this._size += chunk.length
callback()
}
_final(callback) {
callback()
}
size() {
return this._size
}
getContents() {
return Buffer.concat(this._buffers)
}
contents() {
return Buffer.concat(this._buffers)
}
}
/**
* A readable stream created from a string.
* @extends Readable
* @example
* const { ReadableString } = require('@overleaf/stream-utils')
* const stringStream = new ReadableString('hello world')
* stringStream.on('data', chunk => console.log(chunk.toString()))
* stringStream.on('end', () => console.log('done'))
*/
class ReadableString extends Readable {
constructor(string, options) {
super(options)
this._string = string
}
_read(size) {
this.push(this._string)
this.push(null)
}
}
class SizeExceededError extends Error {}
/**
* Limited size stream which will emit a SizeExceededError if the size is exceeded
* @extends Transform
*/
class LimitedStream extends Transform {
constructor(maxSize) {
super()
this.maxSize = maxSize
this.size = 0
}
_transform(chunk, encoding, callback) {
this.size += chunk.byteLength
if (this.size > this.maxSize) {
callback(
new SizeExceededError(
`exceeded stream size limit of ${this.maxSize}: ${this.size}`
)
)
} else {
callback(null, chunk)
}
}
}
class AbortError extends Error {}
/**
* TimeoutStream which will emit an AbortError if it exceeds a user specified timeout
* @extends PassThrough
*/
class TimeoutStream extends PassThrough {
constructor(timeout) {
super()
this.t = setTimeout(() => {
this.destroy(new AbortError('stream timed out'))
}, timeout)
}
_final(callback) {
clearTimeout(this.t)
callback()
}
}
/**
* LoggerStream which will call the provided logger function when the stream exceeds a user specified limit. It will call the provided function again when flushing the stream and it exceeded the user specified limit before.
* @extends Transform
*/
class LoggerStream extends Transform {
/**
* Constructor.
* @param {number} maxSize
* @param {function(currentSizeOfStream: number, isFlush: boolean)} fn
* @param {Object?} options optional options for the Transform stream
*/
constructor(maxSize, fn, options) {
super(options)
this.fn = fn
this.size = 0
this.maxSize = maxSize
this.logged = false
}
_transform(chunk, encoding, callback) {
this.size += chunk.byteLength
if (this.size > this.maxSize && !this.logged) {
this.fn(this.size)
this.logged = true
}
callback(null, chunk)
}
_flush(callback) {
if (this.size > this.maxSize) {
this.fn(this.size, true)
}
callback()
}
}
class MeteredStream extends Transform {
#Metrics
#metric
#labels
constructor(Metrics, metric, labels) {
super()
this.#Metrics = Metrics
this.#metric = metric
this.#labels = labels
}
_transform(chunk, encoding, callback) {
this.#Metrics.count(this.#metric, chunk.byteLength, 1, this.#labels)
callback(null, chunk)
}
}
class IncrementalResponse {
#res
#ac
#timeout
#logger
#label
#info
constructor({ res, timeout, label, info, logger }) {
this.#res = res
this.#logger = logger
this.#label = label
this.#info = info
this.#ac = new AbortController()
this.#timeout = setTimeout(() => {
this.#logger.warn({ ...this.#info, timeout }, `${this.#label}: aborting`)
this.sendUpdate(
`error: ${label}: aborting after ${this.#humanReadableTimeout(timeout)}`
)
this.#ac.abort()
}, timeout)
}
signal() {
return this.#ac.signal
}
end() {
this.#ac.abort()
clearTimeout(this.#timeout)
try {
this.#res.end()
} catch {
try {
this.#res.destroy()
} catch {}
}
}
sendUpdate(msg) {
try {
this.#res.write(msg + '\n')
} catch (err) {
this.#ac.abort()
this.#logger.warn(
{ err, ...this.#info },
`${this.#label}: failed to send progress update`
)
}
}
fail(err) {
const aborted = this.#ac.signal.aborted
this.#ac.abort()
if (!aborted) {
this.#logger.err({ err, ...this.#info }, `${this.#label}: error`)
this.sendUpdate(`error: ${this.#label}`)
}
this.end()
}
#humanReadableTimeout(timeout) {
let ms = timeout
const minutes = Math.floor(ms / 60_000)
ms -= minutes * 60_000
const seconds = Math.floor(ms / 1_000)
ms -= seconds * 1_000
let t = ''
if (minutes) t += `${minutes}min`
if (seconds) t += `${seconds}s`
if (ms) t += `${ms}ms`
return t
}
}
// Export our classes
module.exports = {
WritableBuffer,
ReadableString,
LoggerStream,
LimitedStream,
TimeoutStream,
MeteredStream,
SizeExceededError,
AbortError,
IncrementalResponse,
}