mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-25 02:00:10 +02:00
Merge pull request #24906 from overleaf/bg-history-redis-read-cache
implement read cache for history-v1 chunks GitOrigin-RevId: 128de7e9380fd489f68d5045d3333a27018845c2
This commit is contained in:
@@ -18,6 +18,7 @@ const {
|
||||
HashCheckBlobStore,
|
||||
ProjectArchive,
|
||||
zipStore,
|
||||
chunkBuffer,
|
||||
} = require('../../storage')
|
||||
|
||||
const render = require('./render')
|
||||
@@ -44,7 +45,7 @@ async function initializeProject(req, res, next) {
|
||||
async function getLatestContent(req, res, next) {
|
||||
const projectId = req.swagger.params.project_id.value
|
||||
const blobStore = new BlobStore(projectId)
|
||||
const chunk = await chunkStore.loadLatest(projectId)
|
||||
const chunk = await chunkBuffer.loadLatest(projectId)
|
||||
const snapshot = chunk.getSnapshot()
|
||||
snapshot.applyAll(chunk.getChanges())
|
||||
await snapshot.loadFiles('eager', blobStore)
|
||||
@@ -63,7 +64,7 @@ async function getContentAtVersion(req, res, next) {
|
||||
async function getLatestHashedContent(req, res, next) {
|
||||
const projectId = req.swagger.params.project_id.value
|
||||
const blobStore = new HashCheckBlobStore(new BlobStore(projectId))
|
||||
const chunk = await chunkStore.loadLatest(projectId)
|
||||
const chunk = await chunkBuffer.loadLatest(projectId)
|
||||
const snapshot = chunk.getSnapshot()
|
||||
snapshot.applyAll(chunk.getChanges())
|
||||
await snapshot.loadFiles('eager', blobStore)
|
||||
@@ -74,7 +75,7 @@ async function getLatestHashedContent(req, res, next) {
|
||||
async function getLatestHistory(req, res, next) {
|
||||
const projectId = req.swagger.params.project_id.value
|
||||
try {
|
||||
const chunk = await chunkStore.loadLatest(projectId)
|
||||
const chunk = await chunkBuffer.loadLatest(projectId)
|
||||
const chunkResponse = new ChunkResponse(chunk)
|
||||
res.json(chunkResponse.toRaw())
|
||||
} catch (err) {
|
||||
@@ -153,7 +154,7 @@ async function getChanges(req, res, next) {
|
||||
}
|
||||
|
||||
const changes = []
|
||||
let chunk = await chunkStore.loadLatest(projectId)
|
||||
let chunk = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
if (since > chunk.getEndVersion()) {
|
||||
return res.status(400).json({
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
exports.BatchBlobStore = require('./lib/batch_blob_store')
|
||||
exports.blobHash = require('./lib/blob_hash')
|
||||
exports.HashCheckBlobStore = require('./lib/hash_check_blob_store')
|
||||
exports.chunkBuffer = require('./lib/chunk_buffer')
|
||||
exports.chunkStore = require('./lib/chunk_store')
|
||||
exports.historyStore = require('./lib/history_store').historyStore
|
||||
exports.knex = require('./lib/knex')
|
||||
|
||||
40
services/history-v1/storage/lib/chunk_buffer/index.js
Normal file
40
services/history-v1/storage/lib/chunk_buffer/index.js
Normal file
@@ -0,0 +1,40 @@
|
||||
'use strict'
|
||||
|
||||
/**
|
||||
* @module storage/lib/chunk_buffer
|
||||
*/
|
||||
|
||||
const chunkStore = require('../chunk_store')
|
||||
const redisBackend = require('../chunk_store/redis')
|
||||
const metrics = require('@overleaf/metrics')
|
||||
/**
|
||||
* Load the latest Chunk stored for a project, including blob metadata.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @return {Promise.<Chunk>}
|
||||
*/
|
||||
async function loadLatest(projectId) {
|
||||
const cachedChunk = await redisBackend.getCurrentChunk(projectId)
|
||||
const chunkRecord = await chunkStore.loadLatestRaw(projectId)
|
||||
const cachedChunkIsValid = redisBackend.checkCacheValidityWithMetadata(
|
||||
cachedChunk,
|
||||
chunkRecord
|
||||
)
|
||||
if (cachedChunkIsValid) {
|
||||
metrics.inc('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
return cachedChunk
|
||||
} else {
|
||||
metrics.inc('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
const chunk = await chunkStore.loadLatest(projectId)
|
||||
await redisBackend.setCurrentChunk(projectId, chunk)
|
||||
return chunk
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
loadLatest,
|
||||
}
|
||||
@@ -30,7 +30,6 @@ const { BlobStore } = require('../blob_store')
|
||||
const { historyStore } = require('../history_store')
|
||||
const mongoBackend = require('./mongo')
|
||||
const postgresBackend = require('./postgres')
|
||||
const redisBackend = require('./redis')
|
||||
const { ChunkVersionConflictError } = require('./errors')
|
||||
|
||||
const DEFAULT_DELETE_BATCH_SIZE = parseInt(config.get('maxDeleteKeys'), 10)
|
||||
@@ -105,23 +104,13 @@ async function loadLatestRaw(projectId, opts) {
|
||||
* @return {Promise.<Chunk>}
|
||||
*/
|
||||
async function loadLatest(projectId) {
|
||||
// Test out the redis caching backend - not in use yet
|
||||
const cachedChunk = await redisBackend.getCurrentChunk(projectId)
|
||||
const chunkRecord = await loadLatestRaw(projectId)
|
||||
const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id)
|
||||
const history = History.fromRaw(rawHistory)
|
||||
const blobStore = new BlobStore(projectId)
|
||||
const batchBlobStore = new BatchBlobStore(blobStore)
|
||||
await lazyLoadHistoryFiles(history, batchBlobStore)
|
||||
const chunk = new Chunk(history, chunkRecord.startVersion)
|
||||
// if the cached chunk is no longer valid, update it
|
||||
const cachedChunkIsValid = redisBackend.checkCacheValidity(cachedChunk, chunk)
|
||||
if (!cachedChunkIsValid) {
|
||||
await redisBackend.setCurrentChunk(projectId, chunk)
|
||||
} else {
|
||||
await redisBackend.compareChunks(projectId, cachedChunk, chunk)
|
||||
}
|
||||
return chunk
|
||||
return new Chunk(history, chunkRecord.startVersion)
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -148,7 +148,7 @@ async function setCurrentChunk(projectId, chunk) {
|
||||
|
||||
/**
|
||||
* Checks whether a cached chunk's version metadata matches the current chunk's metadata
|
||||
* @param {Chunk}} cachedChunk - The chunk retrieved from cache
|
||||
* @param {Chunk} cachedChunk - The chunk retrieved from cache
|
||||
* @param {Chunk} currentChunk - The current chunk to compare against
|
||||
* @returns {boolean} - Returns true if the chunks have matching start and end versions, false otherwise
|
||||
*/
|
||||
@@ -160,6 +160,22 @@ function checkCacheValidity(cachedChunk, currentChunk) {
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates if a cached chunk matches the current chunk metadata by comparing versions
|
||||
* @param {Object} cachedChunk - The cached chunk object to validate
|
||||
* @param {Object} currentChunkMetadata - The current chunk metadata to compare against
|
||||
* @param {number} currentChunkMetadata.startVersion - The starting version number
|
||||
* @param {number} currentChunkMetadata.endVersion - The ending version number
|
||||
* @returns {boolean} - True if the cached chunk is valid, false otherwise
|
||||
*/
|
||||
function checkCacheValidityWithMetadata(cachedChunk, currentChunkMetadata) {
|
||||
return Boolean(
|
||||
cachedChunk &&
|
||||
cachedChunk.getStartVersion() === currentChunkMetadata.startVersion &&
|
||||
cachedChunk.getEndVersion() === currentChunkMetadata.endVersion
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Compares two chunks for equality using stringified JSON comparison
|
||||
* @param {string} projectId - The ID of the project
|
||||
@@ -194,10 +210,45 @@ function compareChunks(projectId, cachedChunk, currentChunk) {
|
||||
return identical
|
||||
}
|
||||
|
||||
// Define Lua script for atomic cache clearing
|
||||
rclient.defineCommand('clear_chunk_cache', {
|
||||
numberOfKeys: 3,
|
||||
lua: `
|
||||
-- Delete all keys related to a project's chunk cache atomically
|
||||
redis.call('DEL', KEYS[1]) -- snapshot key
|
||||
redis.call('DEL', KEYS[2]) -- startVersion key
|
||||
redis.call('DEL', KEYS[3]) -- changes key
|
||||
return 1
|
||||
`,
|
||||
})
|
||||
|
||||
/**
|
||||
* Clears all cache entries for a project's chunk data
|
||||
* @param {string} projectId - The ID of the project whose cache should be cleared
|
||||
* @returns {Promise<boolean>} A promise that resolves to true if successful, false on error
|
||||
*/
|
||||
async function clearCache(projectId) {
|
||||
try {
|
||||
const snapshotKey = keySchema.snapshot({ projectId })
|
||||
const startVersionKey = keySchema.startVersion({ projectId })
|
||||
const changesKey = keySchema.changes({ projectId })
|
||||
|
||||
await rclient.clear_chunk_cache(snapshotKey, startVersionKey, changesKey)
|
||||
metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'success' })
|
||||
return true
|
||||
} catch (err) {
|
||||
logger.error({ err, projectId }, 'error clearing chunk cache from redis')
|
||||
metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'error' })
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
getCurrentChunk,
|
||||
setCurrentChunk,
|
||||
getCurrentChunkMetadata,
|
||||
checkCacheValidity,
|
||||
checkCacheValidityWithMetadata,
|
||||
compareChunks,
|
||||
clearCache,
|
||||
}
|
||||
|
||||
@@ -0,0 +1,210 @@
|
||||
'use strict'
|
||||
|
||||
const { expect } = require('chai')
|
||||
const sinon = require('sinon')
|
||||
const {
|
||||
Chunk,
|
||||
Snapshot,
|
||||
History,
|
||||
File,
|
||||
AddFileOperation,
|
||||
Change,
|
||||
} = require('overleaf-editor-core')
|
||||
const cleanup = require('./support/cleanup')
|
||||
const fixtures = require('./support/fixtures')
|
||||
const chunkBuffer = require('../../../../storage/lib/chunk_buffer')
|
||||
const chunkStore = require('../../../../storage/lib/chunk_store')
|
||||
const redisBackend = require('../../../../storage/lib/chunk_store/redis')
|
||||
const metrics = require('@overleaf/metrics')
|
||||
|
||||
describe('chunk buffer', function () {
|
||||
beforeEach(cleanup.everything)
|
||||
beforeEach(fixtures.create)
|
||||
beforeEach(function () {
|
||||
sinon.spy(metrics, 'inc')
|
||||
})
|
||||
afterEach(function () {
|
||||
metrics.inc.restore()
|
||||
})
|
||||
|
||||
const projectId = '123456'
|
||||
|
||||
describe('loadLatest', function () {
|
||||
// Initialize project and create a test chunk
|
||||
beforeEach(async function () {
|
||||
// Initialize project in chunk store
|
||||
await chunkStore.initializeProject(projectId)
|
||||
|
||||
// Create a sample chunk with some content
|
||||
const snapshot = new Snapshot()
|
||||
const changes = [
|
||||
new Change(
|
||||
[new AddFileOperation('test.tex', File.fromString('Hello World'))],
|
||||
new Date(),
|
||||
[]
|
||||
),
|
||||
]
|
||||
const history = new History(snapshot, changes)
|
||||
const chunk = new Chunk(history, 1) // startVersion 1
|
||||
|
||||
// Store the chunk directly in the chunk store using create method
|
||||
// which internally calls uploadChunk
|
||||
await chunkStore.create(projectId, chunk)
|
||||
|
||||
// Clear any existing cache
|
||||
await redisBackend.clearCache(projectId)
|
||||
})
|
||||
|
||||
it('should load from chunk store and update cache on first access (cache miss)', async function () {
|
||||
// First access should load from chunk store and populate cache
|
||||
const firstResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify the chunk is correct
|
||||
expect(firstResult).to.not.be.null
|
||||
expect(firstResult.getStartVersion()).to.equal(1)
|
||||
expect(firstResult.getEndVersion()).to.equal(2)
|
||||
|
||||
// Verify that we got a cache miss metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
|
||||
// Reset the metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Second access should hit the cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the same chunk
|
||||
expect(secondResult).to.not.be.null
|
||||
expect(secondResult.getStartVersion()).to.equal(1)
|
||||
expect(secondResult.getEndVersion()).to.equal(2)
|
||||
|
||||
// Verify that we got a cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
|
||||
// Verify both chunks are equivalent
|
||||
expect(secondResult.getStartVersion()).to.equal(
|
||||
firstResult.getStartVersion()
|
||||
)
|
||||
expect(secondResult.getEndVersion()).to.equal(firstResult.getEndVersion())
|
||||
})
|
||||
|
||||
it('should refresh the cache when chunk changes in the store', async function () {
|
||||
// First access to load into cache
|
||||
const firstResult = await chunkBuffer.loadLatest(projectId)
|
||||
expect(firstResult.getStartVersion()).to.equal(1)
|
||||
|
||||
// Reset metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Create a new chunk with different content
|
||||
const newSnapshot = new Snapshot()
|
||||
const newChanges = [
|
||||
new Change(
|
||||
[
|
||||
new AddFileOperation(
|
||||
'updated.tex',
|
||||
File.fromString('Updated content')
|
||||
),
|
||||
],
|
||||
new Date(),
|
||||
[]
|
||||
),
|
||||
]
|
||||
const newHistory = new History(newSnapshot, newChanges)
|
||||
const newChunk = new Chunk(newHistory, 5) // Different start version
|
||||
|
||||
// Store the new chunk directly in the chunk store
|
||||
await chunkStore.create(projectId, newChunk)
|
||||
|
||||
// Access again - should detect the change and refresh cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the updated chunk
|
||||
expect(secondResult.getStartVersion()).to.equal(5)
|
||||
expect(secondResult.getEndVersion()).to.equal(6)
|
||||
|
||||
// Verify that we got a cache miss metric (since the cached chunk was invalidated)
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
})
|
||||
|
||||
it('should continue using cache when chunk in store has not changed', async function () {
|
||||
// First access to load into cache
|
||||
await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Reset metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Access again without changing the underlying chunk
|
||||
const result = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the same chunk
|
||||
expect(result.getStartVersion()).to.equal(1)
|
||||
expect(result.getEndVersion()).to.equal(2)
|
||||
|
||||
// Verify that we got a cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
})
|
||||
|
||||
it('should handle a case with empty chunks (no changes)', async function () {
|
||||
// Replace with an empty chunk
|
||||
const emptySnapshot = new Snapshot()
|
||||
const emptyHistory = new History(emptySnapshot, [])
|
||||
const emptyChunk = new Chunk(emptyHistory, 10)
|
||||
|
||||
// Store the empty chunk
|
||||
await chunkStore.create(projectId, emptyChunk)
|
||||
|
||||
// Clear the cache
|
||||
await redisBackend.clearCache(projectId)
|
||||
|
||||
// Load the chunk via buffer
|
||||
const result = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the empty chunk
|
||||
expect(result.getStartVersion()).to.equal(10)
|
||||
expect(result.getEndVersion()).to.equal(10) // Start equals end for empty chunks
|
||||
expect(result.history.changes.length).to.equal(0)
|
||||
|
||||
// Verify cache miss metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
|
||||
// Reset metrics
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Second access should hit the cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the same empty chunk
|
||||
expect(secondResult.getStartVersion()).to.equal(10)
|
||||
expect(secondResult.getEndVersion()).to.equal(10)
|
||||
expect(secondResult.history.changes.length).to.equal(0)
|
||||
|
||||
// Verify cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
})
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user