diff --git a/services/history-v1/api/controllers/project_import.js b/services/history-v1/api/controllers/project_import.js index 5dec84d843..9d47fe06a9 100644 --- a/services/history-v1/api/controllers/project_import.js +++ b/services/history-v1/api/controllers/project_import.js @@ -130,7 +130,9 @@ async function importChanges(req, res, next) { } if (returnSnapshot === 'none') { - res.status(HTTPStatus.CREATED).json({}) + res.status(HTTPStatus.CREATED).json({ + resyncNeeded: result.resyncNeeded, + }) } else { const rawSnapshot = await buildResultSnapshot(result && result.currentChunk) res.status(HTTPStatus.CREATED).json(rawSnapshot) diff --git a/services/history-v1/storage/lib/persist_changes.js b/services/history-v1/storage/lib/persist_changes.js index b17d7d8b37..47798531b2 100644 --- a/services/history-v1/storage/lib/persist_changes.js +++ b/services/history-v1/storage/lib/persist_changes.js @@ -82,6 +82,7 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { let originalEndVersion let changesToPersist + let resyncNeeded = false limits = limits || {} _.defaults(limits, { @@ -166,12 +167,11 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { const actualHash = content != null ? getContentHash(content) : null logger.debug({ expectedHash, actualHash }, 'validating content hash') if (actualHash !== expectedHash) { - throw new InvalidChangeError('content hash mismatch', { - projectId, - path, - expectedHash, - actualHash, - }) + logger.warn( + { projectId, path, expectedHash, actualHash }, + 'content hash mismatch' + ) + resyncNeeded = true } // Remove the content hash from the change before storing it in the chunk. @@ -300,6 +300,7 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { numberOfChangesPersisted: numberOfChangesToPersist, originalEndVersion, currentChunk, + resyncNeeded, } } else { return null diff --git a/services/history-v1/test/acceptance/js/api/project_import.test.js b/services/history-v1/test/acceptance/js/api/project_import.test.js index 216fb527fa..fb173238f8 100644 --- a/services/history-v1/test/acceptance/js/api/project_import.test.js +++ b/services/history-v1/test/acceptance/js/api/project_import.test.js @@ -52,6 +52,6 @@ describe('project import', function () { }) expect(importResponse.status).to.equal(HTTPStatus.CREATED) - expect(importResponse.obj).to.deep.equal({}) + expect(importResponse.obj).to.deep.equal({ resyncNeeded: false }) }) }) diff --git a/services/history-v1/test/acceptance/js/storage/persist_changes.test.js b/services/history-v1/test/acceptance/js/storage/persist_changes.test.js index aa56dc8c2a..0bb8836cc1 100644 --- a/services/history-v1/test/acceptance/js/storage/persist_changes.test.js +++ b/services/history-v1/test/acceptance/js/storage/persist_changes.test.js @@ -58,6 +58,7 @@ describe('persistChanges', function () { numberOfChangesPersisted: 1, originalEndVersion: 0, currentChunk, + resyncNeeded: false, }) const chunk = await chunkStore.loadLatest(projectId) @@ -106,6 +107,7 @@ describe('persistChanges', function () { numberOfChangesPersisted: 2, originalEndVersion: 0, currentChunk, + resyncNeeded: false, }) const chunk = await chunkStore.loadLatest(projectId) @@ -147,6 +149,7 @@ describe('persistChanges', function () { numberOfChangesPersisted: 2, originalEndVersion: 0, currentChunk, + resyncNeeded: false, }) const chunk = await chunkStore.loadLatest(projectId) @@ -213,7 +216,7 @@ describe('persistChanges', function () { expect(result.numberOfChangesPersisted).to.equal(1) }) - it('rejects a change with an invalid hash', async function () { + it('turns on the resyncNeeded flag if content hash validation fails', async function () { const limitsToPersistImmediately = { minChangeTimestamp: farFuture, maxChangeTimestamp: farFuture, @@ -235,9 +238,13 @@ describe('persistChanges', function () { ) const changes = [change] - await expect( - persistChanges(projectId, changes, limitsToPersistImmediately, 0) - ).to.be.rejectedWith(storage.InvalidChangeError) + const result = await persistChanges( + projectId, + changes, + limitsToPersistImmediately, + 0 + ) + expect(result.resyncNeeded).to.be.true }) }) }) diff --git a/services/project-history/app/js/HistoryStoreManager.js b/services/project-history/app/js/HistoryStoreManager.js index fe9c9e3d2d..bb41dfb3c0 100644 --- a/services/project-history/app/js/HistoryStoreManager.js +++ b/services/project-history/app/js/HistoryStoreManager.js @@ -249,7 +249,7 @@ export function sendChanges( method: 'POST', json: changes, }, - error => { + (error, response) => { if (error) { OError.tag(error, 'failed to send changes to v1', { projectId, @@ -261,7 +261,7 @@ export function sendChanges( }) return callback(error) } - callback() + callback(null, { resyncNeeded: response?.resyncNeeded ?? false }) } ) } diff --git a/services/project-history/app/js/UpdatesProcessor.js b/services/project-history/app/js/UpdatesProcessor.js index b52fac7af6..a76241d7ca 100644 --- a/services/project-history/app/js/UpdatesProcessor.js +++ b/services/project-history/app/js/UpdatesProcessor.js @@ -85,7 +85,7 @@ export function startResyncAndProcessUpdatesUnderLock( }) }) }, - (flushError, queueSize) => { + (flushError, { queueSize } = {}) => { if (flushError) { OError.tag(flushError) ErrorRecorder.record(projectId, queueSize, flushError, recordError => { @@ -132,7 +132,7 @@ export function processUpdatesForProject(projectId, callback) { releaseLock ) }, - (flushError, queueSize) => { + (flushError, { queueSize, resyncNeeded } = {}) => { if (flushError) { OError.tag(flushError) ErrorRecorder.record( @@ -167,7 +167,15 @@ export function processUpdatesForProject(projectId, callback) { 'failed to clear error' ) } - callback() + if (resyncNeeded) { + logger.warn( + { projectId }, + 'Resyncing project as requested by full project history' + ) + resyncProject(projectId, callback) + } else { + callback() + } }) } if (queueSize > 0) { @@ -198,7 +206,7 @@ export function resyncProject(projectId, callback) { releaseLock ) }, - (flushError, queueSize) => { + (flushError, { queueSize } = {}) => { if (flushError) { ErrorRecorder.record( projectId, @@ -247,7 +255,7 @@ export function processUpdatesForProjectUsingBisect( releaseLock ) }, - (flushError, queueSize) => { + (flushError, { queueSize } = {}) => { if (amountToProcess === 0 || queueSize === 0) { // no further processing possible if (flushError != null) { @@ -298,7 +306,7 @@ export function processSingleUpdateForProject(projectId, callback) { ) => { _countAndProcessUpdates(projectId, extendLock, 1, releaseLock) }, - (flushError, queueSize) => { + (flushError, { queueSize } = {}) => { // no need to clear the flush marker when single stepping // it will be cleared up on the next background flush if // the queue is empty @@ -339,18 +347,34 @@ _mocks._countAndProcessUpdates = ( } if (queueSize > 0) { logger.debug({ projectId, queueSize }, 'processing uncompressed updates') + + let resyncNeeded = false RedisManager.getUpdatesInBatches( projectId, batchSize, (updates, cb) => { - _processUpdatesBatch(projectId, updates, extendLock, cb) + _processUpdatesBatch( + projectId, + updates, + extendLock, + (err, flushResponse) => { + if (err) { + return cb(err) + } + + if (flushResponse.resyncNeeded) { + resyncNeeded = true + } + cb() + } + ) }, error => { // Unconventional callback signature. The caller needs the queue size // even when an error is thrown in order to record the queue size in // the projectHistoryFailures collection. We'll have to find another // way to achieve this when we promisify. - callback(error, queueSize) + callback(error, { queueSize, resyncNeeded }) } ) } else { @@ -376,15 +400,21 @@ function _processUpdatesBatch(projectId, updates, extendLock, callback) { { projectId }, 'discarding updates as project does not use history' ) - return callback() + return callback(null, {}) } - _processUpdates(projectId, historyId, updates, extendLock, error => { - if (error != null) { - return callback(OError.tag(error)) + _processUpdates( + projectId, + historyId, + updates, + extendLock, + (error, flushResponse) => { + if (error != null) { + return callback(OError.tag(error)) + } + callback(null, flushResponse) } - callback() - }) + ) }) } @@ -536,6 +566,8 @@ export function _processUpdates( if (error != null) { return callback(error) } + + let resyncNeeded = false async.waterfall( [ cb => { @@ -646,7 +678,13 @@ export function _processUpdates( projectHistoryId, changes, baseVersion, - cb + (err, response) => { + if (err) { + return cb(err) + } + resyncNeeded = response.resyncNeeded + cb() + } ) }) }, @@ -657,7 +695,11 @@ export function _processUpdates( ], error => { profile.end() - callback(error) + if (error) { + callback(error) + } else { + callback(null, { resyncNeeded }) + } } ) } diff --git a/services/project-history/test/unit/js/UpdatesManager/UpdatesProcessorTests.js b/services/project-history/test/unit/js/UpdatesManager/UpdatesProcessorTests.js index 137169bfcf..6f148e5a8d 100644 --- a/services/project-history/test/unit/js/UpdatesManager/UpdatesProcessorTests.js +++ b/services/project-history/test/unit/js/UpdatesManager/UpdatesProcessorTests.js @@ -13,7 +13,7 @@ describe('UpdatesProcessor', function () { } this.HistoryStoreManager = { getMostRecentVersion: sinon.stub(), - sendChanges: sinon.stub().yields(), + sendChanges: sinon.stub().yields(null, {}), } this.LockManager = { runWithLock: sinon.spy((key, runner, callback) => @@ -109,7 +109,7 @@ describe('UpdatesProcessor', function () { this.queueSize = 445 this.UpdatesProcessor._mocks._countAndProcessUpdates = sinon .stub() - .callsArgWith(3, this.error, this.queueSize) + .callsArgWith(3, this.error, { queueSize: this.queueSize }) }) describe('when there is no existing error', function () {