mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-06-06 07:39:02 +02:00
[web] gracefully handle broken histories when compiling from history (#32386)
* [web] gracefully handle broken histories when compiling from history * [web] trim down schema GitOrigin-RevId: 97d59b31eb25644d7de1194a45281def6982b130
This commit is contained in:
@@ -244,6 +244,24 @@ export function getUpdates(req, res, next) {
|
||||
)
|
||||
}
|
||||
|
||||
const getResyncPendingSchema = z.object({
|
||||
params: z.object({
|
||||
project_id: zz.objectId(),
|
||||
}),
|
||||
})
|
||||
|
||||
export function getResyncPending(req, res, next) {
|
||||
const {
|
||||
params: { project_id: projectId },
|
||||
} = parseReq(req, getResyncPendingSchema)
|
||||
SyncManager.getResyncState(projectId, (err, state) => {
|
||||
if (err) return next(err)
|
||||
res.json({
|
||||
resyncPending: state.isSyncOngoing(),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const latestVersionSchema = z.object({
|
||||
params: z.object({
|
||||
project_id: zz.objectId().or(z.coerce.number()),
|
||||
|
||||
@@ -25,6 +25,11 @@ export function initialize(app) {
|
||||
|
||||
app.post('/project/:project_id/flush', HttpController.flushProject)
|
||||
|
||||
app.get(
|
||||
'/project/:project_id/resync-pending',
|
||||
HttpController.getResyncPending
|
||||
)
|
||||
|
||||
app.post('/project/:project_id/resync', HttpController.resyncProject)
|
||||
|
||||
app.get('/project/:project_id/dump', HttpController.dumpProject)
|
||||
|
||||
@@ -91,7 +91,7 @@ async function startHardResync(projectId, options = {}) {
|
||||
async function startResyncWithoutLock(projectId, options) {
|
||||
await ErrorRecorder.promises.recordSyncStart(projectId)
|
||||
|
||||
const syncState = await _getResyncState(projectId)
|
||||
const syncState = await getResyncState(projectId)
|
||||
if (syncState.isSyncOngoing()) {
|
||||
throw new OError('sync ongoing')
|
||||
}
|
||||
@@ -109,7 +109,11 @@ async function startResyncWithoutLock(projectId, options) {
|
||||
await setResyncState(projectId, syncState)
|
||||
}
|
||||
|
||||
async function _getResyncState(projectId) {
|
||||
/**
|
||||
* @param {string} projectId
|
||||
* @return {Promise<SyncState>}
|
||||
*/
|
||||
async function getResyncState(projectId) {
|
||||
const rawSyncState = await db.projectHistorySyncState.findOne({
|
||||
project_id: new ObjectId(projectId.toString()),
|
||||
})
|
||||
@@ -185,7 +189,7 @@ async function clearResyncStateIfAllAfter(projectId, date) {
|
||||
}
|
||||
|
||||
async function skipUpdatesDuringSync(projectId, updates) {
|
||||
const syncState = await _getResyncState(projectId)
|
||||
const syncState = await getResyncState(projectId)
|
||||
if (!syncState.isSyncOngoing()) {
|
||||
logger.debug({ projectId }, 'not skipping updates: no resync in progress')
|
||||
// don't return syncState when unchanged
|
||||
@@ -229,7 +233,7 @@ async function expandSyncUpdates(
|
||||
return updates
|
||||
}
|
||||
|
||||
const syncState = await _getResyncState(projectId)
|
||||
const syncState = await getResyncState(projectId)
|
||||
|
||||
// compute the current snapshot from the most recent chunk
|
||||
const snapshotFiles =
|
||||
@@ -1284,6 +1288,7 @@ function trackingDirectivesEqual(a, b) {
|
||||
|
||||
// EXPORTS
|
||||
|
||||
const getResyncStateCb = callbackify(getResyncState)
|
||||
const startResyncCb = callbackify(startResync)
|
||||
const startResyncWithoutLockCb = callbackify(startResyncWithoutLock)
|
||||
const startHardResyncCb = callbackify(startHardResync)
|
||||
@@ -1327,6 +1332,7 @@ const expandSyncUpdatesCb = (
|
||||
}
|
||||
|
||||
export {
|
||||
getResyncStateCb as getResyncState,
|
||||
startResyncCb as startResync,
|
||||
startResyncWithoutLockCb as startResyncWithoutLock,
|
||||
startHardResyncCb as startHardResync,
|
||||
@@ -1337,6 +1343,7 @@ export {
|
||||
}
|
||||
|
||||
export const promises = {
|
||||
getResyncState,
|
||||
startResync,
|
||||
startResyncWithoutLock,
|
||||
startHardResync,
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { callbackify } from 'node:util'
|
||||
import { callbackifyMultiResult } from '@overleaf/promise-utils'
|
||||
import {
|
||||
fetchStream,
|
||||
fetchString,
|
||||
fetchStringWithResponse,
|
||||
fetchStream,
|
||||
RequestFailedError,
|
||||
} from '@overleaf/fetch-utils'
|
||||
import Settings from '@overleaf/settings'
|
||||
@@ -755,21 +755,37 @@ async function _buildRequest(projectId, userId, options) {
|
||||
|
||||
if (options.compileFromHistory && baseHistoryVersion === -1) {
|
||||
// full sync
|
||||
return await _buildRequestFromHistoryFull(
|
||||
projectId,
|
||||
historyId,
|
||||
options,
|
||||
project
|
||||
)
|
||||
try {
|
||||
return await _buildRequestFromHistoryFull(
|
||||
projectId,
|
||||
historyId,
|
||||
options,
|
||||
project
|
||||
)
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{ err, projectId, historyId },
|
||||
'failed to compose history-full request'
|
||||
)
|
||||
// fall back to old compile mode
|
||||
}
|
||||
} else if (options.compileFromHistory) {
|
||||
// incremental sync
|
||||
return await _buildRequestFromHistoryIncremental(
|
||||
projectId,
|
||||
historyId,
|
||||
options,
|
||||
project,
|
||||
baseHistoryVersion
|
||||
)
|
||||
try {
|
||||
return await _buildRequestFromHistoryIncremental(
|
||||
projectId,
|
||||
historyId,
|
||||
options,
|
||||
project,
|
||||
baseHistoryVersion
|
||||
)
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{ err, projectId, historyId, baseHistoryVersion },
|
||||
'failed to compose history-incremental request'
|
||||
)
|
||||
// fall back to old compile mode
|
||||
}
|
||||
}
|
||||
|
||||
if (options.incrementalCompilesEnabled || options.syncType != null) {
|
||||
@@ -895,12 +911,18 @@ async function _buildRequestFromHistoryFull(
|
||||
project
|
||||
) {
|
||||
await HistoryManager.promises.flushProject(projectId)
|
||||
const {
|
||||
chunk: {
|
||||
history: { snapshot: rawSnapshot, changes: rawChanges },
|
||||
startVersion,
|
||||
const [
|
||||
{
|
||||
chunk: {
|
||||
history: { snapshot: rawSnapshot, changes: rawChanges },
|
||||
startVersion,
|
||||
},
|
||||
},
|
||||
} = await HistoryManager.promises.getLatestHistoryWithHistoryId(historyId)
|
||||
/* ensureNoResyncPending throws */
|
||||
] = await Promise.all([
|
||||
HistoryManager.promises.getLatestHistoryWithHistoryId(historyId),
|
||||
HistoryManager.promises.ensureNoResyncPending(projectId),
|
||||
])
|
||||
const rawChangeOperations = _rawChangeOperationsFromChanges(rawChanges)
|
||||
const globalBlobs = _collectGlobalBlobs(rawChangeOperations)
|
||||
for (const { hash, rangesHash } of Object.values(rawSnapshot.files)) {
|
||||
@@ -937,22 +959,26 @@ async function _buildRequestFromHistoryIncremental(
|
||||
let size = 0
|
||||
while (hasMore) {
|
||||
let changes
|
||||
;({ changes, hasMore } =
|
||||
await HistoryManager.promises.getChangesWithHistoryId(historyId, {
|
||||
since,
|
||||
}))
|
||||
;[{ changes, hasMore } /* resyncPending throws */] = await Promise.all([
|
||||
HistoryManager.promises.getChangesWithHistoryId(historyId, { since }),
|
||||
HistoryManager.promises.ensureNoResyncPending(projectId),
|
||||
])
|
||||
since += changes.length
|
||||
const newRawChangeOperations = _rawChangeOperationsFromChanges(changes)
|
||||
size += Buffer.from(JSON.stringify(newRawChangeOperations)).byteLength
|
||||
if (size > 6.5 * 1024 * 1024) {
|
||||
// clsi has a payload limit of 7MiB. Do not send too many operations.
|
||||
// Fall back to sending the latest snapshot instead.
|
||||
return await _buildRequestFromHistoryFull(
|
||||
projectId,
|
||||
historyId,
|
||||
options,
|
||||
project
|
||||
)
|
||||
try {
|
||||
return await _buildRequestFromHistoryFull(
|
||||
projectId,
|
||||
historyId,
|
||||
options,
|
||||
project
|
||||
)
|
||||
} catch (err) {
|
||||
throw OError.tag(err, 'upgrade to history-full failed', { size })
|
||||
}
|
||||
}
|
||||
rawChangeOperations.push(...newRawChangeOperations)
|
||||
}
|
||||
|
||||
@@ -290,6 +290,13 @@ async function getLatestHistoryWithHistoryId(historyId) {
|
||||
)
|
||||
}
|
||||
|
||||
async function ensureNoResyncPending(projectId) {
|
||||
const { resyncPending } = await fetchJson(
|
||||
`${settings.apis.project_history.url}/project/${projectId}/resync-pending`
|
||||
)
|
||||
if (resyncPending) throw new OError('broken history with pending resync')
|
||||
}
|
||||
|
||||
/**
|
||||
* Get history changes since a given version
|
||||
*
|
||||
@@ -463,5 +470,6 @@ export default {
|
||||
getProjectBlobStats,
|
||||
getBlobStats,
|
||||
getLatestHistoryWithHistoryId,
|
||||
ensureNoResyncPending,
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user