mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-31 04:41:32 +02:00
Merge pull request #25306 from overleaf/em-redis-buffer-read-operations
Add changes from Redis when reading chunks from the chunk store GitOrigin-RevId: c0ebf0669b91eb2efc5d1091d025e81efdff9fe4
This commit is contained in:
@@ -95,7 +95,9 @@ async function importChanges(req, res, next) {
|
||||
}
|
||||
|
||||
async function buildResultSnapshot(resultChunk) {
|
||||
const chunk = resultChunk || (await chunkStore.loadLatest(projectId))
|
||||
const chunk =
|
||||
resultChunk ||
|
||||
(await chunkStore.loadLatest(projectId, { persistedOnly: true }))
|
||||
const snapshot = chunk.getSnapshot()
|
||||
snapshot.applyAll(chunk.getChanges())
|
||||
const rawSnapshot = await snapshot.store(hashCheckBlobStore)
|
||||
|
||||
@@ -31,7 +31,8 @@ async function lookBehindForSeenBlobs(
|
||||
// so we find the set of backed up blobs from the previous chunk
|
||||
const previousChunk = await chunkStore.loadAtVersion(
|
||||
projectId,
|
||||
lastBackedUpVersion
|
||||
lastBackedUpVersion,
|
||||
{ persistedOnly: true }
|
||||
)
|
||||
const previousChunkHistory = previousChunk.getHistory()
|
||||
previousChunkHistory.findBlobHashes(seenBlobs)
|
||||
|
||||
@@ -4,10 +4,12 @@ class ChunkVersionConflictError extends OError {}
|
||||
class BaseVersionConflictError extends OError {}
|
||||
class JobNotFoundError extends OError {}
|
||||
class JobNotReadyError extends OError {}
|
||||
class VersionOutOfBoundsError extends OError {}
|
||||
|
||||
module.exports = {
|
||||
ChunkVersionConflictError,
|
||||
BaseVersionConflictError,
|
||||
JobNotFoundError,
|
||||
JobNotReadyError,
|
||||
VersionOutOfBoundsError,
|
||||
}
|
||||
|
||||
@@ -32,7 +32,15 @@ const { BlobStore } = require('../blob_store')
|
||||
const { historyStore } = require('../history_store')
|
||||
const mongoBackend = require('./mongo')
|
||||
const postgresBackend = require('./postgres')
|
||||
const { ChunkVersionConflictError } = require('./errors')
|
||||
const redisBackend = require('./redis')
|
||||
const {
|
||||
ChunkVersionConflictError,
|
||||
VersionOutOfBoundsError,
|
||||
} = require('./errors')
|
||||
|
||||
/**
|
||||
* @import { Change } from 'overleaf-editor-core'
|
||||
*/
|
||||
|
||||
const DEFAULT_DELETE_BATCH_SIZE = parseInt(config.get('maxDeleteKeys'), 10)
|
||||
const DEFAULT_DELETE_TIMEOUT_SECS = 3000 // 50 minutes
|
||||
@@ -103,12 +111,23 @@ async function loadLatestRaw(projectId, opts) {
|
||||
* Load the latest Chunk stored for a project, including blob metadata.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @return {Promise.<Chunk>}
|
||||
* @param {object} [opts]
|
||||
* @param {boolean} [opts.persistedOnly] - only include persisted changes
|
||||
* @return {Promise<Chunk>}
|
||||
*/
|
||||
async function loadLatest(projectId) {
|
||||
async function loadLatest(projectId, opts = {}) {
|
||||
const chunkRecord = await loadLatestRaw(projectId)
|
||||
const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id)
|
||||
const history = History.fromRaw(rawHistory)
|
||||
|
||||
if (!opts.persistedOnly) {
|
||||
const nonPersistedChanges = await getChunkExtension(
|
||||
projectId,
|
||||
chunkRecord.endVersion
|
||||
)
|
||||
history.pushChanges(nonPersistedChanges)
|
||||
}
|
||||
|
||||
const blobStore = new BlobStore(projectId)
|
||||
const batchBlobStore = new BatchBlobStore(blobStore)
|
||||
await lazyLoadHistoryFiles(history, batchBlobStore)
|
||||
@@ -117,8 +136,13 @@ async function loadLatest(projectId) {
|
||||
|
||||
/**
|
||||
* Load the the chunk that contains the given version, including blob metadata.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {number} version
|
||||
* @param {object} [opts]
|
||||
* @param {boolean} [opts.persistedOnly] - only include persisted changes
|
||||
*/
|
||||
async function loadAtVersion(projectId, version) {
|
||||
async function loadAtVersion(projectId, version, opts = {}) {
|
||||
assert.projectId(projectId, 'bad projectId')
|
||||
assert.integer(version, 'bad version')
|
||||
|
||||
@@ -129,6 +153,15 @@ async function loadAtVersion(projectId, version) {
|
||||
const chunkRecord = await backend.getChunkForVersion(projectId, version)
|
||||
const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id)
|
||||
const history = History.fromRaw(rawHistory)
|
||||
|
||||
if (!opts.persistedOnly) {
|
||||
const nonPersistedChanges = await getChunkExtension(
|
||||
projectId,
|
||||
chunkRecord.endVersion
|
||||
)
|
||||
history.pushChanges(nonPersistedChanges)
|
||||
}
|
||||
|
||||
await lazyLoadHistoryFiles(history, batchBlobStore)
|
||||
return new Chunk(history, chunkRecord.endVersion - history.countChanges())
|
||||
}
|
||||
@@ -136,8 +169,13 @@ async function loadAtVersion(projectId, version) {
|
||||
/**
|
||||
* Load the chunk that contains the version that was current at the given
|
||||
* timestamp, including blob metadata.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {Date} timestamp
|
||||
* @param {object} [opts]
|
||||
* @param {boolean} [opts.persistedOnly] - only include persisted changes
|
||||
*/
|
||||
async function loadAtTimestamp(projectId, timestamp) {
|
||||
async function loadAtTimestamp(projectId, timestamp, opts = {}) {
|
||||
assert.projectId(projectId, 'bad projectId')
|
||||
assert.date(timestamp, 'bad timestamp')
|
||||
|
||||
@@ -148,6 +186,15 @@ async function loadAtTimestamp(projectId, timestamp) {
|
||||
const chunkRecord = await backend.getChunkForTimestamp(projectId, timestamp)
|
||||
const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id)
|
||||
const history = History.fromRaw(rawHistory)
|
||||
|
||||
if (!opts.persistedOnly) {
|
||||
const nonPersistedChanges = await getChunkExtension(
|
||||
projectId,
|
||||
chunkRecord.endVersion
|
||||
)
|
||||
history.pushChanges(nonPersistedChanges)
|
||||
}
|
||||
|
||||
await lazyLoadHistoryFiles(history, batchBlobStore)
|
||||
return new Chunk(history, chunkRecord.endVersion - history.countChanges())
|
||||
}
|
||||
@@ -418,6 +465,31 @@ function getBackend(projectId) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets non-persisted changes that could extend a chunk
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {number} chunkEndVersion - end version of the chunk to extend
|
||||
*
|
||||
* @return {Promise<Change[]>}
|
||||
*/
|
||||
async function getChunkExtension(projectId, chunkEndVersion) {
|
||||
try {
|
||||
const changes = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
chunkEndVersion
|
||||
)
|
||||
return changes
|
||||
} catch (err) {
|
||||
if (err instanceof VersionOutOfBoundsError) {
|
||||
// If we can't extend the chunk, simply return an empty list
|
||||
return []
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class AlreadyInitialized extends OError {
|
||||
constructor(projectId) {
|
||||
super('Project is already initialized', { projectId })
|
||||
|
||||
@@ -2,13 +2,14 @@
|
||||
|
||||
const metrics = require('@overleaf/metrics')
|
||||
const OError = require('@overleaf/o-error')
|
||||
const redis = require('../redis')
|
||||
const rclient = redis.rclientHistory //
|
||||
const { Change, Snapshot } = require('overleaf-editor-core')
|
||||
const redis = require('../redis')
|
||||
const rclient = redis.rclientHistory
|
||||
const {
|
||||
BaseVersionConflictError,
|
||||
JobNotFoundError,
|
||||
JobNotReadyError,
|
||||
VersionOutOfBoundsError,
|
||||
} = require('./errors')
|
||||
|
||||
const MAX_PERSISTED_CHANGES = 100 // Maximum number of persisted changes to keep in the buffer for clients that need to catch up.
|
||||
@@ -393,34 +394,36 @@ rclient.defineCommand('get_non_persisted_changes', {
|
||||
local headVersionKey = KEYS[1]
|
||||
local persistedVersionKey = KEYS[2]
|
||||
local changesKey = KEYS[3]
|
||||
local baseVersion = tonumber(ARGV[1])
|
||||
|
||||
-- Check if head version exists
|
||||
local headVersion = tonumber(redis.call('GET', headVersionKey))
|
||||
if not headVersion then
|
||||
return {}
|
||||
return {'not_found'}
|
||||
end
|
||||
|
||||
-- Check if persisted version exists
|
||||
local persistedVersion = tonumber(redis.call('GET', persistedVersionKey))
|
||||
|
||||
local startIndex
|
||||
if not persistedVersion then
|
||||
-- None of the changes in Redis have been persisted
|
||||
startIndex = 0
|
||||
elseif persistedVersion > headVersion then
|
||||
-- This should never happen
|
||||
return redis.error_reply('HEAD_VERSION_BEHIND_PERSISTED_VERSION')
|
||||
elseif persistedVersion == headVersion then
|
||||
return {}
|
||||
else
|
||||
-- startIndex is negative and counts from the end of the list of changes
|
||||
startIndex = persistedVersion - headVersion
|
||||
local changesCount = tonumber(redis.call('LLEN', changesKey))
|
||||
persistedVersion = headVersion - changesCount
|
||||
end
|
||||
|
||||
-- Get changes using LRANGE
|
||||
local changes = redis.call('LRANGE', changesKey, startIndex, -1)
|
||||
if baseVersion < persistedVersion or baseVersion > headVersion then
|
||||
return {'out_of_bounds'}
|
||||
elseif baseVersion == headVersion then
|
||||
return {'ok', {}}
|
||||
else
|
||||
local numChanges = headVersion - baseVersion
|
||||
local changes = redis.call('LRANGE', changesKey, -numChanges, -1)
|
||||
|
||||
return changes
|
||||
if #changes < numChanges then
|
||||
-- We didn't get as many changes as we expected
|
||||
return {'out_of_bounds'}
|
||||
end
|
||||
|
||||
return {'ok', changes}
|
||||
end
|
||||
`,
|
||||
})
|
||||
|
||||
@@ -428,32 +431,52 @@ rclient.defineCommand('get_non_persisted_changes', {
|
||||
* Retrieves non-persisted changes for a project from Redis.
|
||||
*
|
||||
* @param {string} projectId - The unique identifier of the project.
|
||||
* @returns {Promise<Change[]>} A Promise that resolves to an array of non-persisted Change objects.
|
||||
* @param {number} baseVersion - The version on top of which the changes should
|
||||
* be applied.
|
||||
* @returns {Promise<Change[]>} Changes that can be applied on top of
|
||||
* baseVersion. An empty array means that the project doesn't have
|
||||
* changes to persist. A null value means that the non-persisted
|
||||
* changes can't be applied to the given base version.
|
||||
*
|
||||
* @throws {Error} If Redis operations fail.
|
||||
*/
|
||||
async function getNonPersistedChanges(projectId) {
|
||||
async function getNonPersistedChanges(projectId, baseVersion) {
|
||||
let result
|
||||
try {
|
||||
const keys = [
|
||||
result = await rclient.get_non_persisted_changes(
|
||||
keySchema.headVersion({ projectId }),
|
||||
keySchema.persistedVersion({ projectId }),
|
||||
keySchema.changes({ projectId }),
|
||||
]
|
||||
|
||||
const result = await rclient.get_non_persisted_changes(keys)
|
||||
|
||||
// Parse the changes
|
||||
const changes = result?.map(json => Change.fromRaw(JSON.parse(json))) ?? []
|
||||
|
||||
metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, {
|
||||
status: 'success',
|
||||
})
|
||||
return changes
|
||||
baseVersion.toString()
|
||||
)
|
||||
} catch (err) {
|
||||
metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, {
|
||||
status: 'error',
|
||||
})
|
||||
throw err
|
||||
}
|
||||
|
||||
const status = result[0]
|
||||
metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, {
|
||||
status,
|
||||
})
|
||||
|
||||
if (status === 'ok') {
|
||||
return result[1].map(json => Change.fromRaw(JSON.parse(json)))
|
||||
} else if (status === 'not_found') {
|
||||
return []
|
||||
} else if (status === 'out_of_bounds') {
|
||||
throw new VersionOutOfBoundsError(
|
||||
"Non-persisted changes can't be applied to base version",
|
||||
{ projectId, baseVersion }
|
||||
)
|
||||
} else {
|
||||
throw new OError('unknown status for get_non_persisted_changes', {
|
||||
projectId,
|
||||
baseVersion,
|
||||
status,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
rclient.defineCommand('set_persisted_version', {
|
||||
|
||||
@@ -185,7 +185,9 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
|
||||
}
|
||||
|
||||
async function loadLatestChunk() {
|
||||
const latestChunk = await chunkStore.loadLatest(projectId)
|
||||
const latestChunk = await chunkStore.loadLatest(projectId, {
|
||||
persistedOnly: true,
|
||||
})
|
||||
|
||||
currentChunk = latestChunk
|
||||
originalEndVersion = latestChunk.getEndVersion()
|
||||
@@ -217,8 +219,11 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
|
||||
}
|
||||
|
||||
async function fakePersistRedisChanges() {
|
||||
const nonPersistedChanges =
|
||||
await redisBackend.getNonPersistedChanges(projectId)
|
||||
const baseVersion = currentChunk.getEndVersion()
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
baseVersion
|
||||
)
|
||||
|
||||
if (
|
||||
serializeChanges(nonPersistedChanges) ===
|
||||
@@ -232,7 +237,6 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
|
||||
})
|
||||
}
|
||||
|
||||
const baseVersion = currentChunk.getEndVersion()
|
||||
const persistedVersion = baseVersion + nonPersistedChanges.length
|
||||
await redisBackend.setPersistedVersion(projectId, persistedVersion)
|
||||
}
|
||||
|
||||
@@ -279,7 +279,7 @@ async function processProject(project, summary) {
|
||||
|
||||
async function getHistoryDocVersions(project) {
|
||||
const historyId = project.overleaf.history.id
|
||||
const chunk = await chunkStore.loadLatest(historyId)
|
||||
const chunk = await chunkStore.loadLatest(historyId, { persistedOnly: true })
|
||||
if (chunk == null) {
|
||||
return []
|
||||
}
|
||||
|
||||
@@ -48,7 +48,16 @@ async function listChunks(historyId) {
|
||||
async function fetchChunkLocal(historyId, version) {
|
||||
const chunkRecord = await getChunkMetadataForVersion(historyId, version)
|
||||
const chunk = await loadAtVersion(historyId, version)
|
||||
return { key: version, chunk, metadata: chunkRecord, source: 'local storage' }
|
||||
const persistedChunk = await loadAtVersion(historyId, version, {
|
||||
persistedOnly: true,
|
||||
})
|
||||
return {
|
||||
key: version,
|
||||
chunk,
|
||||
persistedChunk,
|
||||
metadata: chunkRecord,
|
||||
source: 'local storage',
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchChunkRemote(historyId, version) {
|
||||
@@ -73,7 +82,7 @@ async function fetchChunkRemote(historyId, version) {
|
||||
}
|
||||
|
||||
async function displayChunk(historyId, version, options) {
|
||||
const { key, chunk, metadata, source } = await (options.remote
|
||||
const { key, chunk, persistedChunk, metadata, source } = await (options.remote
|
||||
? fetchChunkRemote(historyId, version)
|
||||
: fetchChunkLocal(historyId, version))
|
||||
console.log('Source:', source)
|
||||
@@ -81,6 +90,18 @@ async function displayChunk(historyId, version, options) {
|
||||
console.log('Key', key)
|
||||
// console.log('Number of changes', chunk.getChanges().length)
|
||||
console.log(JSON.stringify(chunk))
|
||||
if (
|
||||
persistedChunk &&
|
||||
persistedChunk.getChanges().length !== chunk.getChanges().length
|
||||
) {
|
||||
console.warn(
|
||||
'Warning: Local chunk and persisted chunk have different number of changes:',
|
||||
chunk.getChanges().length,
|
||||
'local (including buffer) vs',
|
||||
persistedChunk.getChanges().length,
|
||||
'persisted'
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchBlobRemote(historyId, blobHash) {
|
||||
|
||||
@@ -34,7 +34,7 @@ async function main() {
|
||||
|
||||
async function processProject(projectId, save) {
|
||||
console.log(`Project ${projectId}:`)
|
||||
const chunk = await chunkStore.loadLatest(projectId)
|
||||
const chunk = await chunkStore.loadLatest(projectId, { persistedOnly: true })
|
||||
let numChanges = 0
|
||||
numChanges += removeDuplicateProjectVersions(chunk)
|
||||
numChanges += removeDuplicateDocVersions(chunk)
|
||||
|
||||
@@ -149,7 +149,7 @@ async function addFileInNewChunk(
|
||||
historyId,
|
||||
{ creationDate = new Date() }
|
||||
) {
|
||||
const chunk = await chunkStore.loadLatest(historyId)
|
||||
const chunk = await chunkStore.loadLatest(historyId, { persistedOnly: true })
|
||||
const operation = Operation.addFile(
|
||||
`${historyId}.txt`,
|
||||
File.fromString(fileContents)
|
||||
|
||||
@@ -18,7 +18,8 @@ const {
|
||||
EditFileOperation,
|
||||
TextOperation,
|
||||
} = require('overleaf-editor-core')
|
||||
const { chunkStore, historyStore } = require('../../../../storage')
|
||||
const { chunkStore, historyStore, BlobStore } = require('../../../../storage')
|
||||
const redisBackend = require('../../../../storage/lib/chunk_store/redis')
|
||||
|
||||
describe('chunkStore', function () {
|
||||
beforeEach(cleanup.everything)
|
||||
@@ -42,6 +43,7 @@ describe('chunkStore', function () {
|
||||
describe(scenario.description, function () {
|
||||
let projectId
|
||||
let projectRecord
|
||||
let blobStore
|
||||
|
||||
beforeEach(async function () {
|
||||
projectId = await scenario.createProject()
|
||||
@@ -49,6 +51,7 @@ describe('chunkStore', function () {
|
||||
projectRecord = await projects.insertOne({
|
||||
overleaf: { history: { id: scenario.idMapping(projectId) } },
|
||||
})
|
||||
blobStore = new BlobStore(projectId)
|
||||
})
|
||||
|
||||
it('loads empty latest chunk for a new project', async function () {
|
||||
@@ -62,10 +65,11 @@ describe('chunkStore', function () {
|
||||
const pendingChangeTimestamp = new Date('2014-01-01T00:00:00')
|
||||
const lastChangeTimestamp = new Date('2015-01-01T00:00:00')
|
||||
beforeEach(async function () {
|
||||
const blob = await blobStore.putString('abc')
|
||||
const chunk = makeChunk(
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('main.tex', File.fromString('abc')),
|
||||
Operation.addFile('main.tex', File.createLazyFromBlobs(blob)),
|
||||
lastChangeTimestamp
|
||||
),
|
||||
],
|
||||
@@ -98,8 +102,11 @@ describe('chunkStore', function () {
|
||||
beforeEach(async function () {
|
||||
const chunk = await chunkStore.loadLatest(projectId)
|
||||
const oldEndVersion = chunk.getEndVersion()
|
||||
const blob = await blobStore.putString('')
|
||||
const changes = [
|
||||
makeChange(Operation.addFile(testPathname, File.fromString(''))),
|
||||
makeChange(
|
||||
Operation.addFile(testPathname, File.createLazyFromBlobs(blob))
|
||||
),
|
||||
makeChange(Operation.editFile(testPathname, testTextOperation)),
|
||||
]
|
||||
lastChangeTimestamp = changes[1].getTimestamp()
|
||||
@@ -181,14 +188,15 @@ describe('chunkStore', function () {
|
||||
let firstChunk, secondChunk, thirdChunk
|
||||
|
||||
beforeEach(async function () {
|
||||
const blob = await blobStore.putString('')
|
||||
firstChunk = makeChunk(
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('foo.tex', File.fromString('')),
|
||||
Operation.addFile('foo.tex', File.createLazyFromBlobs(blob)),
|
||||
new Date(firstChunkTimestamp - 5000)
|
||||
),
|
||||
makeChange(
|
||||
Operation.addFile('bar.tex', File.fromString('')),
|
||||
Operation.addFile('bar.tex', File.createLazyFromBlobs(blob)),
|
||||
firstChunkTimestamp
|
||||
),
|
||||
],
|
||||
@@ -205,11 +213,11 @@ describe('chunkStore', function () {
|
||||
secondChunk = makeChunk(
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('baz.tex', File.fromString('')),
|
||||
Operation.addFile('baz.tex', File.createLazyFromBlobs(blob)),
|
||||
new Date(secondChunkTimestamp - 5000)
|
||||
),
|
||||
makeChange(
|
||||
Operation.addFile('qux.tex', File.fromString('')),
|
||||
Operation.addFile('qux.tex', File.createLazyFromBlobs(blob)),
|
||||
secondChunkTimestamp
|
||||
),
|
||||
],
|
||||
@@ -221,7 +229,7 @@ describe('chunkStore', function () {
|
||||
thirdChunk = makeChunk(
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('quux.tex', File.fromString('')),
|
||||
Operation.addFile('quux.tex', File.createLazyFromBlobs(blob)),
|
||||
thirdChunkTimestamp
|
||||
),
|
||||
],
|
||||
@@ -317,11 +325,15 @@ describe('chunkStore', function () {
|
||||
let newChunk
|
||||
|
||||
beforeEach(async function () {
|
||||
const blob = await blobStore.putString('')
|
||||
newChunk = makeChunk(
|
||||
[
|
||||
...thirdChunk.getChanges(),
|
||||
makeChange(
|
||||
Operation.addFile('onemore.tex', File.fromString('')),
|
||||
Operation.addFile(
|
||||
'onemore.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
thirdChunkTimestamp
|
||||
),
|
||||
],
|
||||
@@ -368,6 +380,79 @@ describe('chunkStore', function () {
|
||||
})
|
||||
})
|
||||
|
||||
describe('with changes queued in the Redis buffer', function () {
|
||||
let queuedChanges
|
||||
|
||||
beforeEach(async function () {
|
||||
const snapshot = thirdChunk.getSnapshot()
|
||||
snapshot.applyAll(thirdChunk.getChanges())
|
||||
const blob = await blobStore.putString('zzz')
|
||||
queuedChanges = [
|
||||
makeChange(
|
||||
Operation.addFile(
|
||||
'in-redis.tex',
|
||||
File.createLazyFromBlobs(blob)
|
||||
),
|
||||
new Date()
|
||||
),
|
||||
]
|
||||
await redisBackend.queueChanges(
|
||||
projectId,
|
||||
snapshot,
|
||||
thirdChunk.getEndVersion(),
|
||||
queuedChanges
|
||||
)
|
||||
})
|
||||
|
||||
it('includes the queued changes when getting the latest chunk', async function () {
|
||||
const chunk = await chunkStore.loadLatest(projectId)
|
||||
const expectedChanges = thirdChunk
|
||||
.getChanges()
|
||||
.concat(queuedChanges)
|
||||
expect(chunk.getChanges()).to.deep.equal(expectedChanges)
|
||||
})
|
||||
|
||||
it('includes the queued changes when getting the latest chunk by timestamp', async function () {
|
||||
const chunk = await chunkStore.loadAtTimestamp(
|
||||
projectId,
|
||||
thirdChunkTimestamp
|
||||
)
|
||||
const expectedChanges = thirdChunk
|
||||
.getChanges()
|
||||
.concat(queuedChanges)
|
||||
expect(chunk.getChanges()).to.deep.equal(expectedChanges)
|
||||
})
|
||||
|
||||
it("doesn't include the queued changes when getting another chunk by timestamp", async function () {
|
||||
const chunk = await chunkStore.loadAtTimestamp(
|
||||
projectId,
|
||||
secondChunkTimestamp
|
||||
)
|
||||
const expectedChanges = secondChunk.getChanges()
|
||||
expect(chunk.getChanges()).to.deep.equal(expectedChanges)
|
||||
})
|
||||
|
||||
it('includes the queued changes when getting the latest chunk by version', async function () {
|
||||
const chunk = await chunkStore.loadAtVersion(
|
||||
projectId,
|
||||
thirdChunk.getEndVersion()
|
||||
)
|
||||
const expectedChanges = thirdChunk
|
||||
.getChanges()
|
||||
.concat(queuedChanges)
|
||||
expect(chunk.getChanges()).to.deep.equal(expectedChanges)
|
||||
})
|
||||
|
||||
it("doesn't include the queued changes when getting another chunk by version", async function () {
|
||||
const chunk = await chunkStore.loadAtVersion(
|
||||
projectId,
|
||||
secondChunk.getEndVersion()
|
||||
)
|
||||
const expectedChanges = secondChunk.getChanges()
|
||||
expect(chunk.getChanges()).to.deep.equal(expectedChanges)
|
||||
})
|
||||
})
|
||||
|
||||
describe('when iterating the chunks with getProjectChunksFromVersion', function () {
|
||||
// The first chunk has startVersion:0 and endVersion:2
|
||||
for (let startVersion = 0; startVersion <= 2; startVersion++) {
|
||||
@@ -470,8 +555,11 @@ describe('chunkStore', function () {
|
||||
let chunk = await chunkStore.loadLatest(projectId)
|
||||
expect(chunk.getEndVersion()).to.equal(oldEndVersion)
|
||||
|
||||
const blob = await blobStore.putString('')
|
||||
const changes = [
|
||||
makeChange(Operation.addFile(testPathname, File.fromString(''))),
|
||||
makeChange(
|
||||
Operation.addFile(testPathname, File.createLazyFromBlobs(blob))
|
||||
),
|
||||
makeChange(Operation.editFile(testPathname, testTextOperation)),
|
||||
]
|
||||
chunk.pushChanges(changes)
|
||||
@@ -487,9 +575,12 @@ describe('chunkStore', function () {
|
||||
describe('version checks', function () {
|
||||
beforeEach(async function () {
|
||||
// Create a chunk with start version 0, end version 3
|
||||
const blob = await blobStore.putString('abc')
|
||||
const chunk = makeChunk(
|
||||
[
|
||||
makeChange(Operation.addFile('main.tex', File.fromString('abc'))),
|
||||
makeChange(
|
||||
Operation.addFile('main.tex', File.createLazyFromBlobs(blob))
|
||||
),
|
||||
makeChange(
|
||||
Operation.editFile(
|
||||
'main.tex',
|
||||
@@ -509,8 +600,13 @@ describe('chunkStore', function () {
|
||||
})
|
||||
|
||||
it('refuses to create a chunk with the same start version', async function () {
|
||||
const blob = await blobStore.putString('abc')
|
||||
const chunk = makeChunk(
|
||||
[makeChange(Operation.addFile('main.tex', File.fromString('abc')))],
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('main.tex', File.createLazyFromBlobs(blob))
|
||||
),
|
||||
],
|
||||
0
|
||||
)
|
||||
await expect(chunkStore.create(projectId, chunk)).to.be.rejectedWith(
|
||||
@@ -519,8 +615,13 @@ describe('chunkStore', function () {
|
||||
})
|
||||
|
||||
it("allows creating chunks that don't have version conflicts", async function () {
|
||||
const blob = await blobStore.putString('abc')
|
||||
const chunk = makeChunk(
|
||||
[makeChange(Operation.addFile('main.tex', File.fromString('abc')))],
|
||||
[
|
||||
makeChange(
|
||||
Operation.addFile('main.tex', File.createLazyFromBlobs(blob))
|
||||
),
|
||||
],
|
||||
3
|
||||
)
|
||||
await chunkStore.create(projectId, chunk)
|
||||
|
||||
@@ -12,6 +12,7 @@ const redisBackend = require('../../../../storage/lib/chunk_store/redis')
|
||||
const {
|
||||
JobNotReadyError,
|
||||
JobNotFoundError,
|
||||
VersionOutOfBoundsError,
|
||||
} = require('../../../../storage/lib/chunk_store/errors')
|
||||
const redis = require('../../../../storage/lib/redis')
|
||||
const rclient = redis.rclientHistory
|
||||
@@ -509,189 +510,191 @@ describe('chunk buffer Redis backend', function () {
|
||||
})
|
||||
|
||||
describe('getNonPersistedChanges', function () {
|
||||
it('should return empty array when project does not exist', async function () {
|
||||
const changes = await redisBackend.getNonPersistedChanges(projectId)
|
||||
expect(changes).to.be.an('array').that.is.empty
|
||||
describe('project not loaded', function () {
|
||||
it('should return empty array', async function () {
|
||||
const changes = await redisBackend.getNonPersistedChanges(projectId, 0)
|
||||
expect(changes).to.be.an('array').that.is.empty
|
||||
})
|
||||
|
||||
it('should handle any base version', async function () {
|
||||
const changes = await redisBackend.getNonPersistedChanges(projectId, 2)
|
||||
expect(changes).to.be.an('array').that.is.empty
|
||||
})
|
||||
})
|
||||
|
||||
it('should return all changes when persisted version is not set', async function () {
|
||||
const changes = [makeChange(), makeChange(), makeChange()]
|
||||
queueChanges(projectId, changes)
|
||||
describe('project never persisted', function () {
|
||||
let changes
|
||||
|
||||
const nonPersistedChanges =
|
||||
await redisBackend.getNonPersistedChanges(projectId)
|
||||
expect(nonPersistedChanges.map(change => change.toRaw())).to.deep.equal(
|
||||
changes.map(change => change.toRaw())
|
||||
)
|
||||
beforeEach(async function () {
|
||||
changes = await setupState(projectId, {
|
||||
headVersion: 5,
|
||||
persistedVersion: null,
|
||||
changes: 3,
|
||||
})
|
||||
})
|
||||
|
||||
it('should return all changes if requested', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
2
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal(changes)
|
||||
})
|
||||
|
||||
it('should return part of the changes if requested', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
3
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal(changes.slice(1))
|
||||
})
|
||||
|
||||
it('should error if the base version requested is too low', async function () {
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId, 0)
|
||||
).to.be.rejectedWith(VersionOutOfBoundsError)
|
||||
})
|
||||
|
||||
it('should return an empty array if the base version is the head version', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
5
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal([])
|
||||
})
|
||||
|
||||
it('should error if the base version requested is too high', async function () {
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId, 6)
|
||||
).to.be.rejectedWith(VersionOutOfBoundsError)
|
||||
})
|
||||
})
|
||||
|
||||
it('should return empty array when persisted version equals head version', async function () {
|
||||
// Set both head and persisted versions to be equal
|
||||
const version = 5
|
||||
await rclient.set(
|
||||
keySchema.headVersion({ projectId }),
|
||||
version.toString()
|
||||
)
|
||||
await rclient.set(
|
||||
keySchema.persistedVersion({ projectId }),
|
||||
version.toString()
|
||||
)
|
||||
describe('fully persisted changes', function () {
|
||||
beforeEach(async function () {
|
||||
await setupState(projectId, {
|
||||
headVersion: 5,
|
||||
persistedVersion: 5,
|
||||
changes: 3,
|
||||
})
|
||||
})
|
||||
|
||||
const changes = await redisBackend.getNonPersistedChanges(projectId)
|
||||
expect(changes).to.be.an('array').that.is.empty
|
||||
it('should return an empty array when asked for the head version', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
5
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal([])
|
||||
})
|
||||
|
||||
it('should throw an error when asked for an older version', async function () {
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId, 4)
|
||||
).to.be.rejectedWith(VersionOutOfBoundsError)
|
||||
})
|
||||
|
||||
it('should throw an error when asked for a newer version', async function () {
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId, 6)
|
||||
).to.be.rejectedWith(VersionOutOfBoundsError)
|
||||
})
|
||||
})
|
||||
|
||||
it('should return all non-persisted changes', async function () {
|
||||
// Set head version to 5 and persisted version to 2
|
||||
const headVersion = 5
|
||||
const persistedVersion = 2
|
||||
await rclient.set(
|
||||
keySchema.headVersion({ projectId }),
|
||||
headVersion.toString()
|
||||
)
|
||||
await rclient.set(
|
||||
keySchema.persistedVersion({ projectId }),
|
||||
persistedVersion.toString()
|
||||
)
|
||||
describe('partially persisted project', async function () {
|
||||
let changes
|
||||
|
||||
// Create changes for versions 3, 4, 5
|
||||
const timestamp = new Date()
|
||||
const change1 = new Change([], timestamp) // Version 3
|
||||
const change2 = new Change([], timestamp) // Version 4
|
||||
const change3 = new Change([], timestamp) // Version 5
|
||||
beforeEach(async function () {
|
||||
changes = await setupState(projectId, {
|
||||
headVersion: 10,
|
||||
persistedVersion: 7,
|
||||
changes: 6,
|
||||
})
|
||||
})
|
||||
|
||||
// Push changes to Redis
|
||||
await rclient.rpush(
|
||||
keySchema.changes({ projectId }),
|
||||
JSON.stringify(change1.toRaw()),
|
||||
JSON.stringify(change2.toRaw()),
|
||||
JSON.stringify(change3.toRaw())
|
||||
)
|
||||
it('should return all non-persisted changes if requested', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
7
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal(changes.slice(3))
|
||||
})
|
||||
|
||||
// Get non-persisted changes
|
||||
const nonPersistedChanges =
|
||||
await redisBackend.getNonPersistedChanges(projectId)
|
||||
it('should return part of the changes if requested', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
8
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal(changes.slice(4))
|
||||
})
|
||||
|
||||
// Should return changes for versions 3, 4, 5
|
||||
expect(nonPersistedChanges).to.be.an('array').with.lengthOf(3)
|
||||
expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change1.toRaw())
|
||||
expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change2.toRaw())
|
||||
expect(nonPersistedChanges[2].toRaw()).to.deep.equal(change3.toRaw())
|
||||
it('should error if the base version requested is too low', async function () {
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId, 5)
|
||||
).to.be.rejectedWith(VersionOutOfBoundsError)
|
||||
})
|
||||
|
||||
it('should return an empty array if the base version is the head version', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
10
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal([])
|
||||
})
|
||||
|
||||
it('should error if the base version requested is too high', async function () {
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId, 12)
|
||||
).to.be.rejectedWith(VersionOutOfBoundsError)
|
||||
})
|
||||
})
|
||||
|
||||
it('should return a subset of changes when some are persisted', async function () {
|
||||
// Set head version to 5 and persisted version to 3
|
||||
// This means versions 4 and 5 are not persisted
|
||||
const headVersion = 5
|
||||
const persistedVersion = 3
|
||||
await rclient.set(
|
||||
keySchema.headVersion({ projectId }),
|
||||
headVersion.toString()
|
||||
)
|
||||
await rclient.set(
|
||||
keySchema.persistedVersion({ projectId }),
|
||||
persistedVersion.toString()
|
||||
)
|
||||
// This case should never happen, but we'll handle it anyway
|
||||
describe('persisted version before start of changes list', async function () {
|
||||
let changes
|
||||
|
||||
// Create changes for versions 1, 2, 3, 4, 5
|
||||
const timestamp = new Date()
|
||||
const change1 = new Change([], timestamp) // Version 1
|
||||
const change2 = new Change([], timestamp) // Version 2
|
||||
const change3 = new Change([], timestamp) // Version 3
|
||||
const change4 = new Change([], timestamp) // Version 4
|
||||
const change5 = new Change([], timestamp) // Version 5
|
||||
beforeEach(async function () {
|
||||
changes = await setupState(projectId, {
|
||||
headVersion: 5,
|
||||
persistedVersion: 1,
|
||||
changes: 3,
|
||||
})
|
||||
})
|
||||
|
||||
// Push changes to Redis
|
||||
await rclient.rpush(
|
||||
keySchema.changes({ projectId }),
|
||||
JSON.stringify(change1.toRaw()),
|
||||
JSON.stringify(change2.toRaw()),
|
||||
JSON.stringify(change3.toRaw()),
|
||||
JSON.stringify(change4.toRaw()),
|
||||
JSON.stringify(change5.toRaw())
|
||||
)
|
||||
it('should return all non-persisted changes if requested', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
2
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal(changes)
|
||||
})
|
||||
|
||||
// Get non-persisted changes
|
||||
const nonPersistedChanges =
|
||||
await redisBackend.getNonPersistedChanges(projectId)
|
||||
it('should return part of the changes if requested', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
3
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal(changes.slice(1))
|
||||
})
|
||||
|
||||
// Should return only changes for versions 4 and 5
|
||||
expect(nonPersistedChanges).to.be.an('array').with.lengthOf(2)
|
||||
expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change4.toRaw())
|
||||
expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change5.toRaw())
|
||||
})
|
||||
it('should error if the base version requested is too low', async function () {
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId, 1)
|
||||
).to.be.rejectedWith(VersionOutOfBoundsError)
|
||||
})
|
||||
|
||||
it('should throw an error when persisted version is higher than head version', async function () {
|
||||
// This is an unusual case that should not happen in practice
|
||||
// The system should throw an error to indicate this abnormal state
|
||||
const headVersion = 3
|
||||
const persistedVersion = 5
|
||||
await rclient.set(
|
||||
keySchema.headVersion({ projectId }),
|
||||
headVersion.toString()
|
||||
)
|
||||
await rclient.set(
|
||||
keySchema.persistedVersion({ projectId }),
|
||||
persistedVersion.toString()
|
||||
)
|
||||
it('should return an empty array if the base version is the head version', async function () {
|
||||
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
|
||||
projectId,
|
||||
5
|
||||
)
|
||||
expect(nonPersistedChanges).to.deep.equal([])
|
||||
})
|
||||
|
||||
// Create changes
|
||||
const timestamp = new Date()
|
||||
const change1 = new Change([], timestamp)
|
||||
const change2 = new Change([], timestamp)
|
||||
const change3 = new Change([], timestamp)
|
||||
|
||||
// Push changes to Redis
|
||||
await rclient.rpush(
|
||||
keySchema.changes({ projectId }),
|
||||
JSON.stringify(change1.toRaw()),
|
||||
JSON.stringify(change2.toRaw()),
|
||||
JSON.stringify(change3.toRaw())
|
||||
)
|
||||
|
||||
// Use chai-as-promised for cleaner async error assertion
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId)
|
||||
).to.be.rejectedWith(/HEAD_VERSION_BEHIND_PERSISTED_VERSION/)
|
||||
})
|
||||
|
||||
it('should handle case where persisted version is before start of changes list', async function () {
|
||||
// Setup: head version is 5, persisted version is 1
|
||||
// But changes list only starts from version 3
|
||||
const headVersion = 5
|
||||
const persistedVersion = 1
|
||||
await rclient.set(
|
||||
keySchema.headVersion({ projectId }),
|
||||
headVersion.toString()
|
||||
)
|
||||
await rclient.set(
|
||||
keySchema.persistedVersion({ projectId }),
|
||||
persistedVersion.toString()
|
||||
)
|
||||
|
||||
// Create changes for versions 3, 4, 5 only
|
||||
const timestamp = new Date()
|
||||
const change3 = new Change([], timestamp) // Version 3
|
||||
const change4 = new Change([], timestamp) // Version 4
|
||||
const change5 = new Change([], timestamp) // Version 5
|
||||
|
||||
// Push changes to Redis
|
||||
await rclient.rpush(
|
||||
keySchema.changes({ projectId }),
|
||||
JSON.stringify(change3.toRaw()),
|
||||
JSON.stringify(change4.toRaw()),
|
||||
JSON.stringify(change5.toRaw())
|
||||
)
|
||||
|
||||
// Get non-persisted changes
|
||||
const nonPersistedChanges =
|
||||
await redisBackend.getNonPersistedChanges(projectId)
|
||||
|
||||
// Should return all changes since the persisted version is before the start of the list
|
||||
expect(nonPersistedChanges).to.be.an('array').with.lengthOf(3)
|
||||
expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change3.toRaw())
|
||||
expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change4.toRaw())
|
||||
expect(nonPersistedChanges[2].toRaw()).to.deep.equal(change5.toRaw())
|
||||
it('should error if the base version requested is too high', async function () {
|
||||
await expect(
|
||||
redisBackend.getNonPersistedChanges(projectId, 6)
|
||||
).to.be.rejectedWith(VersionOutOfBoundsError)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1137,3 +1140,37 @@ function makeChange() {
|
||||
const timestamp = new Date()
|
||||
return new Change([], timestamp)
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup Redis buffer state for tests
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {object} params
|
||||
* @param {number} params.headVersion
|
||||
* @param {number | null} params.persistedVersion
|
||||
* @param {number} params.changes - number of changes to create
|
||||
* @return {Promise<Change[]>} dummy changes that have been created
|
||||
*/
|
||||
async function setupState(projectId, params) {
|
||||
await rclient.set(keySchema.headVersion({ projectId }), params.headVersion)
|
||||
if (params.persistedVersion) {
|
||||
await rclient.set(
|
||||
keySchema.persistedVersion({ projectId }),
|
||||
params.persistedVersion
|
||||
)
|
||||
}
|
||||
|
||||
const changes = []
|
||||
for (let i = 1; i <= params.changes; i++) {
|
||||
const change = new Change(
|
||||
[new AddFileOperation(`file${i}.tex`, File.createHollow(i, i))],
|
||||
new Date()
|
||||
)
|
||||
changes.push(change)
|
||||
}
|
||||
await rclient.rpush(
|
||||
keySchema.changes({ projectId }),
|
||||
changes.map(change => JSON.stringify(change.toRaw()))
|
||||
)
|
||||
return changes
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user