mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
* [clsi] initial implementation of compile from history * [clsi] copy changes * [saas-e2e] extend test case with nested folder * [saas-e2e] add test case for tracked changes * [web] fix accumulating changes from multiple chunks * [web] optimize size check for compile request payload * [clsi] deduplicate globalBlobs * [clsi] add validation for request body details * [clsi] add metrics for compile from history * [clsi] download binary files concurrently * [clsi] skip download of empty file blob * [clsi] break down e2e compile time metric by compileFromHistory GitOrigin-RevId: 0dadef93e89d8a172c35cb130a1042d9d1bec42a
520 lines
14 KiB
JavaScript
520 lines
14 KiB
JavaScript
// @ts-check
|
|
import crypto from 'node:crypto'
|
|
import fs from 'node:fs'
|
|
import Path from 'node:path'
|
|
import { pipeline } from 'node:stream/promises'
|
|
import { crc32, createGzip, createGunzip } from 'node:zlib'
|
|
import tarFs from 'tar-fs'
|
|
import _ from 'lodash'
|
|
import {
|
|
fetchNothing,
|
|
fetchStream,
|
|
RequestFailedError,
|
|
} from '@overleaf/fetch-utils'
|
|
import logger from '@overleaf/logger'
|
|
import Metrics from '@overleaf/metrics'
|
|
import Settings from '@overleaf/settings'
|
|
import { MeteredStream } from '@overleaf/stream-utils'
|
|
import OutputCacheManager from './OutputCacheManager.js'
|
|
import ResourceWriter from './ResourceWriter.js'
|
|
import OError from '@overleaf/o-error'
|
|
|
|
const { CACHE_SUBDIR } = OutputCacheManager
|
|
const { isExtraneousFile } = ResourceWriter
|
|
|
|
const TIMEOUT = 5_000
|
|
/**
|
|
* @type {Map<string, number>}
|
|
*/
|
|
const lastFailures = new Map()
|
|
const TIMING_BUCKETS = [
|
|
0, 10, 100, 1000, 2000, 5000, 10000, 15000, 20000, 30000,
|
|
]
|
|
const MAX_ENTRIES_IN_OUTPUT_TAR = 100
|
|
const MAX_BLG_FILES = 50
|
|
const OBJECT_ID_REGEX = /^[0-9a-f]{24}$/
|
|
|
|
const MIGRATE_FROM = new Date('2026-01-14').getTime()
|
|
const MIGRATE_UNTIL = new Date('2026-01-21').getTime()
|
|
|
|
/**
|
|
* @param {string} projectId
|
|
* @return {{shard: string, url: string}|undefined}
|
|
*/
|
|
function getAvailableShard(projectId) {
|
|
// Layout of mongodb object id bytes:
|
|
// [timestamp 4bytes][random per machine 5bytes][counter 3bytes]
|
|
// [32bit 4bytes]
|
|
const last4Bytes = Buffer.from(projectId, 'hex').subarray(8, 12)
|
|
const counter = last4Bytes.readUInt32BE()
|
|
|
|
let shards = Settings.apis.clsiCache.shards.slice(
|
|
0,
|
|
Settings.apis.clsiCache.currentShards
|
|
)
|
|
const now = Date.now()
|
|
if (
|
|
now > MIGRATE_FROM &&
|
|
now < MIGRATE_UNTIL &&
|
|
(counter % 100) / 100 <
|
|
(MIGRATE_UNTIL - now) / (MIGRATE_UNTIL - MIGRATE_FROM)
|
|
) {
|
|
shards = Settings.apis.clsiCache.shards.slice(
|
|
0,
|
|
Settings.apis.clsiCache.desiredShards
|
|
)
|
|
}
|
|
|
|
let i = 0
|
|
while (shards.length > 0) {
|
|
const idx = crc32(`${projectId}-${i++}`) % shards.length
|
|
const candidate = shards[idx]
|
|
if (!isCircuitBreakerTripped(candidate.url)) return candidate
|
|
shards.splice(idx, 1)
|
|
}
|
|
return undefined
|
|
}
|
|
|
|
/**
|
|
* @param {string} url
|
|
* @return {boolean}
|
|
*/
|
|
function isCircuitBreakerTripped(url) {
|
|
const lastFailure = lastFailures.get(url) ?? 0
|
|
if (lastFailure) {
|
|
// Circuit breaker that avoids retries for 5-20s.
|
|
const retryDelay = TIMEOUT * (1 + 3 * Math.random())
|
|
if (performance.now() - lastFailure < retryDelay) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
/**
|
|
* @param {string} url
|
|
*/
|
|
function tripCircuitBreaker(url) {
|
|
lastFailures.set(url, performance.now()) // The shard is unhealthy. Refresh timestamp of last failure.
|
|
}
|
|
|
|
/**
|
|
* @param {string} url
|
|
*/
|
|
function closeCircuitBreaker(url) {
|
|
lastFailures.delete(url) // The shard is back up.
|
|
}
|
|
|
|
/**
|
|
* @param {Object} opts
|
|
* @param {string} opts.projectId
|
|
* @param {string} opts.userId
|
|
* @param {string} opts.buildId
|
|
* @param {string} opts.editorId
|
|
* @param {[{path: string}]} opts.outputFiles
|
|
* @param {string} opts.compileGroup
|
|
* @param {Record<string, number>} opts.stats
|
|
* @param {Record<string, number>} opts.timings
|
|
* @param {Record<string, any>} opts.options
|
|
* @return {string | undefined}
|
|
*/
|
|
function notifyCLSICacheAboutBuild({
|
|
projectId,
|
|
userId,
|
|
buildId,
|
|
editorId,
|
|
outputFiles,
|
|
compileGroup,
|
|
stats,
|
|
timings,
|
|
options,
|
|
}) {
|
|
if (!Settings.apis.clsiCache.enabled) return undefined
|
|
if (!OBJECT_ID_REGEX.test(projectId)) return undefined
|
|
const shardCfg = getAvailableShard(projectId)
|
|
if (!shardCfg) return undefined
|
|
const { url, shard } = shardCfg
|
|
|
|
/**
|
|
* @param {{path: string}[]} files
|
|
*/
|
|
const enqueue = files => {
|
|
const body = Buffer.from(
|
|
JSON.stringify({
|
|
projectId,
|
|
userId,
|
|
buildId,
|
|
editorId,
|
|
files,
|
|
downloadHost: Settings.apis.clsi.downloadHost,
|
|
clsiServerId: Settings.apis.clsi.clsiServerId,
|
|
compileGroup,
|
|
stats,
|
|
timings,
|
|
options,
|
|
})
|
|
)
|
|
const bodySize = body.byteLength
|
|
if (bodySize > 10_000_000) {
|
|
const outputPDF = files.find(f => f.path === 'output.pdf')
|
|
logger.warn(
|
|
{
|
|
projectId,
|
|
userId,
|
|
bodySize,
|
|
nFiles: files.length,
|
|
outputPDFSize:
|
|
outputPDF && Buffer.from(JSON.stringify(outputPDF)).byteLength,
|
|
nPDFCachingRanges:
|
|
outputPDF &&
|
|
'ranges' in outputPDF &&
|
|
Array.isArray(outputPDF.ranges) &&
|
|
outputPDF.ranges.length,
|
|
},
|
|
'large clsi-cache request'
|
|
)
|
|
}
|
|
Metrics.count('clsi_cache_enqueue_files', files.length)
|
|
fetchNothing(`${url}/enqueue`, {
|
|
method: 'POST',
|
|
body,
|
|
headers: { 'Content-Type': 'application/json' },
|
|
signal: AbortSignal.timeout(TIMEOUT),
|
|
})
|
|
.then(() => {
|
|
closeCircuitBreaker(url)
|
|
})
|
|
.catch(err => {
|
|
tripCircuitBreaker(url)
|
|
logger.warn(
|
|
{ err, projectId, userId, buildId, shard },
|
|
'enqueue for clsi-cache failed'
|
|
)
|
|
})
|
|
}
|
|
|
|
// PDF preview
|
|
enqueue(
|
|
outputFiles
|
|
.filter(
|
|
f =>
|
|
f.path === 'output.pdf' ||
|
|
f.path === 'output.log' ||
|
|
f.path === 'output.synctex.gz'
|
|
)
|
|
.concat(
|
|
outputFiles.filter(f => f.path.endsWith('.blg')).slice(0, MAX_BLG_FILES)
|
|
)
|
|
.map(f => {
|
|
const lean = { path: f.path }
|
|
if (f.path === 'output.pdf') {
|
|
Object.assign(lean, _.pick(f, 'path', 'size', 'contentId', 'ranges'))
|
|
}
|
|
return lean
|
|
})
|
|
)
|
|
|
|
// Compile Cache
|
|
buildTarball({ projectId, userId, buildId, outputFiles })
|
|
.then(() => {
|
|
enqueue([{ path: 'output.tar.gz' }])
|
|
})
|
|
.catch(err => {
|
|
if (isENOENT(err)) return
|
|
logger.warn(
|
|
{ err, projectId, userId, buildId, shard },
|
|
'build output.tar.gz for clsi-cache failed'
|
|
)
|
|
})
|
|
|
|
copyHistorySnapshot({ projectId, userId, buildId })
|
|
.then(() => {
|
|
enqueue([{ path: 'history-resync.json.gz' }])
|
|
})
|
|
.catch(err => {
|
|
if (isENOENT(err)) return
|
|
logger.warn(
|
|
{ err, projectId, userId, buildId, shard },
|
|
'copy history-resync.json.gz for clsi-cache failed'
|
|
)
|
|
})
|
|
|
|
return shard
|
|
}
|
|
|
|
/**
|
|
* @param {Object} opts
|
|
* @param {string} opts.projectId
|
|
* @param {string} opts.userId
|
|
* @param {string} opts.buildId
|
|
* @return {Promise<void>}
|
|
*/
|
|
async function copyHistorySnapshot({ projectId, userId, buildId }) {
|
|
const src = Path.join(
|
|
Settings.path.clsiCacheDir,
|
|
userId ? `${projectId}-${userId}` : projectId,
|
|
'history.json.gz'
|
|
)
|
|
const outputDir = getOutputDir({ projectId, userId, buildId })
|
|
const dst = Path.join(outputDir, 'history-resync.json.gz')
|
|
await fs.promises.cp(src, dst)
|
|
}
|
|
/**
|
|
* @param {Object} opts
|
|
* @param {string} opts.projectId
|
|
* @param {string} opts.userId
|
|
* @param {string} opts.buildId
|
|
* @param {[{path: string}]} opts.outputFiles
|
|
* @return {Promise<void>}
|
|
*/
|
|
async function buildTarball({ projectId, userId, buildId, outputFiles }) {
|
|
const timer = new Metrics.Timer('clsi_cache_build', 1, {}, TIMING_BUCKETS)
|
|
const outputDir = getOutputDir({ projectId, userId, buildId })
|
|
|
|
const files = outputFiles.filter(f => !isExtraneousFile(f.path))
|
|
if (files.length > MAX_ENTRIES_IN_OUTPUT_TAR) {
|
|
Metrics.inc('clsi_cache_build_too_many_entries')
|
|
throw new OError('too many output files for output.tar.gz', {
|
|
nFiles: files.length,
|
|
})
|
|
}
|
|
Metrics.count('clsi_cache_build_files', files.length)
|
|
|
|
const path = Path.join(outputDir, 'output.tar.gz')
|
|
try {
|
|
await pipeline(
|
|
tarFs.pack(outputDir, { entries: files.map(f => f.path) }),
|
|
createGzip(),
|
|
fs.createWriteStream(path)
|
|
)
|
|
} catch (err) {
|
|
try {
|
|
await fs.promises.unlink(path)
|
|
} catch (e) {}
|
|
throw err
|
|
} finally {
|
|
timer.done()
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @param {string} projectId
|
|
* @param {string} userId
|
|
* @param {string} editorId
|
|
* @param {string} buildId
|
|
* @param {string} outputDir
|
|
* @return {Promise<boolean>}
|
|
*/
|
|
async function downloadOutputDotSynctexFromCompileCache(
|
|
projectId,
|
|
userId,
|
|
editorId,
|
|
buildId,
|
|
outputDir
|
|
) {
|
|
const requestPath = `/project/${projectId}/${
|
|
userId ? `user/${userId}/` : ''
|
|
}build/${editorId}-${buildId}/search/output/output.synctex.gz`
|
|
return await downloadSingleFile(projectId, requestPath, outputDir, 'synctex')
|
|
}
|
|
|
|
/**
|
|
* @param {string} projectId
|
|
* @param {string} userId
|
|
* @param {string} cacheDir
|
|
* @return {Promise<boolean>}
|
|
*/
|
|
async function downloadHistorySnapshot(projectId, userId, cacheDir) {
|
|
const requestPath = `/project/${projectId}/${
|
|
userId ? `user/${userId}/` : ''
|
|
}latest/output/history-resync.json.gz`
|
|
return await downloadSingleFile(projectId, requestPath, cacheDir, 'snapshot')
|
|
}
|
|
|
|
/**
|
|
* @param {string} projectId
|
|
* @param {string} requestPath
|
|
* @param {string} outputDir
|
|
* @param {string} label
|
|
* @return {Promise<boolean>}
|
|
*/
|
|
async function downloadSingleFile(projectId, requestPath, outputDir, label) {
|
|
if (!Settings.apis.clsiCache.enabled) return false
|
|
if (!OBJECT_ID_REGEX.test(projectId)) return false
|
|
const shardCfg = getAvailableShard(projectId)
|
|
if (!shardCfg) return false
|
|
const { url, shard } = shardCfg
|
|
|
|
const timer = new Metrics.Timer(
|
|
'clsi_cache_download',
|
|
1,
|
|
{ method: label },
|
|
TIMING_BUCKETS
|
|
)
|
|
const u = new URL(url)
|
|
u.pathname = requestPath
|
|
let stream
|
|
try {
|
|
stream = await fetchStream(u, {
|
|
method: 'GET',
|
|
signal: AbortSignal.timeout(TIMEOUT),
|
|
})
|
|
} catch (err) {
|
|
if (err instanceof RequestFailedError && err.response.status === 404) {
|
|
closeCircuitBreaker(url)
|
|
timer.done({ status: 'not-found' })
|
|
return false
|
|
}
|
|
tripCircuitBreaker(url)
|
|
timer.done({ status: 'error' })
|
|
throw OError.tag(err, 'download failed', { shard })
|
|
}
|
|
await fs.promises.mkdir(outputDir, { recursive: true })
|
|
const name = Path.basename(requestPath)
|
|
const dst = Path.join(outputDir, name)
|
|
const tmp = dst + crypto.randomUUID()
|
|
try {
|
|
await pipeline(
|
|
stream,
|
|
new MeteredStream(Metrics, 'clsi_cache_egress', {
|
|
path: name,
|
|
}),
|
|
fs.createWriteStream(tmp)
|
|
)
|
|
await fs.promises.rename(tmp, dst)
|
|
} catch (err) {
|
|
if (isENOENT(err)) return false
|
|
tripCircuitBreaker(url)
|
|
try {
|
|
await fs.promises.unlink(tmp)
|
|
} catch {}
|
|
throw OError.tag(err, 'stream failed', { shard })
|
|
}
|
|
closeCircuitBreaker(url)
|
|
timer.done({ status: 'success' })
|
|
return true
|
|
}
|
|
|
|
/**
|
|
* @param {string} projectId
|
|
* @param {string} userId
|
|
* @param {string} compileDir
|
|
* @return {Promise<boolean>}
|
|
*/
|
|
async function downloadLatestCompileCache(projectId, userId, compileDir) {
|
|
if (!Settings.apis.clsiCache.enabled) return false
|
|
if (!OBJECT_ID_REGEX.test(projectId)) return false
|
|
const shardCfg = getAvailableShard(projectId)
|
|
if (!shardCfg) return false
|
|
const { url, shard } = shardCfg
|
|
|
|
const timer = new Metrics.Timer(
|
|
'clsi_cache_download',
|
|
1,
|
|
{ method: 'tar' },
|
|
TIMING_BUCKETS
|
|
)
|
|
let stream
|
|
try {
|
|
stream = await fetchStream(
|
|
`${url}/project/${projectId}/${
|
|
userId ? `user/${userId}/` : ''
|
|
}latest/output/output.tar.gz`,
|
|
{
|
|
method: 'GET',
|
|
signal: AbortSignal.timeout(TIMEOUT),
|
|
}
|
|
)
|
|
} catch (err) {
|
|
if (err instanceof RequestFailedError && err.response.status === 404) {
|
|
closeCircuitBreaker(url)
|
|
timer.done({ status: 'not-found' })
|
|
return false
|
|
}
|
|
tripCircuitBreaker(url)
|
|
timer.done({ status: 'error' })
|
|
throw OError.tag(err, 'download failed', { shard })
|
|
}
|
|
let n = 0
|
|
let abort = false
|
|
try {
|
|
await pipeline(
|
|
stream,
|
|
new MeteredStream(Metrics, 'clsi_cache_egress', {
|
|
path: 'output.tar.gz',
|
|
}),
|
|
createGunzip(),
|
|
tarFs.extract(compileDir, {
|
|
// use ignore hook for counting entries (files+folders) and validation.
|
|
// Include folders as they incur mkdir calls.
|
|
ignore(_, header) {
|
|
if (abort) return true // log once
|
|
n++
|
|
if (n > MAX_ENTRIES_IN_OUTPUT_TAR) {
|
|
abort = true
|
|
logger.warn(
|
|
{
|
|
projectId,
|
|
userId,
|
|
compileDir,
|
|
},
|
|
'too many entries in tar-ball from clsi-cache'
|
|
)
|
|
} else if (header.type !== 'file' && header.type !== 'directory') {
|
|
abort = true
|
|
logger.warn(
|
|
{
|
|
projectId,
|
|
userId,
|
|
compileDir,
|
|
entryType: header.type,
|
|
},
|
|
'unexpected entry in tar-ball from clsi-cache'
|
|
)
|
|
}
|
|
return abort
|
|
},
|
|
})
|
|
)
|
|
} catch (err) {
|
|
if (isENOENT(err)) return false
|
|
tripCircuitBreaker(url)
|
|
throw OError.tag(err, 'stream failed', { shard })
|
|
}
|
|
closeCircuitBreaker(url)
|
|
Metrics.count('clsi_cache_download_entries', n)
|
|
timer.done({ status: 'success' })
|
|
return !abort
|
|
}
|
|
|
|
/**
|
|
* @param {Object} opts
|
|
* @param {string} opts.projectId
|
|
* @param {string} opts.userId
|
|
* @param {string} opts.buildId
|
|
* @return {string}
|
|
*/
|
|
function getOutputDir({ projectId, userId, buildId }) {
|
|
return Path.join(
|
|
Settings.path.outputDir,
|
|
userId ? `${projectId}-${userId}` : projectId,
|
|
CACHE_SUBDIR,
|
|
buildId
|
|
)
|
|
}
|
|
|
|
/**
|
|
* @param {unknown} err
|
|
* @return {boolean}
|
|
*/
|
|
function isENOENT(err) {
|
|
return err instanceof Error && 'code' in err && err.code === 'ENOENT'
|
|
}
|
|
|
|
export default {
|
|
notifyCLSICacheAboutBuild,
|
|
downloadLatestCompileCache,
|
|
downloadHistorySnapshot,
|
|
downloadOutputDotSynctexFromCompileCache,
|
|
}
|