mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
[clsi-cache] add circuit breaker to clsi-cache requests (#29339)
Stage timeouts: - frontend waits 5s - web/clsi waits 4s - clsi-cache waits 3s This should ensure that the frontend can receive a valid response after any of the backend requests failed. The circuit breaker will remain closed for TIMEOUT + jitter of 0-3 times the TIMEOUT of the respective service. This should avoid the bulk of traffic to fail and occasionally issue retries without hammering the instances while down. Also do not try the next backend when the abort signal has expired. GitOrigin-RevId: d612125616a9e416beff2f4c6d7f30066b5b9d6d
This commit is contained in:
@@ -17,6 +17,11 @@ const { MeteredStream } = require('@overleaf/stream-utils')
|
||||
const { CACHE_SUBDIR } = require('./OutputCacheManager')
|
||||
const { isExtraneousFile } = require('./ResourceWriter')
|
||||
|
||||
const TIMEOUT = 5_000
|
||||
/**
|
||||
* @type {Map<string, number>}
|
||||
*/
|
||||
const lastFailures = new Map()
|
||||
const TIMING_BUCKETS = [
|
||||
0, 10, 100, 1000, 2000, 5000, 10000, 15000, 20000, 30000,
|
||||
]
|
||||
@@ -35,6 +40,25 @@ function getShard(projectId) {
|
||||
return Settings.apis.clsiCache.shards[idx]
|
||||
}
|
||||
|
||||
function checkCircuitBreaker(url) {
|
||||
const lastFailure = lastFailures.get(url) ?? 0
|
||||
if (lastFailure) {
|
||||
// Circuit breaker that avoids retries for 5-20s.
|
||||
const retryDelay = TIMEOUT * (1 + 3 * Math.random())
|
||||
if (performance.now() - lastFailure < retryDelay) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
function tripCircuitBreaker(url) {
|
||||
lastFailures.set(url, performance.now()) // The shard is unhealthy. Refresh timestamp of last failure.
|
||||
}
|
||||
function closeCircuitBreaker(url) {
|
||||
lastFailures.delete(url) // The shard is back up.
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} projectId
|
||||
* @param {string} userId
|
||||
@@ -61,6 +85,7 @@ function notifyCLSICacheAboutBuild({
|
||||
if (!Settings.apis.clsiCache.enabled) return undefined
|
||||
if (!OBJECT_ID_REGEX.test(projectId)) return undefined
|
||||
const { url, shard } = getShard(projectId)
|
||||
if (checkCircuitBreaker(url)) return undefined
|
||||
|
||||
/**
|
||||
* @param {[{path: string}]} files
|
||||
@@ -102,13 +127,18 @@ function notifyCLSICacheAboutBuild({
|
||||
method: 'POST',
|
||||
body,
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
}).catch(err => {
|
||||
logger.warn(
|
||||
{ err, projectId, userId, buildId },
|
||||
'enqueue for clsi cache failed'
|
||||
)
|
||||
signal: AbortSignal.timeout(TIMEOUT),
|
||||
})
|
||||
.then(() => {
|
||||
closeCircuitBreaker()
|
||||
})
|
||||
.catch(err => {
|
||||
tripCircuitBreaker()
|
||||
logger.warn(
|
||||
{ err, projectId, userId, buildId },
|
||||
'enqueue for clsi cache failed'
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
// PDF preview
|
||||
@@ -201,6 +231,8 @@ async function downloadOutputDotSynctexFromCompileCache(
|
||||
) {
|
||||
if (!Settings.apis.clsiCache.enabled) return false
|
||||
if (!OBJECT_ID_REGEX.test(projectId)) return false
|
||||
const { url } = getShard(projectId)
|
||||
if (checkCircuitBreaker(url)) return false
|
||||
|
||||
const timer = new Metrics.Timer(
|
||||
'clsi_cache_download',
|
||||
@@ -211,19 +243,21 @@ async function downloadOutputDotSynctexFromCompileCache(
|
||||
let stream
|
||||
try {
|
||||
stream = await fetchStream(
|
||||
`${getShard(projectId).url}/project/${projectId}/${
|
||||
`${url}/project/${projectId}/${
|
||||
userId ? `user/${userId}/` : ''
|
||||
}build/${editorId}-${buildId}/search/output/output.synctex.gz`,
|
||||
{
|
||||
method: 'GET',
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
signal: AbortSignal.timeout(TIMEOUT),
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
if (err instanceof RequestFailedError && err.response.status === 404) {
|
||||
closeCircuitBreaker()
|
||||
timer.done({ status: 'not-found' })
|
||||
return false
|
||||
}
|
||||
tripCircuitBreaker()
|
||||
timer.done({ status: 'error' })
|
||||
throw err
|
||||
}
|
||||
@@ -240,11 +274,13 @@ async function downloadOutputDotSynctexFromCompileCache(
|
||||
)
|
||||
await fs.promises.rename(tmp, dst)
|
||||
} catch (err) {
|
||||
tripCircuitBreaker()
|
||||
try {
|
||||
await fs.promises.unlink(tmp)
|
||||
} catch {}
|
||||
throw err
|
||||
}
|
||||
closeCircuitBreaker()
|
||||
timer.done({ status: 'success' })
|
||||
return true
|
||||
}
|
||||
@@ -258,10 +294,9 @@ async function downloadOutputDotSynctexFromCompileCache(
|
||||
async function downloadLatestCompileCache(projectId, userId, compileDir) {
|
||||
if (!Settings.apis.clsiCache.enabled) return false
|
||||
if (!OBJECT_ID_REGEX.test(projectId)) return false
|
||||
const { url } = getShard(projectId)
|
||||
if (checkCircuitBreaker(url)) return false
|
||||
|
||||
const url = `${getShard(projectId).url}/project/${projectId}/${
|
||||
userId ? `user/${userId}/` : ''
|
||||
}latest/output/output.tar.gz`
|
||||
const timer = new Metrics.Timer(
|
||||
'clsi_cache_download',
|
||||
1,
|
||||
@@ -270,54 +305,71 @@ async function downloadLatestCompileCache(projectId, userId, compileDir) {
|
||||
)
|
||||
let stream
|
||||
try {
|
||||
stream = await fetchStream(url, {
|
||||
method: 'GET',
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
})
|
||||
stream = await fetchStream(
|
||||
`${url}/project/${projectId}/${
|
||||
userId ? `user/${userId}/` : ''
|
||||
}latest/output/output.tar.gz`,
|
||||
{
|
||||
method: 'GET',
|
||||
signal: AbortSignal.timeout(TIMEOUT),
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
if (err instanceof RequestFailedError && err.response.status === 404) {
|
||||
closeCircuitBreaker()
|
||||
timer.done({ status: 'not-found' })
|
||||
return false
|
||||
}
|
||||
tripCircuitBreaker()
|
||||
timer.done({ status: 'error' })
|
||||
throw err
|
||||
}
|
||||
let n = 0
|
||||
let abort = false
|
||||
await pipeline(
|
||||
stream,
|
||||
new MeteredStream(Metrics, 'clsi_cache_egress', { path: 'output.tar.gz' }),
|
||||
createGunzip(),
|
||||
tarFs.extract(compileDir, {
|
||||
// use ignore hook for counting entries (files+folders) and validation.
|
||||
// Include folders as they incur mkdir calls.
|
||||
ignore(_, header) {
|
||||
if (abort) return true // log once
|
||||
n++
|
||||
if (n > MAX_ENTRIES_IN_OUTPUT_TAR) {
|
||||
abort = true
|
||||
logger.warn(
|
||||
{
|
||||
url,
|
||||
compileDir,
|
||||
},
|
||||
'too many entries in tar-ball from clsi-cache'
|
||||
)
|
||||
} else if (header.type !== 'file' && header.type !== 'directory') {
|
||||
abort = true
|
||||
logger.warn(
|
||||
{
|
||||
url,
|
||||
compileDir,
|
||||
entryType: header.type,
|
||||
},
|
||||
'unexpected entry in tar-ball from clsi-cache'
|
||||
)
|
||||
}
|
||||
return abort
|
||||
},
|
||||
})
|
||||
)
|
||||
try {
|
||||
await pipeline(
|
||||
stream,
|
||||
new MeteredStream(Metrics, 'clsi_cache_egress', {
|
||||
path: 'output.tar.gz',
|
||||
}),
|
||||
createGunzip(),
|
||||
tarFs.extract(compileDir, {
|
||||
// use ignore hook for counting entries (files+folders) and validation.
|
||||
// Include folders as they incur mkdir calls.
|
||||
ignore(_, header) {
|
||||
if (abort) return true // log once
|
||||
n++
|
||||
if (n > MAX_ENTRIES_IN_OUTPUT_TAR) {
|
||||
abort = true
|
||||
logger.warn(
|
||||
{
|
||||
projectId,
|
||||
userId,
|
||||
compileDir,
|
||||
},
|
||||
'too many entries in tar-ball from clsi-cache'
|
||||
)
|
||||
} else if (header.type !== 'file' && header.type !== 'directory') {
|
||||
abort = true
|
||||
logger.warn(
|
||||
{
|
||||
projectId,
|
||||
userId,
|
||||
compileDir,
|
||||
entryType: header.type,
|
||||
},
|
||||
'unexpected entry in tar-ball from clsi-cache'
|
||||
)
|
||||
}
|
||||
return abort
|
||||
},
|
||||
})
|
||||
)
|
||||
} catch (err) {
|
||||
tripCircuitBreaker()
|
||||
throw err
|
||||
}
|
||||
closeCircuitBreaker()
|
||||
Metrics.count('clsi_cache_download_entries', n)
|
||||
timer.done({ status: 'success' })
|
||||
return !abort
|
||||
|
||||
@@ -9,6 +9,13 @@ const Settings = require('@overleaf/settings')
|
||||
const OError = require('@overleaf/o-error')
|
||||
const { NotFoundError, InvalidNameError } = require('../Errors/Errors')
|
||||
|
||||
const TIMEOUT = 4_000
|
||||
|
||||
/**
|
||||
* @type {Map<string, number>}
|
||||
*/
|
||||
const lastFailures = new Map()
|
||||
|
||||
/**
|
||||
* Keep in sync with validateFilename in services/clsi-cache/app/js/utils.js
|
||||
*
|
||||
@@ -70,7 +77,7 @@ async function clearCache(projectId, userId) {
|
||||
try {
|
||||
await fetchNothing(u, {
|
||||
method: 'DELETE',
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
signal: AbortSignal.timeout(TIMEOUT),
|
||||
})
|
||||
} catch (err) {
|
||||
throw OError.tag(err, 'clear clsi-cache', { url, shard })
|
||||
@@ -94,7 +101,7 @@ async function getOutputFile(
|
||||
userId,
|
||||
buildId,
|
||||
filename,
|
||||
signal = AbortSignal.timeout(15_000)
|
||||
signal = AbortSignal.timeout(TIMEOUT)
|
||||
) {
|
||||
validateFilename(filename)
|
||||
if (!/^[a-f0-9-]+$/.test(buildId)) {
|
||||
@@ -122,7 +129,7 @@ async function getLatestOutputFile(
|
||||
projectId,
|
||||
userId,
|
||||
filename,
|
||||
signal = AbortSignal.timeout(15_000)
|
||||
signal = AbortSignal.timeout(TIMEOUT)
|
||||
) {
|
||||
validateFilename(filename)
|
||||
|
||||
@@ -154,11 +161,23 @@ async function getRedirectWithFallback(
|
||||
projectId,
|
||||
userId,
|
||||
path,
|
||||
signal = AbortSignal.timeout(15_000)
|
||||
signal = AbortSignal.timeout(TIMEOUT)
|
||||
) {
|
||||
// Avoid hitting the same instance first all the time.
|
||||
const instances = _.shuffle(Settings.apis.clsiCache.instances)
|
||||
for (const { url, shard } of instances) {
|
||||
if (signal.aborted) {
|
||||
break // Stop trying the next backend when the signal has expired.
|
||||
}
|
||||
const lastFailure = lastFailures.get(url) ?? 0
|
||||
if (lastFailure) {
|
||||
// Circuit breaker that avoids retries for 4-16s.
|
||||
const retryDelay = TIMEOUT * (1 + 3 * Math.random())
|
||||
if (performance.now() - lastFailure < retryDelay) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
const u = new URL(url)
|
||||
u.pathname = path
|
||||
try {
|
||||
@@ -168,6 +187,7 @@ async function getRedirectWithFallback(
|
||||
} = await fetchRedirectWithResponse(u, {
|
||||
signal,
|
||||
})
|
||||
lastFailures.delete(url) // The shard is back up.
|
||||
let allFilesRaw = headers.get('X-All-Files')
|
||||
if (!allFilesRaw.startsWith('[')) {
|
||||
allFilesRaw = Buffer.from(allFilesRaw, 'base64url').toString()
|
||||
@@ -183,8 +203,10 @@ async function getRedirectWithFallback(
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof RequestFailedError && err.response.status === 404) {
|
||||
lastFailures.delete(url) // The shard is back up.
|
||||
break // No clsi-cache instance has cached something for this project/user.
|
||||
}
|
||||
lastFailures.set(url, performance.now()) // The shard is unhealthy. Refresh timestamp of last failure.
|
||||
logger.warn(
|
||||
{ err, projectId, userId, url, shard },
|
||||
'getLatestOutputFile from clsi-cache failed'
|
||||
@@ -239,6 +261,7 @@ async function prepareCacheSource(
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
TIMEOUT,
|
||||
getEgressLabel,
|
||||
clearCache,
|
||||
getOutputFile,
|
||||
|
||||
@@ -195,7 +195,7 @@ async function prepareClsiCache(
|
||||
const features = await UserGetter.promises.getUserFeatures(userId)
|
||||
if (features.compileGroup !== 'priority') return
|
||||
|
||||
const signal = AbortSignal.timeout(5_000)
|
||||
const signal = AbortSignal.timeout(ClsiCacheHandler.TIMEOUT)
|
||||
let lastUpdated
|
||||
let shard = _.shuffle(Settings.apis.clsiCache.instances)[0].shard
|
||||
if (sourceProjectId) {
|
||||
|
||||
@@ -378,7 +378,9 @@ export const LocalCompileProvider: FC<React.PropsWithChildren> = ({
|
||||
if (initialCompileFromCache && !pendingInitialCompileFromCache) {
|
||||
setPendingInitialCompileFromCache(true)
|
||||
setCompileFromCacheStartedAt(performance.now())
|
||||
getJSON(`/project/${projectId}/output/cached/output.overleaf.json`)
|
||||
getJSON(`/project/${projectId}/output/cached/output.overleaf.json`, {
|
||||
signal: AbortSignal.timeout(5_000),
|
||||
})
|
||||
.then((data: any) => {
|
||||
// Hand data over to next effect, it will wait for project/doc loading.
|
||||
setDataFromCache(data)
|
||||
|
||||
Reference in New Issue
Block a user