Merge pull request #25406 from overleaf/em-content-hash-validation-resync

Resync project when content hash validation fails

GitOrigin-RevId: ea9b5a78f89c55276fd67835bc262717bc778e92
This commit is contained in:
Brian Gough
2025-05-08 10:12:34 +01:00
committed by Copybot
parent 2ccdb74d20
commit 6eada92966
7 changed files with 84 additions and 32 deletions
@@ -130,7 +130,9 @@ async function importChanges(req, res, next) {
}
if (returnSnapshot === 'none') {
res.status(HTTPStatus.CREATED).json({})
res.status(HTTPStatus.CREATED).json({
resyncNeeded: result.resyncNeeded,
})
} else {
const rawSnapshot = await buildResultSnapshot(result && result.currentChunk)
res.status(HTTPStatus.CREATED).json(rawSnapshot)
@@ -82,6 +82,7 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
let originalEndVersion
let changesToPersist
let resyncNeeded = false
limits = limits || {}
_.defaults(limits, {
@@ -166,12 +167,11 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
const actualHash = content != null ? getContentHash(content) : null
logger.debug({ expectedHash, actualHash }, 'validating content hash')
if (actualHash !== expectedHash) {
throw new InvalidChangeError('content hash mismatch', {
projectId,
path,
expectedHash,
actualHash,
})
logger.warn(
{ projectId, path, expectedHash, actualHash },
'content hash mismatch'
)
resyncNeeded = true
}
// Remove the content hash from the change before storing it in the chunk.
@@ -300,6 +300,7 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
numberOfChangesPersisted: numberOfChangesToPersist,
originalEndVersion,
currentChunk,
resyncNeeded,
}
} else {
return null
@@ -52,6 +52,6 @@ describe('project import', function () {
})
expect(importResponse.status).to.equal(HTTPStatus.CREATED)
expect(importResponse.obj).to.deep.equal({})
expect(importResponse.obj).to.deep.equal({ resyncNeeded: false })
})
})
@@ -58,6 +58,7 @@ describe('persistChanges', function () {
numberOfChangesPersisted: 1,
originalEndVersion: 0,
currentChunk,
resyncNeeded: false,
})
const chunk = await chunkStore.loadLatest(projectId)
@@ -106,6 +107,7 @@ describe('persistChanges', function () {
numberOfChangesPersisted: 2,
originalEndVersion: 0,
currentChunk,
resyncNeeded: false,
})
const chunk = await chunkStore.loadLatest(projectId)
@@ -147,6 +149,7 @@ describe('persistChanges', function () {
numberOfChangesPersisted: 2,
originalEndVersion: 0,
currentChunk,
resyncNeeded: false,
})
const chunk = await chunkStore.loadLatest(projectId)
@@ -213,7 +216,7 @@ describe('persistChanges', function () {
expect(result.numberOfChangesPersisted).to.equal(1)
})
it('rejects a change with an invalid hash', async function () {
it('turns on the resyncNeeded flag if content hash validation fails', async function () {
const limitsToPersistImmediately = {
minChangeTimestamp: farFuture,
maxChangeTimestamp: farFuture,
@@ -235,9 +238,13 @@ describe('persistChanges', function () {
)
const changes = [change]
await expect(
persistChanges(projectId, changes, limitsToPersistImmediately, 0)
).to.be.rejectedWith(storage.InvalidChangeError)
const result = await persistChanges(
projectId,
changes,
limitsToPersistImmediately,
0
)
expect(result.resyncNeeded).to.be.true
})
})
})
@@ -249,7 +249,7 @@ export function sendChanges(
method: 'POST',
json: changes,
},
error => {
(error, response) => {
if (error) {
OError.tag(error, 'failed to send changes to v1', {
projectId,
@@ -261,7 +261,7 @@ export function sendChanges(
})
return callback(error)
}
callback()
callback(null, { resyncNeeded: response?.resyncNeeded ?? false })
}
)
}
@@ -85,7 +85,7 @@ export function startResyncAndProcessUpdatesUnderLock(
})
})
},
(flushError, queueSize) => {
(flushError, { queueSize } = {}) => {
if (flushError) {
OError.tag(flushError)
ErrorRecorder.record(projectId, queueSize, flushError, recordError => {
@@ -132,7 +132,7 @@ export function processUpdatesForProject(projectId, callback) {
releaseLock
)
},
(flushError, queueSize) => {
(flushError, { queueSize, resyncNeeded } = {}) => {
if (flushError) {
OError.tag(flushError)
ErrorRecorder.record(
@@ -167,7 +167,15 @@ export function processUpdatesForProject(projectId, callback) {
'failed to clear error'
)
}
callback()
if (resyncNeeded) {
logger.warn(
{ projectId },
'Resyncing project as requested by full project history'
)
resyncProject(projectId, callback)
} else {
callback()
}
})
}
if (queueSize > 0) {
@@ -198,7 +206,7 @@ export function resyncProject(projectId, callback) {
releaseLock
)
},
(flushError, queueSize) => {
(flushError, { queueSize } = {}) => {
if (flushError) {
ErrorRecorder.record(
projectId,
@@ -247,7 +255,7 @@ export function processUpdatesForProjectUsingBisect(
releaseLock
)
},
(flushError, queueSize) => {
(flushError, { queueSize } = {}) => {
if (amountToProcess === 0 || queueSize === 0) {
// no further processing possible
if (flushError != null) {
@@ -298,7 +306,7 @@ export function processSingleUpdateForProject(projectId, callback) {
) => {
_countAndProcessUpdates(projectId, extendLock, 1, releaseLock)
},
(flushError, queueSize) => {
(flushError, { queueSize } = {}) => {
// no need to clear the flush marker when single stepping
// it will be cleared up on the next background flush if
// the queue is empty
@@ -339,18 +347,34 @@ _mocks._countAndProcessUpdates = (
}
if (queueSize > 0) {
logger.debug({ projectId, queueSize }, 'processing uncompressed updates')
let resyncNeeded = false
RedisManager.getUpdatesInBatches(
projectId,
batchSize,
(updates, cb) => {
_processUpdatesBatch(projectId, updates, extendLock, cb)
_processUpdatesBatch(
projectId,
updates,
extendLock,
(err, flushResponse) => {
if (err) {
return cb(err)
}
if (flushResponse.resyncNeeded) {
resyncNeeded = true
}
cb()
}
)
},
error => {
// Unconventional callback signature. The caller needs the queue size
// even when an error is thrown in order to record the queue size in
// the projectHistoryFailures collection. We'll have to find another
// way to achieve this when we promisify.
callback(error, queueSize)
callback(error, { queueSize, resyncNeeded })
}
)
} else {
@@ -376,15 +400,21 @@ function _processUpdatesBatch(projectId, updates, extendLock, callback) {
{ projectId },
'discarding updates as project does not use history'
)
return callback()
return callback(null, {})
}
_processUpdates(projectId, historyId, updates, extendLock, error => {
if (error != null) {
return callback(OError.tag(error))
_processUpdates(
projectId,
historyId,
updates,
extendLock,
(error, flushResponse) => {
if (error != null) {
return callback(OError.tag(error))
}
callback(null, flushResponse)
}
callback()
})
)
})
}
@@ -536,6 +566,8 @@ export function _processUpdates(
if (error != null) {
return callback(error)
}
let resyncNeeded = false
async.waterfall(
[
cb => {
@@ -646,7 +678,13 @@ export function _processUpdates(
projectHistoryId,
changes,
baseVersion,
cb
(err, response) => {
if (err) {
return cb(err)
}
resyncNeeded = response.resyncNeeded
cb()
}
)
})
},
@@ -657,7 +695,11 @@ export function _processUpdates(
],
error => {
profile.end()
callback(error)
if (error) {
callback(error)
} else {
callback(null, { resyncNeeded })
}
}
)
}
@@ -13,7 +13,7 @@ describe('UpdatesProcessor', function () {
}
this.HistoryStoreManager = {
getMostRecentVersion: sinon.stub(),
sendChanges: sinon.stub().yields(),
sendChanges: sinon.stub().yields(null, {}),
}
this.LockManager = {
runWithLock: sinon.spy((key, runner, callback) =>
@@ -109,7 +109,7 @@ describe('UpdatesProcessor', function () {
this.queueSize = 445
this.UpdatesProcessor._mocks._countAndProcessUpdates = sinon
.stub()
.callsArgWith(3, this.error, this.queueSize)
.callsArgWith(3, this.error, { queueSize: this.queueSize })
})
describe('when there is no existing error', function () {