mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
Merge pull request #28436 from overleaf/bg-load-changes-directly-from-redis-buffer
load changes directly from redis buffer for getChanges requests GitOrigin-RevId: 99673c47a137ff4222d331fa88eb6e5103270551
This commit is contained in:
@@ -156,11 +156,12 @@ async function getChanges(req, res, next) {
|
||||
})
|
||||
}
|
||||
|
||||
let chunk
|
||||
try {
|
||||
chunk = await chunkStore.loadAtVersion(projectId, since, {
|
||||
preferNewer: true,
|
||||
})
|
||||
const { changes, hasMore } = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
since
|
||||
)
|
||||
res.json({ changes: changes.map(change => change.toRaw()), hasMore })
|
||||
} catch (err) {
|
||||
if (err instanceof Chunk.VersionNotFoundError) {
|
||||
return res.status(400).json({
|
||||
@@ -169,14 +170,6 @@ async function getChanges(req, res, next) {
|
||||
}
|
||||
throw err
|
||||
}
|
||||
|
||||
const latestChunkMetadata = await chunkStore.getLatestChunkMetadata(projectId)
|
||||
|
||||
// Extract the relevant changes from the chunk that contains the start version
|
||||
const changes = chunk.getChanges().slice(since - chunk.getStartVersion())
|
||||
const hasMore = latestChunkMetadata.endVersion > chunk.getEndVersion()
|
||||
|
||||
res.json({ changes: changes.map(change => change.toRaw()), hasMore })
|
||||
}
|
||||
|
||||
async function getZip(req, res, next) {
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
|
||||
const config = require('config')
|
||||
const OError = require('@overleaf/o-error')
|
||||
const metrics = require('@overleaf/metrics')
|
||||
const { Chunk, History, Snapshot } = require('overleaf-editor-core')
|
||||
|
||||
const assert = require('../assert')
|
||||
@@ -229,6 +230,52 @@ async function loadAtTimestamp(projectId, timestamp, opts = {}) {
|
||||
return new Chunk(history, startVersion)
|
||||
}
|
||||
|
||||
/** Get the changes since a given version (since), including non-persisted changes.
|
||||
* Note that if there are multiple chunks since the given version, the changes from
|
||||
* the first chunk will be returned with a hasMore flag to indicate that there are
|
||||
* more changes available. The 'since' version is exclusive.
|
||||
* @param {string} projectId
|
||||
* @param {number} since - version to get changes since (exclusive)
|
||||
* @return {Promise<{changes: Change[], hasMore: boolean}>} - object with array of changes and boolean indicating if there are more changes available
|
||||
*/
|
||||
async function getChangesSinceVersion(projectId, since) {
|
||||
assert.projectId(projectId, 'bad projectId')
|
||||
assert.integer(since, 'bad since version')
|
||||
|
||||
// First try to get changes directly from Redis buffer
|
||||
const result = await redisBackend.getChangesSinceVersion(projectId, since)
|
||||
if (result.status === 'ok') {
|
||||
// Successfully got changes from Redis, no more changes available beyond what Redis has
|
||||
metrics.inc('chunk_store.get_changes_since_version', 1, { source: 'redis' })
|
||||
return { changes: result.changes || [], hasMore: false }
|
||||
}
|
||||
|
||||
// If status is 'not_found' or 'out_of_bounds', fall through to chunk-based approach
|
||||
const chunk = await loadAtVersion(projectId, since, {
|
||||
preferNewer: true,
|
||||
})
|
||||
|
||||
// Validate that 'since' is within the bounds of the chunk
|
||||
if (since < chunk.getStartVersion()) {
|
||||
throw new VersionOutOfBoundsError('Chunk does not include since version', {
|
||||
projectId,
|
||||
since,
|
||||
})
|
||||
}
|
||||
// Extract the changes after 'since' from the chunk
|
||||
const changes = chunk.getChanges().slice(since - chunk.getStartVersion())
|
||||
|
||||
// Check if there are more changes beyond the current chunk
|
||||
const latestChunkMetadata = await getLatestChunkMetadata(projectId)
|
||||
const hasMore = latestChunkMetadata.endVersion > chunk.getEndVersion()
|
||||
metrics.inc('chunk_store.get_changes_since_version', 1, {
|
||||
source: 'gcs',
|
||||
hasMore: hasMore ? 'true' : 'false',
|
||||
status: result.status,
|
||||
})
|
||||
return { changes, hasMore }
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the chunk and insert corresponding records in the database.
|
||||
*
|
||||
@@ -581,6 +628,7 @@ module.exports = {
|
||||
getProjectChunkIds,
|
||||
getProjectChunks,
|
||||
getProjectChunksFromVersion,
|
||||
getChangesSinceVersion,
|
||||
deleteProjectChunks,
|
||||
deleteOldChunks,
|
||||
AlreadyInitialized,
|
||||
|
||||
@@ -364,9 +364,11 @@ async function getChangesSinceVersion(projectId, version) {
|
||||
if (status === 'ok') {
|
||||
// If status is OK, parse the changes
|
||||
const changes = result[1]
|
||||
? result[1].map(rawChange =>
|
||||
typeof rawChange === 'string' ? JSON.parse(rawChange) : rawChange
|
||||
)
|
||||
? result[1]
|
||||
.map(rawChange =>
|
||||
typeof rawChange === 'string' ? JSON.parse(rawChange) : rawChange
|
||||
)
|
||||
.map(Change.fromRaw)
|
||||
: []
|
||||
|
||||
metrics.inc('chunk_store.redis.get_changes_since_version', 1, {
|
||||
|
||||
@@ -741,6 +741,409 @@ describe('chunkStore', function () {
|
||||
})
|
||||
})
|
||||
|
||||
describe('getChangesSinceVersion', function () {
|
||||
describe('single chunk scenarios', function () {
|
||||
let singleChunk
|
||||
|
||||
beforeEach(async function () {
|
||||
// Create a single chunk with start version 0, end version 2
|
||||
const blob = await blobStore.putString('single chunk content')
|
||||
singleChunk = makeChunk(
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile(
|
||||
'file1.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
new Date('2020-01-01T00:00:00')
|
||||
),
|
||||
makeChange(
|
||||
Operation.addFile(
|
||||
'file2.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
new Date('2020-01-01T00:01:00')
|
||||
),
|
||||
],
|
||||
0
|
||||
)
|
||||
await chunkStore.update(projectId, singleChunk)
|
||||
singleChunk = await chunkStore.loadLatest(projectId)
|
||||
})
|
||||
|
||||
describe('without Redis changes', function () {
|
||||
it('returns empty changes when since equals latest version', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
2
|
||||
)
|
||||
expect(result.changes).to.have.length(0)
|
||||
expect(result.hasMore).to.be.false
|
||||
})
|
||||
|
||||
it('returns all changes when since is 0', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
0
|
||||
)
|
||||
expect(result.changes).to.have.length(2)
|
||||
expect(result.hasMore).to.be.false
|
||||
expect(result.changes).to.deep.equal(singleChunk.getChanges())
|
||||
})
|
||||
|
||||
it('returns subset of changes when since is 1', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
1
|
||||
)
|
||||
expect(result.changes).to.have.length(1)
|
||||
expect(result.hasMore).to.be.false
|
||||
expect(result.changes).to.deep.equal([
|
||||
singleChunk.getChanges()[1],
|
||||
])
|
||||
})
|
||||
|
||||
it('throws error when since is negative', async function () {
|
||||
await expect(
|
||||
chunkStore.getChangesSinceVersion(projectId, -1)
|
||||
).to.be.rejectedWith(VersionNotFoundError)
|
||||
})
|
||||
|
||||
it('throws VersionNotFoundError when since is beyond latest version', async function () {
|
||||
await expect(
|
||||
chunkStore.getChangesSinceVersion(projectId, 10)
|
||||
).to.be.rejectedWith(VersionNotFoundError)
|
||||
})
|
||||
})
|
||||
|
||||
describe('with Redis changes', function () {
|
||||
let queuedChanges
|
||||
|
||||
beforeEach(async function () {
|
||||
const snapshot = singleChunk.getSnapshot()
|
||||
snapshot.applyAll(singleChunk.getChanges())
|
||||
const blob = await blobStore.putString('redis content')
|
||||
queuedChanges = [
|
||||
makeChange(
|
||||
Operation.addFile(
|
||||
'redis1.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
new Date('2020-01-01T00:02:00')
|
||||
),
|
||||
makeChange(
|
||||
Operation.addFile(
|
||||
'redis2.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
new Date('2020-01-01T00:03:00')
|
||||
),
|
||||
]
|
||||
await redisBackend.queueChanges(
|
||||
projectId,
|
||||
snapshot,
|
||||
singleChunk.getEndVersion(),
|
||||
queuedChanges
|
||||
)
|
||||
})
|
||||
|
||||
it('returns Redis changes when since equals chunk end version', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
2
|
||||
)
|
||||
expect(result.changes).to.have.length(2)
|
||||
expect(result.hasMore).to.be.false
|
||||
expect(result.changes).to.deep.equal(queuedChanges)
|
||||
})
|
||||
|
||||
it('returns partial Redis changes when since is within Redis buffer', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
3
|
||||
)
|
||||
expect(result.changes).to.have.length(1)
|
||||
expect(result.hasMore).to.be.false
|
||||
expect(result.changes).to.deep.equal([queuedChanges[1]])
|
||||
})
|
||||
|
||||
it('returns chunk changes plus Redis changes when since is within chunk', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
1
|
||||
)
|
||||
expect(result.changes).to.have.length(3)
|
||||
expect(result.hasMore).to.be.false
|
||||
// Should contain the second chunk change plus Redis changes
|
||||
const expectedChanges = [singleChunk.getChanges()[1]].concat(
|
||||
queuedChanges
|
||||
)
|
||||
expect(result.changes).to.deep.equal(expectedChanges)
|
||||
})
|
||||
|
||||
it('returns empty changes when since equals current head version', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
4
|
||||
)
|
||||
expect(result.changes).to.have.length(0)
|
||||
expect(result.hasMore).to.be.false
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('multiple chunks scenarios', function () {
|
||||
let firstChunk, secondChunk, thirdChunk
|
||||
|
||||
beforeEach(async function () {
|
||||
// Reuse the existing multiple chunks setup
|
||||
const blob = await blobStore.putString('')
|
||||
firstChunk = makeChunk(
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('foo.tex', File.createLazyFromBlobs(blob)),
|
||||
new Date('2015-01-01T00:00:00')
|
||||
),
|
||||
makeChange(
|
||||
Operation.addFile('bar.tex', File.createLazyFromBlobs(blob)),
|
||||
new Date('2015-01-01T00:01:00')
|
||||
),
|
||||
],
|
||||
0
|
||||
)
|
||||
await chunkStore.update(projectId, firstChunk)
|
||||
|
||||
secondChunk = makeChunk(
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('baz.tex', File.createLazyFromBlobs(blob)),
|
||||
new Date('2016-01-01T00:00:00')
|
||||
),
|
||||
makeChange(
|
||||
Operation.addFile('qux.tex', File.createLazyFromBlobs(blob)),
|
||||
new Date('2016-01-01T00:01:00')
|
||||
),
|
||||
],
|
||||
2
|
||||
)
|
||||
await chunkStore.create(projectId, secondChunk)
|
||||
|
||||
thirdChunk = makeChunk(
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('quux.tex', File.createLazyFromBlobs(blob)),
|
||||
new Date('2017-01-01T00:00:00')
|
||||
),
|
||||
makeChange(
|
||||
Operation.addFile(
|
||||
'barbar.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
new Date('2017-01-01T00:01:00')
|
||||
),
|
||||
],
|
||||
4
|
||||
)
|
||||
await chunkStore.create(projectId, thirdChunk)
|
||||
|
||||
// Load the actual chunks for comparison
|
||||
firstChunk = await chunkStore.loadAtVersion(projectId, 1)
|
||||
secondChunk = await chunkStore.loadAtVersion(projectId, 3)
|
||||
thirdChunk = await chunkStore.loadAtVersion(projectId, 5)
|
||||
})
|
||||
|
||||
describe('without Redis changes', function () {
|
||||
it('returns changes from first chunk when since is 0', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
0
|
||||
)
|
||||
expect(result.changes).to.have.length(2)
|
||||
expect(result.hasMore).to.be.true
|
||||
expect(result.changes).to.deep.equal(firstChunk.getChanges())
|
||||
})
|
||||
|
||||
it('returns changes from second chunk when since is 2', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
2
|
||||
)
|
||||
expect(result.changes).to.have.length(2)
|
||||
expect(result.hasMore).to.be.true
|
||||
expect(result.changes).to.deep.equal(secondChunk.getChanges())
|
||||
})
|
||||
|
||||
it('returns partial changes from second chunk when since is 3', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
3
|
||||
)
|
||||
expect(result.changes).to.have.length(1)
|
||||
expect(result.hasMore).to.be.true
|
||||
expect(result.changes).to.deep.equal([
|
||||
secondChunk.getChanges()[1],
|
||||
])
|
||||
})
|
||||
|
||||
it('returns changes from third chunk when since is 4', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
4
|
||||
)
|
||||
expect(result.changes).to.have.length(2)
|
||||
expect(result.hasMore).to.be.false
|
||||
expect(result.changes).to.deep.equal(thirdChunk.getChanges())
|
||||
})
|
||||
|
||||
it('returns empty changes when since equals final version', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
6
|
||||
)
|
||||
expect(result.changes).to.have.length(0)
|
||||
expect(result.hasMore).to.be.false
|
||||
})
|
||||
|
||||
it('returns partial changes from third chunk when since is 5', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
5
|
||||
)
|
||||
expect(result.changes).to.have.length(1)
|
||||
expect(result.hasMore).to.be.false
|
||||
expect(result.changes).to.deep.equal([thirdChunk.getChanges()[1]])
|
||||
})
|
||||
})
|
||||
|
||||
describe('with Redis changes', function () {
|
||||
let queuedChanges
|
||||
|
||||
beforeEach(async function () {
|
||||
// Add Redis changes after the third chunk
|
||||
const latestChunk = await chunkStore.loadLatest(projectId)
|
||||
const snapshot = latestChunk.getSnapshot()
|
||||
snapshot.applyAll(latestChunk.getChanges())
|
||||
const blob = await blobStore.putString('redis multi content')
|
||||
queuedChanges = [
|
||||
makeChange(
|
||||
Operation.addFile(
|
||||
'redis-multi1.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
new Date('2017-01-01T00:02:00')
|
||||
),
|
||||
makeChange(
|
||||
Operation.addFile(
|
||||
'redis-multi2.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
new Date('2017-01-01T00:03:00')
|
||||
),
|
||||
]
|
||||
await redisBackend.queueChanges(
|
||||
projectId,
|
||||
snapshot,
|
||||
latestChunk.getEndVersion(),
|
||||
queuedChanges
|
||||
)
|
||||
})
|
||||
|
||||
it('returns changes from second chunk when since is 2', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
2
|
||||
)
|
||||
// Current implementation limitation: when Redis doesn't have the version,
|
||||
// it falls back to chunk-based approach which only returns changes from
|
||||
// the single chunk that contains the start version, not subsequent chunks or Redis
|
||||
expect(result.changes).to.have.length(2) // Only from second chunk
|
||||
expect(result.hasMore).to.be.true // There are more chunks after this one
|
||||
expect(result.changes).to.deep.equal(secondChunk.getChanges())
|
||||
})
|
||||
|
||||
it('returns changes from third chunk including Redis changes when since is 4', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
4
|
||||
)
|
||||
// When requesting changes from the latest chunk, Redis changes are included
|
||||
// because loadAtVersion for the latest chunk includes non-persisted changes
|
||||
expect(result.changes).to.have.length(4) // 2 from third chunk + 2 from Redis
|
||||
expect(result.hasMore).to.be.false // Redis returns hasMore: false
|
||||
const expectedChanges = thirdChunk
|
||||
.getChanges()
|
||||
.concat(queuedChanges)
|
||||
expect(result.changes).to.deep.equal(expectedChanges)
|
||||
})
|
||||
|
||||
it('returns Redis changes when since equals chunk end version', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
6
|
||||
)
|
||||
expect(result.changes).to.have.length(2)
|
||||
expect(result.hasMore).to.be.false
|
||||
expect(result.changes).to.deep.equal(queuedChanges)
|
||||
})
|
||||
|
||||
it('returns partial Redis changes when since is within Redis buffer', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
7
|
||||
)
|
||||
expect(result.changes).to.have.length(1)
|
||||
expect(result.hasMore).to.be.false
|
||||
expect(result.changes).to.deep.equal([queuedChanges[1]])
|
||||
})
|
||||
|
||||
it('returns empty changes when since equals current head version', async function () {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
8
|
||||
)
|
||||
expect(result.changes).to.have.length(0)
|
||||
expect(result.hasMore).to.be.false
|
||||
})
|
||||
|
||||
it('iterates through all the changes using the hasMore parameter', async function () {
|
||||
const allChanges = []
|
||||
let currentVersion = 0
|
||||
let hasMore = true
|
||||
|
||||
while (hasMore) {
|
||||
const result = await chunkStore.getChangesSinceVersion(
|
||||
projectId,
|
||||
currentVersion
|
||||
)
|
||||
allChanges.push(...result.changes)
|
||||
hasMore = result.hasMore
|
||||
|
||||
if (hasMore) {
|
||||
// Move to the next version after the last change we received
|
||||
currentVersion += result.changes.length
|
||||
}
|
||||
}
|
||||
|
||||
// Should have collected the changes from all chunks plus Redis
|
||||
const expectedTotalChanges =
|
||||
firstChunk.getChanges().length +
|
||||
secondChunk.getChanges().length +
|
||||
thirdChunk.getChanges().length +
|
||||
queuedChanges.length
|
||||
expect(allChanges).to.have.length(expectedTotalChanges)
|
||||
|
||||
// Verify we got the expected changes in order
|
||||
const expectedChanges = []
|
||||
.concat(firstChunk.getChanges())
|
||||
.concat(secondChunk.getChanges())
|
||||
.concat(thirdChunk.getChanges())
|
||||
.concat(queuedChanges)
|
||||
expect(allChanges).to.deep.equal(expectedChanges)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('version checks', function () {
|
||||
beforeEach(async function () {
|
||||
// Create a chunk with start version 0, end version 3
|
||||
|
||||
@@ -470,10 +470,8 @@ describe('chunk buffer Redis backend', function () {
|
||||
expect(result.status).to.equal('ok')
|
||||
expect(result.changes).to.be.an('array').with.lengthOf(2)
|
||||
|
||||
// The changes array should contain the raw changes
|
||||
// Note: We're comparing raw objects, not the Change instances
|
||||
expect(result.changes[0]).to.deep.equal(change2.toRaw())
|
||||
expect(result.changes[1]).to.deep.equal(change3.toRaw())
|
||||
expect(result.changes[0]).to.deep.equal(change2)
|
||||
expect(result.changes[1]).to.deep.equal(change3)
|
||||
})
|
||||
|
||||
it('should return all changes when requested version is earliest available', async function () {
|
||||
@@ -503,9 +501,9 @@ describe('chunk buffer Redis backend', function () {
|
||||
|
||||
expect(result.status).to.equal('ok')
|
||||
expect(result.changes).to.be.an('array').with.lengthOf(3)
|
||||
expect(result.changes[0]).to.deep.equal(change1.toRaw())
|
||||
expect(result.changes[1]).to.deep.equal(change2.toRaw())
|
||||
expect(result.changes[2]).to.deep.equal(change3.toRaw())
|
||||
expect(result.changes[0]).to.deep.equal(change1)
|
||||
expect(result.changes[1]).to.deep.equal(change2)
|
||||
expect(result.changes[2]).to.deep.equal(change3)
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user