Merge pull request #25241 from overleaf/bg-remove-existing-chunk-buffer

remove existing chunk redis backend and chunk buffer

GitOrigin-RevId: 28fb02d1802312de6892e2fb7dd59191e3fc8914
This commit is contained in:
Brian Gough
2025-05-07 09:27:15 +01:00
committed by Copybot
parent 5cc0895c56
commit f9b36cd5be
9 changed files with 1845 additions and 1652 deletions
-1
View File
@@ -1,7 +1,6 @@
exports.BatchBlobStore = require('./lib/batch_blob_store')
exports.blobHash = require('./lib/blob_hash')
exports.HashCheckBlobStore = require('./lib/hash_check_blob_store')
exports.chunkBuffer = require('./lib/chunk_buffer')
exports.chunkStore = require('./lib/chunk_store')
exports.historyStore = require('./lib/history_store').historyStore
exports.knex = require('./lib/knex')
@@ -1,39 +0,0 @@
'use strict'
/**
* @module storage/lib/chunk_buffer
*/
const chunkStore = require('../chunk_store')
const redisBackend = require('../chunk_store/redis')
const metrics = require('@overleaf/metrics')
/**
* Load the latest Chunk stored for a project, including blob metadata.
*
* @param {string} projectId
* @return {Promise.<Chunk>}
*/
async function loadLatest(projectId) {
const chunkRecord = await chunkStore.loadLatestRaw(projectId)
const cachedChunk = await redisBackend.getCurrentChunkIfValid(
projectId,
chunkRecord
)
if (cachedChunk) {
metrics.inc('chunk_buffer.loadLatest', 1, {
status: 'cache-hit',
})
return cachedChunk
} else {
metrics.inc('chunk_buffer.loadLatest', 1, {
status: 'cache-miss',
})
const chunk = await chunkStore.loadLatest(projectId)
await redisBackend.setCurrentChunk(projectId, chunk)
return chunk
}
}
module.exports = {
loadLatest,
}
@@ -1,7 +1,13 @@
const OError = require('@overleaf/o-error')
class ChunkVersionConflictError extends OError {}
class BaseVersionConflictError extends OError {}
class JobNotFoundError extends OError {}
class JobNotReadyError extends OError {}
module.exports = {
ChunkVersionConflictError,
BaseVersionConflictError,
JobNotFoundError,
JobNotReadyError,
}
File diff suppressed because it is too large Load Diff
@@ -2,7 +2,7 @@ const logger = require('@overleaf/logger')
const commandLineArgs = require('command-line-args') // Add this line
const redis = require('../lib/redis')
const { scanRedisCluster, extractKeyId } = require('../lib/scan')
const { expireCurrentChunk } = require('../lib/chunk_store/redis')
const { expireProject, claimExpireJob } = require('../lib/chunk_store/redis')
const rclient = redis.rclientHistory
const EXPIRE_TIME_KEY_PATTERN = `expire-time:{*}`
@@ -30,24 +30,42 @@ function isExpiredKey(expireTimestamp, currentTime) {
return currentTime > expireTime
}
async function processKeysBatch(keysBatch, rclient) {
async function fetchTimestamps(projectIds, rclient) {
const expireTimeKeys = projectIds.map(id => `expire-time:{${id}}`)
// For efficiency, we use MGET to fetch all the timestamps in a single request
const expireTimestamps = await rclient.mget(expireTimeKeys)
// Return an array of objects with projectId and expireTimestamp
const results = projectIds.map((projectId, index) => ({
projectId,
expireTimestamp: expireTimestamps[index],
}))
return results
}
async function processKeysBatch(projectIds, rclient) {
let clearedKeyCount = 0
if (keysBatch.length === 0) {
if (projectIds.length === 0) {
return 0
}
// For efficiency, we use MGET to fetch all the timestamps in a single request
const expireTimestamps = await rclient.mget(keysBatch)
const projects = await fetchTimestamps(projectIds, rclient)
const currentTime = Date.now()
for (let i = 0; i < keysBatch.length; i++) {
const key = keysBatch[i]
for (const project of projects) {
const { projectId, expireTimestamp } = project
// For each key, do a quick check to see if the key is expired before calling
// the LUA script to expire the chunk atomically.
if (isExpiredKey(expireTimestamps[i], currentTime)) {
const projectId = extractKeyId(key)
if (isExpiredKey(expireTimestamp, currentTime)) {
if (DRY_RUN) {
logger.info({ projectId }, '[Dry Run] Would expire chunk for project')
} else {
await expireCurrentChunk(projectId)
try {
const job = await claimExpireJob(projectId)
await expireProject(projectId)
await job.close()
} catch (err) {
logger.error({ projectId, err }, 'error expiring chunk for project')
continue
}
}
clearedKeyCount++
}
@@ -61,7 +79,6 @@ async function expireRedisChunks() {
const START_TIME = Date.now()
if (DRY_RUN) {
// Use global DRY_RUN
logger.info({}, 'starting expireRedisChunks scan in DRY RUN mode')
} else {
logger.info({}, 'starting expireRedisChunks scan')
@@ -72,7 +89,10 @@ async function expireRedisChunks() {
EXPIRE_TIME_KEY_PATTERN
)) {
scannedKeyCount += keysBatch.length
clearedKeyCount += await processKeysBatch(keysBatch, rclient)
clearedKeyCount += await processKeysBatch(
keysBatch.map(extractKeyId),
rclient
)
if (scannedKeyCount % 1000 === 0) {
logger.info(
{ scannedKeyCount, clearedKeyCount },
@@ -92,7 +112,13 @@ async function expireRedisChunks() {
await redis.disconnect()
}
expireRedisChunks().catch(err => {
logger.fatal({ err }, 'unhandled error in expireRedisChunks')
process.exit(1)
})
// Check if the script is being run directly
if (require.main === module) {
expireRedisChunks().catch(err => {
logger.fatal({ err }, 'unhandled error in expireRedisChunks')
process.exit(1)
})
} else {
// Export the function for module usage
module.exports = { expireRedisChunks }
}
@@ -22,7 +22,6 @@ const TextOperation = core.TextOperation
const V2DocVersions = core.V2DocVersions
const knex = require('../../../../storage').knex
const redis = require('../../../../storage/lib/chunk_store/redis')
describe('history import', function () {
beforeEach(cleanup.everything)
@@ -595,10 +594,6 @@ describe('history import', function () {
testFiles.NULL_CHARACTERS_TXT_BYTE_LENGTH
)
})
.then(() => {
// Now clear the cache because we have changed the string length in the database
return redis.clearCache(testProjectId)
})
.then(importChanges)
.then(getLatestContent)
.then(response => {
@@ -1,351 +0,0 @@
'use strict'
const { expect } = require('chai')
const sinon = require('sinon')
const {
Chunk,
Snapshot,
History,
File,
AddFileOperation,
EditFileOperation,
AddCommentOperation,
TextOperation,
Range,
TrackingProps,
Change,
} = require('overleaf-editor-core')
const cleanup = require('./support/cleanup')
const fixtures = require('./support/fixtures')
const chunkBuffer = require('../../../../storage/lib/chunk_buffer')
const chunkStore = require('../../../../storage/lib/chunk_store')
const redisBackend = require('../../../../storage/lib/chunk_store/redis')
const metrics = require('@overleaf/metrics')
describe('chunk buffer', function () {
beforeEach(cleanup.everything)
beforeEach(fixtures.create)
beforeEach(function () {
sinon.spy(metrics, 'inc')
})
afterEach(function () {
metrics.inc.restore()
})
const projectId = '123456'
describe('loadLatest', function () {
// Initialize project and create a test chunk
beforeEach(async function () {
// Initialize project in chunk store
await chunkStore.initializeProject(projectId)
})
describe('with an existing chunk', function () {
beforeEach(async function () {
// Create a sample chunk with some content
const snapshot = new Snapshot()
const changes = [
new Change(
[new AddFileOperation('test.tex', File.fromString('Hello World'))],
new Date(),
[]
),
]
const history = new History(snapshot, changes)
const chunk = new Chunk(history, 1) // startVersion 1
// Store the chunk directly in the chunk store using create method
// which internally calls uploadChunk
await chunkStore.create(projectId, chunk)
// Clear any existing cache
await redisBackend.clearCache(projectId)
})
it('should load from chunk store and update cache on first access (cache miss)', async function () {
// Load the underlying chunk from the chunk store for verification
const storedChunk = await chunkStore.loadLatest(projectId)
// First access should load from chunk store and populate cache
const firstResult = await chunkBuffer.loadLatest(projectId)
// Verify the chunk is correct
expect(firstResult).to.not.be.null
expect(firstResult.getStartVersion()).to.equal(1)
expect(firstResult.getEndVersion()).to.equal(2)
// Verify the chunk is the same as the one in the store
expect(firstResult).to.deep.equal(storedChunk)
// Verify that we got a cache miss metric
expect(
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
status: 'cache-miss',
})
).to.be.true
// Reset the metrics spy
metrics.inc.resetHistory()
// Second access should hit the cache
const secondResult = await chunkBuffer.loadLatest(projectId)
// Verify we got the same chunk
expect(secondResult).to.not.be.null
expect(secondResult.getStartVersion()).to.equal(1)
expect(secondResult.getEndVersion()).to.equal(2)
// Verify the chunk is the same as the one in the store
expect(secondResult).to.deep.equal(storedChunk)
// Verify that we got a cache hit metric
expect(
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
status: 'cache-hit',
})
).to.be.true
// Verify both chunks are equivalent
expect(secondResult.getStartVersion()).to.equal(
firstResult.getStartVersion()
)
expect(secondResult.getEndVersion()).to.equal(
firstResult.getEndVersion()
)
expect(secondResult).to.deep.equal(firstResult)
})
it('should refresh the cache when chunk changes in the store', async function () {
// First access to load into cache
const firstResult = await chunkBuffer.loadLatest(projectId)
expect(firstResult.getStartVersion()).to.equal(1)
// Reset metrics spy
metrics.inc.resetHistory()
// Create a new chunk with different content
const newSnapshot = new Snapshot()
const newChanges = [
new Change(
[
new AddFileOperation(
'updated.tex',
File.fromString('Updated content')
),
],
new Date(),
[]
),
]
const newHistory = new History(newSnapshot, newChanges)
const newChunk = new Chunk(newHistory, 2) // Different start version
// Store the new chunk directly in the chunk store
await chunkStore.create(projectId, newChunk)
// Load the underlying chunk from the chunk store for verification
const storedChunk = await chunkStore.loadLatest(projectId)
// Access again - should detect the change and refresh cache
const secondResult = await chunkBuffer.loadLatest(projectId)
// Verify we got the updated chunk
expect(secondResult.getStartVersion()).to.equal(2)
expect(secondResult.getEndVersion()).to.equal(3)
// Verify that the chunk content is the same
expect(secondResult).to.deep.equal(storedChunk)
// Verify that we got a cache miss metric (since the cached chunk was invalidated)
expect(
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
status: 'cache-miss',
})
).to.be.true
})
it('should continue using cache when chunk in store has not changed', async function () {
// Load the underlying chunk from the chunk store for verification
const storedChunk = await chunkStore.loadLatest(projectId)
// First access to load into cache
await chunkBuffer.loadLatest(projectId)
// Reset metrics spy
metrics.inc.resetHistory()
// Access again without changing the underlying chunk
const result = await chunkBuffer.loadLatest(projectId)
// Verify we got the same chunk
expect(result.getStartVersion()).to.equal(1)
expect(result.getEndVersion()).to.equal(2)
expect(result).to.deep.equal(storedChunk)
// Verify that we got a cache hit metric
expect(
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
status: 'cache-hit',
})
).to.be.true
})
})
it('should handle a chunk with metadata, comments and tracked changes', async function () {
// Create a snapshot and initial file
const snapshot = new Snapshot()
const initialFileOp = new AddFileOperation(
'test.tex',
File.fromString('Initial line.\\nSecond line.', {
meta1: 'abc',
meta2: 'def',
})
)
const initialChange = new Change([initialFileOp], new Date(), [])
// Add a comment
const commentOp = new AddCommentOperation(
'comment1',
[new Range(0, 7)] // Range for "Initial"
)
const commentChange = new Change(
[new EditFileOperation('test.tex', commentOp)],
new Date(),
[]
)
// Tracked insert
const trackedInsertOp = new TextOperation()
.retain(14)
.insert('Hello', {
commentIds: ['comment1'],
tracking: TrackingProps.fromRaw({
ts: '2024-01-01T00:00:00.000Z',
type: 'insert',
userId: 'user1',
}),
})
.retain(12)
const insertChange = new Change(
[new EditFileOperation('test.tex', trackedInsertOp)],
new Date(),
[]
)
// Tracked delete
const trackedDeleteOp = new TextOperation().retain(14, {
tracking: TrackingProps.fromRaw({
ts: '2024-01-01T00:00:00.000Z',
type: 'delete',
userId: 'user1',
}),
})
const deleteChange = new Change(
[new EditFileOperation('test.tex', trackedDeleteOp)],
new Date(),
[]
)
// Combine changes into history and create chunk
const history = new History(snapshot, [
initialChange,
commentChange,
insertChange,
deleteChange,
])
const chunk = new Chunk(history, 1) // Start version 0
// Store the chunk
await chunkStore.create(projectId, chunk)
// Clear the cache
await redisBackend.clearCache(projectId)
metrics.inc.resetHistory()
// Load the underlying chunk from the chunk store for verification
const storedChunk = await chunkStore.loadLatest(projectId)
// Load the chunk via buffer (cache miss)
const firstResult = await chunkBuffer.loadLatest(projectId)
// Verify chunk details
expect(firstResult.getStartVersion()).to.equal(1)
expect(firstResult.getEndVersion()).to.equal(5) // 4 changes
expect(firstResult.history.changes.length).to.equal(4)
expect(firstResult).to.deep.equal(storedChunk)
// Verify cache miss metric
expect(
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
status: 'cache-miss',
})
).to.be.true
// Reset metrics
metrics.inc.resetHistory()
// Second access should hit the cache
const secondResult = await chunkBuffer.loadLatest(projectId)
// Verify we got the same chunk
expect(secondResult.getStartVersion()).to.equal(1)
expect(secondResult.getEndVersion()).to.equal(5)
expect(secondResult.history.changes.length).to.equal(4)
expect(secondResult).to.deep.equal(storedChunk)
// Verify cache hit metric
expect(
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
status: 'cache-hit',
})
).to.be.true
})
describe('with an empty project', function () {
it('should handle a case with empty chunks (no changes)', async function () {
// Clear the cache
await redisBackend.clearCache(projectId)
// Load the underlying chunk from the chunk store for verification
const storedChunk = await chunkStore.loadLatest(projectId)
// Load the initial empty chunk via buffer
const result = await chunkBuffer.loadLatest(projectId)
// Verify we got the empty chunk
expect(result.getStartVersion()).to.equal(0)
expect(result.getEndVersion()).to.equal(0) // Start equals end for empty chunks
expect(result.history.changes.length).to.equal(0)
// Verify that the chunk is the same as the one in the store
expect(result).to.deep.equal(storedChunk)
// Verify cache miss metric
expect(
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
status: 'cache-miss',
})
).to.be.true
// Reset metrics
metrics.inc.resetHistory()
// Second access should hit the cache
const secondResult = await chunkBuffer.loadLatest(projectId)
// Verify we got the same empty chunk
expect(secondResult.getStartVersion()).to.equal(0)
expect(secondResult.getEndVersion()).to.equal(0)
expect(secondResult.history.changes.length).to.equal(0)
// Verify that the chunk is the same as the one in the store
expect(secondResult).to.deep.equal(storedChunk)
// Verify cache hit metric
expect(
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
status: 'cache-hit',
})
).to.be.true
})
})
})
})
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,209 @@
'use strict'
const { expect } = require('chai')
const { promisify } = require('node:util')
const { execFile } = require('node:child_process')
const { Snapshot, Author, Change } = require('overleaf-editor-core')
const cleanup = require('./support/cleanup')
const redisBackend = require('../../../../storage/lib/chunk_store/redis')
const redis = require('../../../../storage/lib/redis')
const rclient = redis.rclientHistory
const keySchema = redisBackend.keySchema
const SCRIPT_PATH = 'storage/scripts/expire_redis_chunks.js'
async function runExpireScript() {
const TIMEOUT = 10 * 1000 // 10 seconds
let result
try {
result = await promisify(execFile)('node', [SCRIPT_PATH], {
encoding: 'utf-8',
timeout: TIMEOUT,
env: {
...process.env,
LOG_LEVEL: 'debug', // Override LOG_LEVEL for script output
},
})
result.status = 0
} catch (err) {
const { stdout, stderr, code } = err
if (typeof code !== 'number') {
console.error('Error running expire script:', err)
throw err
}
result = { stdout, stderr, status: code }
}
// The script might exit with status 1 if it finds no keys to process, which is ok
if (result.status !== 0 && result.status !== 1) {
console.error('Expire script failed:', result.stderr)
throw new Error(`expire script failed with status ${result.status}`)
}
return result
}
// Helper to set up a basic project state in Redis
async function setupProjectState(
projectId,
{
headVersion = 0,
persistedVersion = null,
expireTime = null,
persistTime = null,
changes = [],
}
) {
const headSnapshot = new Snapshot()
await rclient.set(
keySchema.head({ projectId }),
JSON.stringify(headSnapshot.toRaw())
)
await rclient.set(
keySchema.headVersion({ projectId }),
headVersion.toString()
)
if (persistedVersion !== null) {
await rclient.set(
keySchema.persistedVersion({ projectId }),
persistedVersion.toString()
)
}
if (expireTime !== null) {
await rclient.set(
keySchema.expireTime({ projectId }),
expireTime.toString()
)
}
if (persistTime !== null) {
await rclient.set(
keySchema.persistTime({ projectId }),
persistTime.toString()
)
}
if (changes.length > 0) {
const rawChanges = changes.map(c => JSON.stringify(c.toRaw()))
await rclient.rpush(keySchema.changes({ projectId }), ...rawChanges)
}
}
function makeChange() {
const timestamp = new Date()
const author = new Author(123, 'test@example.com', 'Test User')
return new Change([], timestamp, [author])
}
describe('expire_redis_chunks script', function () {
beforeEach(cleanup.everything)
let now, past, future
// Setup all projects and run the script once before tests
beforeEach(async function () {
now = Date.now()
past = now - 10000 // 10 seconds ago
future = now + 60000 // 1 minute in the future
// Setup all project states explicitly
await setupProjectState('expired_persisted', {
headVersion: 2,
persistedVersion: 2,
expireTime: past,
})
await setupProjectState('expired_initial_state', {
headVersion: 0,
persistedVersion: 0,
expireTime: past,
})
await setupProjectState('expired_persisted_with_job', {
headVersion: 2,
persistedVersion: 2,
expireTime: past,
persistTime: future,
})
await setupProjectState('expired_not_persisted', {
headVersion: 3,
persistedVersion: 2,
expireTime: past,
changes: [makeChange()],
})
await setupProjectState('expired_no_persisted_version', {
headVersion: 1,
persistedVersion: null,
expireTime: past,
changes: [makeChange()],
})
await setupProjectState('future_expired_persisted', {
headVersion: 2,
persistedVersion: 2,
expireTime: future,
})
await setupProjectState('future_expired_not_persisted', {
headVersion: 3,
persistedVersion: 2,
expireTime: future,
changes: [makeChange()],
})
await setupProjectState('no_expire_time', {
headVersion: 1,
persistedVersion: 1,
expireTime: null,
})
// Run the expire script once after all projects are set up
await runExpireScript()
})
async function checkProjectStatus(projectId) {
const exists =
(await rclient.exists(keySchema.headVersion({ projectId }))) === 1
return exists ? 'exists' : 'deleted'
}
it('should expire a project when expireTime is past and it is fully persisted', async function () {
const projectId = 'expired_persisted'
const status = await checkProjectStatus(projectId)
expect(status).to.equal('deleted')
})
it('should expire a project when expireTime is past and it has no changes (initial state)', async function () {
const projectId = 'expired_initial_state'
const status = await checkProjectStatus(projectId)
expect(status).to.equal('deleted')
})
it('should expire a project when expireTime is past and it is fully persisted even if persistTime is set', async function () {
const projectId = 'expired_persisted_with_job'
const status = await checkProjectStatus(projectId)
expect(status).to.equal('deleted')
})
it('should not expire a project when expireTime is past but it is not fully persisted', async function () {
const projectId = 'expired_not_persisted'
const status = await checkProjectStatus(projectId)
expect(status).to.equal('exists')
})
it('should not expire a project when expireTime is past but persistedVersion is not set', async function () {
const projectId = 'expired_no_persisted_version'
const status = await checkProjectStatus(projectId)
expect(status).to.equal('exists')
})
it('should not expire a project when expireTime is in the future (even if fully persisted)', async function () {
const projectId = 'future_expired_persisted'
const status = await checkProjectStatus(projectId)
expect(status).to.equal('exists')
})
it('should not expire a project when expireTime is in the future (if not fully persisted)', async function () {
const projectId = 'future_expired_not_persisted'
const status = await checkProjectStatus(projectId)
expect(status).to.equal('exists')
})
it('should not expire a project when expireTime is not set', async function () {
const projectId = 'no_expire_time'
const status = await checkProjectStatus(projectId)
expect(status).to.equal('exists')
})
})