mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
* [web] add consistent aria-label to editing/reviewing toggle * [docstore] add endpoint for getting all docs with ranges * [history-v1] fix schema of chunkId when deleting old history chunk * [web] skip duplicate project lookup for resolving rootDocPath * [web] ignore new limits for root doc path when making debug copy * [web] allow admins to clone projects with ranges and entire history * [web] fix tests * [history-v1] re-order params for cloning project * [web] fix duplicate import of logger after merge * [project-history] re-order params for cloning project history metadata GitOrigin-RevId: 7fa35b4f90885dd453150a348d491ba0ec8de412
253 lines
5.4 KiB
JavaScript
253 lines
5.4 KiB
JavaScript
const { Writable, Readable, PassThrough, Transform } = require('node:stream')
|
|
|
|
/**
|
|
* A writable stream that stores all data written to it in a node Buffer.
|
|
* @extends Writable
|
|
* @example
|
|
* const { WritableBuffer } = require('@overleaf/stream-utils')
|
|
* const bufferStream = new WritableBuffer()
|
|
* bufferStream.write('hello')
|
|
* bufferStream.write('world')
|
|
* bufferStream.end()
|
|
* bufferStream.contents().toString() // 'helloworld'
|
|
*/
|
|
class WritableBuffer extends Writable {
|
|
constructor(options) {
|
|
super(options)
|
|
this._buffers = []
|
|
this._size = 0
|
|
}
|
|
|
|
_write(chunk, encoding, callback) {
|
|
this._buffers.push(chunk)
|
|
this._size += chunk.length
|
|
callback()
|
|
}
|
|
|
|
_final(callback) {
|
|
callback()
|
|
}
|
|
|
|
size() {
|
|
return this._size
|
|
}
|
|
|
|
getContents() {
|
|
return Buffer.concat(this._buffers)
|
|
}
|
|
|
|
contents() {
|
|
return Buffer.concat(this._buffers)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A readable stream created from a string.
|
|
* @extends Readable
|
|
* @example
|
|
* const { ReadableString } = require('@overleaf/stream-utils')
|
|
* const stringStream = new ReadableString('hello world')
|
|
* stringStream.on('data', chunk => console.log(chunk.toString()))
|
|
* stringStream.on('end', () => console.log('done'))
|
|
*/
|
|
class ReadableString extends Readable {
|
|
constructor(string, options) {
|
|
super(options)
|
|
this._string = string
|
|
}
|
|
|
|
_read(size) {
|
|
this.push(this._string)
|
|
this.push(null)
|
|
}
|
|
}
|
|
|
|
class SizeExceededError extends Error {}
|
|
|
|
/**
|
|
* Limited size stream which will emit a SizeExceededError if the size is exceeded
|
|
* @extends Transform
|
|
*/
|
|
class LimitedStream extends Transform {
|
|
constructor(maxSize) {
|
|
super()
|
|
this.maxSize = maxSize
|
|
this.size = 0
|
|
}
|
|
|
|
_transform(chunk, encoding, callback) {
|
|
this.size += chunk.byteLength
|
|
if (this.size > this.maxSize) {
|
|
callback(
|
|
new SizeExceededError(
|
|
`exceeded stream size limit of ${this.maxSize}: ${this.size}`
|
|
)
|
|
)
|
|
} else {
|
|
callback(null, chunk)
|
|
}
|
|
}
|
|
}
|
|
|
|
class AbortError extends Error {}
|
|
|
|
/**
|
|
* TimeoutStream which will emit an AbortError if it exceeds a user specified timeout
|
|
* @extends PassThrough
|
|
*/
|
|
class TimeoutStream extends PassThrough {
|
|
constructor(timeout) {
|
|
super()
|
|
this.t = setTimeout(() => {
|
|
this.destroy(new AbortError('stream timed out'))
|
|
}, timeout)
|
|
}
|
|
|
|
_final(callback) {
|
|
clearTimeout(this.t)
|
|
callback()
|
|
}
|
|
}
|
|
|
|
/**
|
|
* LoggerStream which will call the provided logger function when the stream exceeds a user specified limit. It will call the provided function again when flushing the stream and it exceeded the user specified limit before.
|
|
* @extends Transform
|
|
*/
|
|
class LoggerStream extends Transform {
|
|
/**
|
|
* Constructor.
|
|
* @param {number} maxSize
|
|
* @param {function(currentSizeOfStream: number, isFlush: boolean)} fn
|
|
* @param {Object?} options optional options for the Transform stream
|
|
*/
|
|
constructor(maxSize, fn, options) {
|
|
super(options)
|
|
this.fn = fn
|
|
this.size = 0
|
|
this.maxSize = maxSize
|
|
this.logged = false
|
|
}
|
|
|
|
_transform(chunk, encoding, callback) {
|
|
this.size += chunk.byteLength
|
|
if (this.size > this.maxSize && !this.logged) {
|
|
this.fn(this.size)
|
|
this.logged = true
|
|
}
|
|
callback(null, chunk)
|
|
}
|
|
|
|
_flush(callback) {
|
|
if (this.size > this.maxSize) {
|
|
this.fn(this.size, true)
|
|
}
|
|
callback()
|
|
}
|
|
}
|
|
|
|
class MeteredStream extends Transform {
|
|
#Metrics
|
|
#metric
|
|
#labels
|
|
|
|
constructor(Metrics, metric, labels) {
|
|
super()
|
|
this.#Metrics = Metrics
|
|
this.#metric = metric
|
|
this.#labels = labels
|
|
}
|
|
|
|
_transform(chunk, encoding, callback) {
|
|
this.#Metrics.count(this.#metric, chunk.byteLength, 1, this.#labels)
|
|
callback(null, chunk)
|
|
}
|
|
}
|
|
|
|
class IncrementalResponse {
|
|
#res
|
|
#ac
|
|
#timeout
|
|
#logger
|
|
#label
|
|
#info
|
|
constructor({ res, timeout, label, info, logger }) {
|
|
this.#res = res
|
|
this.#logger = logger
|
|
this.#label = label
|
|
this.#info = info
|
|
this.#ac = new AbortController()
|
|
this.#timeout = setTimeout(() => {
|
|
this.#logger.warn({ ...this.#info, timeout }, `${this.#label}: aborting`)
|
|
this.sendUpdate(
|
|
`error: ${label}: aborting after ${this.#humanReadableTimeout(timeout)}`
|
|
)
|
|
this.#ac.abort()
|
|
}, timeout)
|
|
}
|
|
|
|
signal() {
|
|
return this.#ac.signal
|
|
}
|
|
|
|
end() {
|
|
this.#ac.abort()
|
|
clearTimeout(this.#timeout)
|
|
try {
|
|
this.#res.end()
|
|
} catch {
|
|
try {
|
|
this.#res.destroy()
|
|
} catch {}
|
|
}
|
|
}
|
|
|
|
sendUpdate(msg) {
|
|
try {
|
|
this.#res.write(msg + '\n')
|
|
} catch (err) {
|
|
this.#ac.abort()
|
|
this.#logger.warn(
|
|
{ err, ...this.#info },
|
|
`${this.#label}: failed to send progress update`
|
|
)
|
|
}
|
|
}
|
|
|
|
fail(err) {
|
|
const aborted = this.#ac.signal.aborted
|
|
this.#ac.abort()
|
|
if (!aborted) {
|
|
this.#logger.err({ err, ...this.#info }, `${this.#label}: error`)
|
|
this.sendUpdate(`error: ${this.#label}`)
|
|
}
|
|
this.end()
|
|
}
|
|
|
|
#humanReadableTimeout(timeout) {
|
|
let ms = timeout
|
|
const minutes = Math.floor(ms / 60_000)
|
|
ms -= minutes * 60_000
|
|
const seconds = Math.floor(ms / 1_000)
|
|
ms -= seconds * 1_000
|
|
let t = ''
|
|
if (minutes) t += `${minutes}min`
|
|
if (seconds) t += `${seconds}s`
|
|
if (ms) t += `${ms}ms`
|
|
return t
|
|
}
|
|
}
|
|
|
|
// Export our classes
|
|
|
|
module.exports = {
|
|
WritableBuffer,
|
|
ReadableString,
|
|
LoggerStream,
|
|
LimitedStream,
|
|
TimeoutStream,
|
|
MeteredStream,
|
|
SizeExceededError,
|
|
AbortError,
|
|
IncrementalResponse,
|
|
}
|