diff --git a/package-lock.json b/package-lock.json index 608f0ff2eb..f32cc5d920 100644 --- a/package-lock.json +++ b/package-lock.json @@ -43695,6 +43695,7 @@ "@overleaf/logger": "*", "@overleaf/metrics": "*", "@overleaf/o-error": "*", + "@overleaf/promise-utils": "*", "@overleaf/ranges-tracker": "*", "@overleaf/redis-wrapper": "*", "@overleaf/settings": "*", @@ -54189,6 +54190,7 @@ "@overleaf/logger": "*", "@overleaf/metrics": "*", "@overleaf/o-error": "*", + "@overleaf/promise-utils": "*", "@overleaf/ranges-tracker": "*", "@overleaf/redis-wrapper": "*", "@overleaf/settings": "*", diff --git a/services/document-updater/app/js/LockManager.js b/services/document-updater/app/js/LockManager.js index a65ab9f1e3..7b81101d4f 100644 --- a/services/document-updater/app/js/LockManager.js +++ b/services/document-updater/app/js/LockManager.js @@ -3,6 +3,7 @@ const redis = require('@overleaf/redis-wrapper') const rclient = redis.createClient(Settings.redis.lock) const keys = Settings.redis.lock.key_schema const RedisLocker = require('@overleaf/redis-wrapper/RedisLocker') +const { promisify } = require('@overleaf/promise-utils') module.exports = new RedisLocker({ rclient, @@ -16,3 +17,10 @@ module.exports = new RedisLocker({ metricsPrefix: 'doc', lockTTLSeconds: Settings.redisLockTTLSeconds, }) + +module.exports.promises = { + checkLock: promisify(module.exports.checkLock.bind(module.exports)), + getLock: promisify(module.exports.getLock.bind(module.exports)), + releaseLock: promisify(module.exports.releaseLock.bind(module.exports)), + tryLock: promisify(module.exports.tryLock.bind(module.exports)), +} diff --git a/services/document-updater/app/js/PersistenceManager.js b/services/document-updater/app/js/PersistenceManager.js index b5abb2b44f..efaf586946 100644 --- a/services/document-updater/app/js/PersistenceManager.js +++ b/services/document-updater/app/js/PersistenceManager.js @@ -2,6 +2,7 @@ const Settings = require('@overleaf/settings') const Errors = require('./Errors') const Metrics = require('./Metrics') const logger = require('@overleaf/logger') +const { promisifyAll } = require('@overleaf/promise-utils') const request = require('requestretry').defaults({ maxAttempts: 2, retryDelay: 10, @@ -175,3 +176,9 @@ function setDoc( } module.exports = { getDoc, setDoc } + +module.exports.promises = promisifyAll(module.exports, { + multiResult: { + getDoc: ['lines', 'version', 'ranges', 'pathname', 'projectHistoryId'], + }, +}) diff --git a/services/document-updater/app/js/ProjectFlusher.js b/services/document-updater/app/js/ProjectFlusher.js index 112f89273a..33d585b255 100644 --- a/services/document-updater/app/js/ProjectFlusher.js +++ b/services/document-updater/app/js/ProjectFlusher.js @@ -20,6 +20,7 @@ const async = require('async') const ProjectManager = require('./ProjectManager') const _ = require('lodash') const logger = require('@overleaf/logger') +const { promisifyAll } = require('@overleaf/promise-utils') const ProjectFlusher = { // iterate over keys asynchronously using redis scan (non-blocking) @@ -135,3 +136,4 @@ const ProjectFlusher = { } module.exports = ProjectFlusher +module.exports.promises = promisifyAll(ProjectFlusher) diff --git a/services/document-updater/app/js/ProjectManager.js b/services/document-updater/app/js/ProjectManager.js index 31e28f5943..3cf6aad1aa 100644 --- a/services/document-updater/app/js/ProjectManager.js +++ b/services/document-updater/app/js/ProjectManager.js @@ -6,6 +6,7 @@ const async = require('async') const logger = require('@overleaf/logger') const Metrics = require('./Metrics') const Errors = require('./Errors') +const { promisifyAll } = require('@overleaf/promise-utils') module.exports = { flushProjectWithLocks, @@ -17,6 +18,8 @@ module.exports = { updateProjectWithLocks, } +module.exports.promises = promisifyAll(module.exports) + function flushProjectWithLocks(projectId, _callback) { const timer = new Metrics.Timer('projectManager.flushProjectWithLocks') const callback = function (...args) { diff --git a/services/document-updater/app/js/RedisManager.js b/services/document-updater/app/js/RedisManager.js index 1f72cb8600..e7f84a18ab 100644 --- a/services/document-updater/app/js/RedisManager.js +++ b/services/document-updater/app/js/RedisManager.js @@ -10,6 +10,7 @@ const crypto = require('crypto') const async = require('async') const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager') const { docIsTooLarge } = require('./Limits') +const { promisifyAll } = require('@overleaf/promise-utils') // Sometimes Redis calls take an unexpectedly long time. We have to be // quick with Redis calls because we're holding a lock that expires @@ -617,3 +618,23 @@ module.exports = RedisManager = { return crypto.createHash('sha1').update(docLines, 'utf8').digest('hex') }, } + +module.exports.promises = promisifyAll(module.exports, { + multiResult: { + getDoc: [ + 'lines', + 'version', + 'ranges', + 'pathname', + 'projectHistoryId', + 'unflushedTime', + 'lastUpdatedAt', + 'lastUpdatedBy', + ], + getNextProjectToFlushAndDelete: [ + 'projectId', + 'flushTimestamp', + 'queueLength', + ], + }, +}) diff --git a/services/document-updater/package.json b/services/document-updater/package.json index 34c312ddba..612ab7460c 100644 --- a/services/document-updater/package.json +++ b/services/document-updater/package.json @@ -21,6 +21,7 @@ "@overleaf/logger": "*", "@overleaf/metrics": "*", "@overleaf/o-error": "*", + "@overleaf/promise-utils": "*", "@overleaf/ranges-tracker": "*", "@overleaf/redis-wrapper": "*", "@overleaf/settings": "*", diff --git a/services/document-updater/scripts/check_redis_mongo_sync_state.js b/services/document-updater/scripts/check_redis_mongo_sync_state.js new file mode 100644 index 0000000000..c04794d3a7 --- /dev/null +++ b/services/document-updater/scripts/check_redis_mongo_sync_state.js @@ -0,0 +1,309 @@ +const fs = require('fs') +const Path = require('path') +const _ = require('lodash') +const logger = require('@overleaf/logger') +const OError = require('@overleaf/o-error') +const LockManager = require('../app/js/LockManager') +const PersistenceManager = require('../app/js/PersistenceManager') +const ProjectFlusher = require('../app/js/ProjectFlusher') +const ProjectManager = require('../app/js/ProjectManager') +const RedisManager = require('../app/js/RedisManager') +const Settings = require('@overleaf/settings') + +const AUTO_FIX_VERSION_MISMATCH = + process.env.AUTO_FIX_VERSION_MISMATCH === 'true' +const SCRIPT_LOG_LEVEL = process.env.SCRIPT_LOG_LEVEL || 'warn' +const FLUSH_IN_SYNC_PROJECTS = process.env.FLUSH_IN_SYNC_PROJECTS === 'true' +const FOLDER = + process.env.FOLDER || '/tmp/overleaf-check-redis-mongo-sync-state' +const LIMIT = parseInt(process.env.LIMIT || '1000', 10) +const RETRIES = parseInt(process.env.RETRIES || '5', 10) +const WRITE_CONTENT = process.env.WRITE_CONTENT === 'true' + +process.env.LOG_LEVEL = SCRIPT_LOG_LEVEL +logger.initialize('check-redis-mongo-sync-state') + +const COMPARE_AND_SET = + 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("set", KEYS[1], ARGV[2]) else return 0 end' + +/** + * @typedef {Object} Doc + * @property {number} version + * @property {Array} lines + * @property {string} pathname + * @property {Object} ranges + */ + +class TryAgainError extends Error {} + +/** + * @param {string} docId + * @param {Doc} redisDoc + * @param {Doc} mongoDoc + * @return {Promise} + */ +async function updateDocVersionInRedis(docId, redisDoc, mongoDoc) { + const lockValue = await LockManager.promises.getLock(docId) + try { + const key = Settings.redis.documentupdater.key_schema.docVersion({ + doc_id: docId, + }) + const numberOfKeys = 1 + const ok = await RedisManager.rclient.eval( + COMPARE_AND_SET, + numberOfKeys, + key, + redisDoc.version, + mongoDoc.version + ) + if (!ok) { + throw new TryAgainError( + 'document has been updated, aborting overwrite. Try again.' + ) + } + } finally { + await LockManager.promises.releaseLock(docId, lockValue) + } +} + +/** + * @param {string} projectId + * @param {string} docId + * @return {Promise} + */ +async function processDoc(projectId, docId) { + const redisDoc = /** @type Doc */ await RedisManager.promises.getDoc( + projectId, + docId + ) + const mongoDoc = /** @type Doc */ await PersistenceManager.promises.getDoc( + projectId, + docId + ) + + if (mongoDoc.version < redisDoc.version) { + // mongo is behind, we can flush to mongo when all docs are processed. + return false + } + + mongoDoc.snapshot = mongoDoc.lines.join('\n') + redisDoc.snapshot = redisDoc.lines.join('\n') + if (!mongoDoc.ranges) mongoDoc.ranges = {} + if (!redisDoc.ranges) redisDoc.ranges = {} + + const sameLines = mongoDoc.snapshot === redisDoc.snapshot + const sameRanges = _.isEqual(mongoDoc.ranges, redisDoc.ranges) + if (sameLines && sameRanges) { + if (mongoDoc.version > redisDoc.version) { + // mongo is ahead, technically out of sync, but practically the content is identical + if (AUTO_FIX_VERSION_MISMATCH) { + console.log( + `Fixing out of sync doc version for doc ${docId} in project ${projectId}: mongo=${mongoDoc.version} > redis=${redisDoc.version}` + ) + await updateDocVersionInRedis(docId, redisDoc, mongoDoc) + return false + } else { + console.error( + `Detected out of sync redis and mongo version for doc ${docId} in project ${projectId}, auto-fixable via AUTO_FIX_VERSION_MISMATCH=true` + ) + return true + } + } else { + // same lines, same ranges, same version + return false + } + } + + const dir = Path.join(FOLDER, projectId, docId) + console.error( + `Detected out of sync redis and mongo content for doc ${docId} in project ${projectId}` + ) + if (!WRITE_CONTENT) return true + + console.log(`pathname: ${mongoDoc.pathname}`) + console.log(`mongo version: ${mongoDoc.version}`) + console.log(`redis version: ${redisDoc.version}`) + + await fs.promises.mkdir(dir, { recursive: true }) + + if (sameLines) { + console.log('mongo lines match redis lines') + } else { + console.log( + `mongo lines and redis lines out of sync, writing content into ${dir}` + ) + await fs.promises.writeFile( + Path.join(dir, 'mongo-snapshot.txt'), + mongoDoc.snapshot + ) + await fs.promises.writeFile( + Path.join(dir, 'redis-snapshot.txt'), + redisDoc.snapshot + ) + } + if (sameRanges) { + console.log('mongo ranges match redis ranges') + } else { + console.log( + `mongo ranges and redis ranges out of sync, writing content into ${dir}` + ) + await fs.promises.writeFile( + Path.join(dir, 'mongo-ranges.json'), + JSON.stringify(mongoDoc.ranges) + ) + await fs.promises.writeFile( + Path.join(dir, 'redis-ranges.json'), + JSON.stringify(redisDoc.ranges) + ) + } + console.log('---') + return true +} + +/** + * @param {string} projectId + * @return {Promise} + */ +async function processProject(projectId) { + const docIds = await RedisManager.promises.getDocIdsInProject(projectId) + + let outOfSync = 0 + for (const docId of docIds) { + let lastErr + for (let i = 0; i <= RETRIES; i++) { + try { + if (await processDoc(projectId, docId)) { + outOfSync++ + } + break + } catch (err) { + lastErr = err + } + } + if (lastErr) { + throw OError.tag(lastErr, 'process doc', { docId }) + } + } + if (outOfSync === 0 && FLUSH_IN_SYNC_PROJECTS) { + try { + await ProjectManager.promises.flushAndDeleteProjectWithLocks( + projectId, + {} + ) + } catch (err) { + throw OError.tag(err, 'flush project with only in-sync docs') + } + } + return outOfSync +} + +/** + * @param {Set} processed + * @param {Set} outOfSync + * @return {Promise<{perIterationOutOfSync: number, done: boolean}>} + */ +async function scanOnce(processed, outOfSync) { + const projectIds = await ProjectFlusher.promises.flushAllProjects({ + limit: LIMIT, + dryRun: true, + }) + + let perIterationOutOfSync = 0 + for (const projectId of projectIds) { + if (processed.has(projectId)) continue + processed.add(projectId) + + let perProjectOutOfSync = 0 + try { + perProjectOutOfSync = await processProject(projectId) + } catch (err) { + throw OError.tag(err, 'process project', { projectId }) + } + perIterationOutOfSync += perProjectOutOfSync + if (perProjectOutOfSync > 0) { + outOfSync.add(projectId) + } + } + + return { perIterationOutOfSync, done: projectIds.length < LIMIT } +} + +/** + * @return {Promise} + */ +async function main() { + if (!WRITE_CONTENT) { + console.warn() + console.warn( + ` Use WRITE_CONTENT=true to write the content of out of sync docs to FOLDER=${FOLDER}` + ) + console.warn() + } else { + console.log( + `Writing content for projects with out of sync docs into FOLDER=${FOLDER}` + ) + await fs.promises.mkdir(FOLDER, { recursive: true }) + const existing = await fs.promises.readdir(FOLDER) + if (existing.length > 0) { + console.warn() + console.warn( + ` Found existing entries in FOLDER=${FOLDER}. Please delete or move these before running the script again.` + ) + console.warn() + return 101 + } + } + if (LIMIT < 100) { + console.warn() + console.warn( + ` Using small LIMIT=${LIMIT}, this can take a while to SCAN in a large redis database.` + ) + console.warn() + } + + const processed = new Set() + const outOfSyncProjects = new Set() + let totalOutOfSyncDocs = 0 + while (true) { + const before = processed.size + const { perIterationOutOfSync, done } = await scanOnce( + processed, + outOfSyncProjects + ) + totalOutOfSyncDocs += perIterationOutOfSync + console.log(`Processed ${processed.size} projects`) + console.log( + `Found ${ + outOfSyncProjects.size + } projects with ${totalOutOfSyncDocs} out of sync docs: ${JSON.stringify( + Array.from(outOfSyncProjects) + )}` + ) + if (done) { + console.log('Finished iterating all projects in redis') + break + } + if (processed.size === before) { + console.error( + `Found too many un-flushed projects (LIMIT=${LIMIT}). Please fix the reported projects first, then try again.` + ) + if (!FLUSH_IN_SYNC_PROJECTS) { + console.error( + 'Use FLUSH_IN_SYNC_PROJECTS=true to flush projects that have been checked.' + ) + } + return 2 + } + } + return totalOutOfSyncDocs > 0 ? 1 : 0 +} + +main() + .then(code => { + process.exit(code) + }) + .catch(error => { + console.error(OError.getFullStack(error)) + console.error(OError.getFullInfo(error)) + process.exit(1) + }) diff --git a/services/document-updater/test/acceptance/js/CheckRedisMongoSyncStateTests.js b/services/document-updater/test/acceptance/js/CheckRedisMongoSyncStateTests.js new file mode 100644 index 0000000000..c230229d8d --- /dev/null +++ b/services/document-updater/test/acceptance/js/CheckRedisMongoSyncStateTests.js @@ -0,0 +1,265 @@ +const MockWebApi = require('./helpers/MockWebApi') +const DocUpdaterClient = require('./helpers/DocUpdaterClient') +const DocUpdaterApp = require('./helpers/DocUpdaterApp') +const { promisify } = require('util') +const { exec } = require('child_process') +const { expect } = require('chai') +const Settings = require('@overleaf/settings') +const fs = require('fs') +const Path = require('path') + +const rclient = require('@overleaf/redis-wrapper').createClient( + Settings.redis.documentupdater +) + +describe('CheckRedisMongoSyncState', function () { + beforeEach(function (done) { + DocUpdaterApp.ensureRunning(done) + }) + beforeEach(async function () { + await rclient.flushall() + }) + + async function runScript(options) { + let result + try { + result = await promisify(exec)( + Object.entries(options) + .map(([key, value]) => `${key}=${value}`) + .concat(['node', 'scripts/check_redis_mongo_sync_state.js']) + .join(' ') + ) + } catch (error) { + // includes details like exit code, stdErr and stdOut + return error + } + result.code = 0 + return result + } + + describe('without projects', function () { + it('should work when in sync', async function () { + const result = await runScript({}) + expect(result.code).to.equal(0) + expect(result.stdout).to.include('Processed 0 projects') + expect(result.stdout).to.include( + 'Found 0 projects with 0 out of sync docs' + ) + }) + }) + + describe('with a project', function () { + let projectId, docId + beforeEach(function (done) { + projectId = DocUpdaterClient.randomId() + docId = DocUpdaterClient.randomId() + MockWebApi.insertDoc(projectId, docId, { + lines: ['mongo', 'lines'], + version: 1, + }) + DocUpdaterClient.getDoc(projectId, docId, done) + }) + + it('should work when in sync', async function () { + const result = await runScript({}) + expect(result.code).to.equal(0) + expect(result.stdout).to.include('Processed 1 projects') + expect(result.stdout).to.include( + 'Found 0 projects with 0 out of sync docs' + ) + }) + + describe('with out of sync lines', function () { + beforeEach(function () { + MockWebApi.insertDoc(projectId, docId, { + lines: ['updated', 'mongo', 'lines'], + version: 1, + }) + }) + + it('should detect the out of sync state', async function () { + const result = await runScript({}) + expect(result.code).to.equal(1) + expect(result.stdout).to.include('Processed 1 projects') + expect(result.stdout).to.include( + 'Found 1 projects with 1 out of sync docs' + ) + }) + }) + + describe('with out of sync ranges', function () { + beforeEach(function () { + MockWebApi.insertDoc(projectId, docId, { + lines: ['mongo', 'lines'], + version: 1, + ranges: { changes: ['FAKE CHANGE'] }, + }) + }) + + it('should detect the out of sync state', async function () { + const result = await runScript({}) + expect(result.code).to.equal(1) + expect(result.stdout).to.include('Processed 1 projects') + expect(result.stdout).to.include( + 'Found 1 projects with 1 out of sync docs' + ) + }) + }) + + describe('with out of sync version', function () { + beforeEach(function () { + MockWebApi.insertDoc(projectId, docId, { + lines: ['mongo', 'lines'], + version: 2, + }) + }) + + it('should detect the out of sync state', async function () { + const result = await runScript({}) + expect(result.code).to.equal(1) + expect(result.stdout).to.include('Processed 1 projects') + expect(result.stdout).to.include( + 'Found 1 projects with 1 out of sync docs' + ) + }) + + it('should auto-fix the out of sync state', async function () { + const result = await runScript({ + AUTO_FIX_VERSION_MISMATCH: 'true', + }) + expect(result.code).to.equal(0) + expect(result.stdout).to.include('Processed 1 projects') + expect(result.stdout).to.include( + 'Found 0 projects with 0 out of sync docs' + ) + }) + }) + + describe('with a project', function () { + let projectId2, docId2 + beforeEach(function (done) { + projectId2 = DocUpdaterClient.randomId() + docId2 = DocUpdaterClient.randomId() + MockWebApi.insertDoc(projectId2, docId2, { + lines: ['mongo', 'lines'], + version: 1, + }) + DocUpdaterClient.getDoc(projectId2, docId2, done) + }) + + it('should work when in sync', async function () { + const result = await runScript({}) + expect(result.code).to.equal(0) + expect(result.stdout).to.include('Processed 2 projects') + expect(result.stdout).to.include( + 'Found 0 projects with 0 out of sync docs' + ) + }) + + describe('with one out of sync', function () { + beforeEach(function () { + MockWebApi.insertDoc(projectId, docId, { + lines: ['updated', 'mongo', 'lines'], + version: 1, + }) + }) + + it('should detect one project out of sync', async function () { + const result = await runScript({}) + expect(result.code).to.equal(1) + expect(result.stdout).to.include('Processed 2 projects') + expect(result.stdout).to.include( + 'Found 1 projects with 1 out of sync docs' + ) + }) + + it('should write differences to disk', async function () { + const FOLDER = '/tmp/folder' + await fs.promises.rm(FOLDER, { recursive: true, force: true }) + const result = await runScript({ + WRITE_CONTENT: 'true', + FOLDER, + }) + expect(result.code).to.equal(1) + expect(result.stdout).to.include('Processed 2 projects') + expect(result.stdout).to.include( + 'Found 1 projects with 1 out of sync docs' + ) + + const dir = Path.join(FOLDER, projectId, docId) + expect(await fs.promises.readdir(FOLDER)).to.deep.equal([projectId]) + expect(await fs.promises.readdir(dir)).to.deep.equal([ + 'mongo-snapshot.txt', + 'redis-snapshot.txt', + ]) + expect( + await fs.promises.readFile( + Path.join(dir, 'mongo-snapshot.txt'), + 'utf-8' + ) + ).to.equal('updated\nmongo\nlines') + expect( + await fs.promises.readFile( + Path.join(dir, 'redis-snapshot.txt'), + 'utf-8' + ) + ).to.equal('mongo\nlines') + }) + }) + + describe('with both out of sync', function () { + beforeEach(function () { + MockWebApi.insertDoc(projectId, docId, { + lines: ['updated', 'mongo', 'lines'], + version: 1, + }) + MockWebApi.insertDoc(projectId2, docId2, { + lines: ['updated2', 'mongo', 'lines'], + version: 1, + }) + }) + + it('should detect both projects out of sync', async function () { + const result = await runScript({}) + expect(result.code).to.equal(1) + expect(result.stdout).to.include('Processed 2 projects') + expect(result.stdout).to.include( + 'Found 2 projects with 2 out of sync docs' + ) + }) + }) + }) + }) + + describe('with more projects than the LIMIT', function () { + for (let i = 0; i < 20; i++) { + beforeEach(function (done) { + const projectId = DocUpdaterClient.randomId() + const docId = DocUpdaterClient.randomId() + MockWebApi.insertDoc(projectId, docId, { + lines: ['mongo', 'lines'], + version: 1, + }) + DocUpdaterClient.getDoc(projectId, docId, done) + }) + } + + it('should flag limit', async function () { + const result = await runScript({ LIMIT: '2' }) + expect(result.code).to.equal(2) + expect(result.stdout).to.include('Processed 2 projects') + expect(result.stderr).to.include( + 'Found too many un-flushed projects (LIMIT=2). Please fix the reported projects first, then try again.' + ) + }) + + it('should continue with auto-flush', async function () { + const result = await runScript({ + LIMIT: '2', + FLUSH_IN_SYNC_PROJECTS: 'true', + }) + expect(result.code).to.equal(0) + expect(result.stdout).to.include('Processed 20 projects') + }) + }) +})