Merge pull request #22973 from overleaf/em-fix-project-snapshot-concurrency

Fix concurrency in project snapshot

GitOrigin-RevId: 83710b84e5ff5c10d55b1a915a310db1ca431973
This commit is contained in:
Eric Mc Sween
2025-01-20 09:02:18 -05:00
committed by Copybot
parent 6fba73c66a
commit 3d0a9017a4
2 changed files with 288 additions and 181 deletions
@@ -1,5 +1,4 @@
import pLimit from 'p-limit'
import OError from '@overleaf/o-error'
import { Change, Chunk, Snapshot } from 'overleaf-editor-core'
import { RawChange, RawChunk } from 'overleaf-editor-core/lib/types'
import { FetchError, getJSON, postJSON } from '@/infrastructure/fetch-json'
@@ -15,16 +14,18 @@ export class ProjectSnapshot {
private version: number
private blobStore: SimpleBlobStore
private refreshPromise: Promise<void>
private queuedRefreshPromise: Promise<void>
private state: ProjectSnapshotState
private initialized: boolean
private refreshing: boolean
private queued: boolean
constructor(projectId: string) {
  // Project identity, and the blob store bound to that project.
  this.projectId = projectId
  this.blobStore = new SimpleBlobStore(projectId)

  // Begin with a fresh Snapshot at version 0.
  this.snapshot = new Snapshot()
  this.version = 0

  // Both promise slots start out already resolved, so awaiting them before
  // any refresh has been started returns immediately.
  this.refreshPromise = Promise.resolve()
  this.queuedRefreshPromise = Promise.resolve()

  // Refresh bookkeeping: the state machine and the boolean flags all start
  // in their "nothing has happened yet" position.
  this.state = new ProjectSnapshotState()
  this.initialized = false
  this.refreshing = false
  this.queued = false
}
@@ -36,31 +37,19 @@ export class ProjectSnapshot {
* function was called.
*/
async refresh() {
// NOTE(review): this body contains two dispatch mechanisms back to back — a
// switch on the ProjectSnapshotState value and an if/else chain on the
// `queued` / `refreshing` flags. They look like two versions of the same
// logic; confirm which one is meant to remain.
switch (this.state.getState()) {
case 'init':
// No refresh has run yet: run the initial load and wait for it.
this.refreshPromise = this.initialize()
await this.refreshPromise
break
case 'ready':
// Idle: load the latest changes and wait for them.
this.refreshPromise = this.loadChanges()
await this.refreshPromise
break
case 'refreshing':
// A refresh is running and none is queued: queue one behind it.
this.queuedRefreshPromise = this.queueRefresh()
await this.queuedRefreshPromise
break
case 'queued-ready':
case 'queued-waiting':
// A refresh is already queued: share its promise instead of queueing more.
await this.queuedRefreshPromise
break
default:
throw new OError('Unknown state for project snapshot', {
state: this.state.getState(),
})
if (this.queued) {
// There already is a queued refresh that will run after this call.
// Just wait for it to complete.
await this.refreshPromise
} else if (this.refreshing) {
// There is a refresh running, but no queued refresh. Queue a refresh
// after this one and make it the new promise to wait for.
this.refreshPromise = this.queueRefresh()
await this.refreshPromise
} else {
// There is no refresh running. Start one.
this.refreshPromise = this.startRefresh()
await this.refreshPromise
}
}
@@ -83,20 +72,49 @@ export class ProjectSnapshot {
return file.getContent({ filterTrackedDeletes: true }) ?? null
}
/**
* Immediately start a refresh
*/
private async startRefresh() {
  // Mark a refresh as in flight for the whole duration of the work below.
  this.refreshing = true
  try {
    // The first run performs the initial load; every later run only loads
    // the latest changes.
    await (this.initialized ? this.loadChanges() : this.initialize())
  } finally {
    // Always clear the flag, even when the refresh throws, so that a later
    // call can start a new refresh.
    this.refreshing = false
  }
}
/**
* Queue a refresh after the currently running refresh
*/
private async queueRefresh() {
  this.queued = true
  try {
    // Wait for whatever refresh this.refreshPromise currently points at.
    await this.refreshPromise
  } catch {
    // A failure of the awaited refresh must not prevent the queued one from
    // running, so the error is deliberately ignored here.
  } finally {
    // The queue slot is free again as soon as the wait is over.
    this.queued = false
  }
  await this.startRefresh()
}
/**
* Initialize the snapshot using the project's latest chunk.
*
* This is run on the first refresh.
*/
private async initialize() {
// Move the state machine to 'refreshing' (throws if the current state does
// not allow starting a refresh).
this.state.startRefresh()
// flushHistory is defined outside this view; presumably it asks the server
// to flush pending history so the chunk fetched next is up to date.
await flushHistory(this.projectId)
const chunk = await fetchLatestChunk(this.projectId)
// Start from the chunk's snapshot, then apply the chunk's changes on top.
this.snapshot = chunk.getSnapshot()
this.snapshot.applyAll(chunk.getChanges())
this.version = chunk.getEndVersion()
await this.loadDocs()
// NOTE(review): there is no try/finally here, so if any awaited step above
// throws, the two lines below are skipped.
this.state.endRefresh()
this.initialized = true
}
/**
@@ -105,22 +123,11 @@ export class ProjectSnapshot {
* This is run on the second and subsequent refreshes
*/
private async loadChanges() {
// Move the state machine to 'refreshing' (throws if the current state does
// not allow starting a refresh).
this.state.startRefresh()
await flushHistory(this.projectId)
// Ask only for the changes after the version this snapshot already has.
const changes = await fetchLatestChanges(this.projectId, this.version)
this.snapshot.applyAll(changes)
// The version advances by one for each change applied.
this.version += changes.length
await this.loadDocs()
// NOTE(review): there is no try/finally here, so if any awaited step above
// throws, endRefresh() is skipped.
this.state.endRefresh()
}
/**
* Wait for the current refresh to complete, then start a refresh.
*/
private async queueRefresh() {
// Record the queued refresh in the state machine; this throws unless a
// refresh is currently running.
this.state.queueRefresh()
// Wait for the refresh held in this.refreshPromise. If it rejects, the
// rejection propagates and loadChanges() below is not run.
await this.refreshPromise
await this.loadChanges()
}
/**
@@ -143,106 +150,6 @@ export class ProjectSnapshot {
}
}
/**
* State machine for the project snapshot
*
* There are 5 states:
*
* - init: when the snapshot is built
* - refreshing: while the snapshot is refreshing
* - queued-waiting: while the snapshot is refreshing and another refresh is queued
* - queued-ready: when a refresh is queued, but no refresh is running
* - ready: when no refresh is running and no refresh is queued
*
* There are three transitions:
*
* - start: start a refresh operation
* - end: end a refresh operation
* - queue: queue a refresh operation
*
* Valid transitions are as follows:
*
* +------------+
* | ready |
* +------------+
* ^ |
* | |
* end start
* | |
* | v
* +------+ +------------+ +----------------+
* | init |----start---->| refreshing |---queue---> | queued-waiting |
* +------+ +------------+ +----------------+
* ^ |
* | |
* start end
* | |
* | +--------------+ |
* +-----| queued-ready |<-------+
* +--------------+
*
* These transitions ensure that there are never two refreshes running
* concurrently. In every path, "start" and "end" transitions always alternate.
* You never have two consecutive "start" or two consecutive "end".
*/
class ProjectSnapshotState {
  // Current position in the state machine; a new instance starts in 'init'.
  private state:
    | 'init'
    | 'refreshing'
    | 'ready'
    | 'queued-waiting'
    | 'queued-ready' = 'init'

  /** Returns the current state. */
  getState() {
    return this.state
  }

  /**
   * "start" transition: init | ready | queued-ready -> refreshing.
   * Throws from any other state and leaves the state untouched.
   */
  startRefresh() {
    const canStart =
      this.state === 'init' ||
      this.state === 'ready' ||
      this.state === 'queued-ready'
    if (!canStart) {
      throw new OError("Can't start a snapshot refresh in this state", {
        state: this.state,
      })
    }
    this.state = 'refreshing'
  }

  /**
   * "end" transition: refreshing -> ready, queued-waiting -> queued-ready.
   * Throws from any other state and leaves the state untouched.
   */
  endRefresh() {
    if (this.state === 'refreshing') {
      this.state = 'ready'
    } else if (this.state === 'queued-waiting') {
      this.state = 'queued-ready'
    } else {
      throw new OError("Can't end a snapshot refresh in this state", {
        state: this.state,
      })
    }
  }

  /**
   * "queue" transition: refreshing -> queued-waiting.
   * Throws from any other state and leaves the state untouched.
   */
  queueRefresh() {
    if (this.state !== 'refreshing') {
      throw new OError("Can't queue a snapshot refresh in this state", {
        state: this.state,
      })
    }
    this.state = 'queued-waiting'
  }
}
/**
* Blob store that fetches blobs from the history service
*/
@@ -76,19 +76,69 @@ describe('ProjectSnapshot', function () {
startVersion: 0,
}
// Fixture: a single change made of two operations.
const changes = [
{
operations: [
{
// Text operation on hello.txt: the string 'Quote: ' followed by a count
// equal to the length of the file's existing contents.
pathname: 'hello.txt',
textOperation: ['Quote: ', files['hello.txt'].contents.length],
},
{
// File operation for goodbye.txt, referencing its blob by hash and length.
pathname: 'goodbye.txt',
file: {
hash: files['goodbye.txt'].hash,
stringLength: files['goodbye.txt'].contents.length,
},
},
],
timestamp: '2025-01-01T13:00:00.000Z',
},
]
function mockFlush(
  opts: { repeat?: number; failOnCall?: (call: number) => boolean } = {}
) {
  // How many flush requests the route accepts; defaults to one.
  const repeat = opts.repeat ?? 1
  // 1-based count of flush requests answered so far.
  let callCount = 0
  fetchMock.post(
    `/project/${projectId}/flush`,
    () => {
      callCount += 1
      // Answer 500 for the calls selected by failOnCall, 200 otherwise.
      return opts.failOnCall?.(callCount) ? 500 : 200
    },
    { name: 'flush', repeat }
  )
}
function mockLatestChunk() {
  // One-shot route, named so tests can count its calls.
  const url = `/project/${projectId}/latest/history`
  const routeOptions = { name: 'latest-chunk' }
  fetchMock.getOnce(url, { chunk }, routeOptions)
}
function mockChanges() {
  const changesUrl = (since: number) =>
    `/project/${projectId}/changes?since=${since}`
  // since=1 is answered once with the fixture changes...
  fetchMock.getOnce(changesUrl(1), changes, { name: 'changes-1' })
  // ...while since=2 answers with an empty list, with no "once" limit.
  fetchMock.get(changesUrl(2), [], { name: 'changes-2' })
}
function mockBlobs(paths = Object.keys(files) as (keyof typeof files)[]) {
  // Register one blob route per requested path; defaults to every fixture file.
  paths.forEach(path => {
    const { hash, contents } = files[path]
    fetchMock.get(`/project/${projectId}/blob/${hash}`, contents)
  })
}
// Registers the routes needed for a first refresh, runs it, checks that the
// registered routes were all used, then resets fetchMock.
async function initializeSnapshot() {
// NOTE(review): the flush, latest-chunk and blob routes are registered twice
// in this body — once directly through fetchMock and once through the mock*
// helper calls further down. Confirm whether both sets are intended.
fetchMock.postOnce(`/project/${projectId}/flush`, 200)
fetchMock.getOnce(`/project/${projectId}/latest/history`, { chunk })
fetchMock.getOnce(
`/project/${projectId}/blob/${files['main.tex'].hash}`,
files['main.tex'].contents
)
fetchMock.getOnce(
`/project/${projectId}/blob/${files['hello.txt'].hash}`,
files['hello.txt'].contents
)
mockFlush()
mockLatestChunk()
mockBlobs(['main.tex', 'hello.txt'])
await snapshot.refresh()
expect(fetchMock.done()).to.be.true
fetchMock.reset()
}
@@ -121,34 +171,11 @@ describe('ProjectSnapshot', function () {
})
})
// Fixture: a single change made of two operations.
const changes = [
{
operations: [
{
// Text operation on hello.txt: the string 'Quote: ' followed by a count
// equal to the length of the file's existing contents.
pathname: 'hello.txt',
textOperation: ['Quote: ', files['hello.txt'].contents.length],
},
{
// File operation for goodbye.txt, referencing its blob by hash and length.
pathname: 'goodbye.txt',
file: {
hash: files['goodbye.txt'].hash,
stringLength: files['goodbye.txt'].contents.length,
},
},
],
timestamp: '2025-01-01T13:00:00.000Z',
},
]
// Registers the routes needed for a follow-up refresh, runs it, checks that
// the registered routes were all used, then resets fetchMock.
async function refreshSnapshot() {
// NOTE(review): the flush, changes and goodbye.txt blob routes are registered
// twice in this body — once directly through fetchMock and once through the
// mock* helper calls further down. Confirm whether both sets are intended.
fetchMock.postOnce(`/project/${projectId}/flush`, 200, { repeat: 2 })
fetchMock.getOnce(`/project/${projectId}/changes?since=1`, changes)
fetchMock.getOnce(
`/project/${projectId}/blob/${files['goodbye.txt'].hash}`,
files['goodbye.txt'].contents
)
mockFlush()
mockChanges()
mockBlobs(['goodbye.txt'])
await snapshot.refresh()
expect(fetchMock.done()).to.be.true
fetchMock.reset()
}
@@ -170,7 +197,7 @@ describe('ProjectSnapshot', function () {
})
})
describe('getDocCotents()', function () {
describe('getDocContents()', function () {
it('returns the up to date content', function () {
expect(snapshot.getDocContents('hello.txt')).to.equal(
`Quote: ${files['hello.txt'].contents}`
@@ -184,4 +211,177 @@ describe('ProjectSnapshot', function () {
})
})
})
// Each case issues several refresh() calls at once and then counts the calls
// made to the named fetchMock routes ('flush', 'latest-chunk', 'changes-1',
// 'changes-2').
describe('concurrency', function () {
afterEach(function () {
fetchMock.reset()
})
specify('two concurrent inits', async function () {
mockFlush({ repeat: 2 })
mockLatestChunk()
mockChanges()
mockBlobs()
await Promise.all([snapshot.refresh(), snapshot.refresh()])
// The first request initializes, the second request loads changes
expect(fetchMock.calls('flush')).to.have.length(2)
expect(fetchMock.calls('latest-chunk')).to.have.length(1)
expect(fetchMock.calls('changes-1')).to.have.length(1)
})
specify('three concurrent inits', async function () {
mockFlush({ repeat: 2 })
mockLatestChunk()
mockChanges()
mockBlobs()
await Promise.all([
snapshot.refresh(),
snapshot.refresh(),
snapshot.refresh(),
])
// The first request initializes, the second and third are combined and
// load changes
expect(fetchMock.calls('flush')).to.have.length(2)
expect(fetchMock.calls('latest-chunk')).to.have.length(1)
expect(fetchMock.calls('changes-1')).to.have.length(1)
})
specify('two concurrent inits - first fails', async function () {
// The first flush call is answered with a 500
mockFlush({ repeat: 2, failOnCall: call => call === 1 })
mockLatestChunk()
mockBlobs()
const results = await Promise.allSettled([
snapshot.refresh(),
snapshot.refresh(),
])
// The first init fails, but the second succeeds
expect(results.filter(r => r.status === 'fulfilled')).to.have.length(1)
expect(fetchMock.calls('flush')).to.have.length(2)
expect(fetchMock.calls('latest-chunk')).to.have.length(1)
expect(fetchMock.calls('changes-1')).to.have.length(0)
})
specify('three concurrent inits - second fails', async function () {
// NOTE(review): the flush route accepts up to 4 calls here, but only 3 are
// asserted below
mockFlush({ repeat: 4, failOnCall: call => call === 2 })
mockLatestChunk()
mockChanges()
mockBlobs()
const results = await Promise.allSettled([
snapshot.refresh(),
snapshot.refresh(),
snapshot.refresh(),
])
// Another request afterwards
await snapshot.refresh()
// The first init succeeds, the two queued requests fail, the last request
// succeeds
expect(results.filter(r => r.status === 'fulfilled')).to.have.length(1)
expect(fetchMock.calls('flush')).to.have.length(3)
expect(fetchMock.calls('latest-chunk')).to.have.length(1)
expect(fetchMock.calls('changes-1')).to.have.length(1)
expect(fetchMock.calls('changes-2')).to.have.length(0)
})
specify('two concurrent load changes', async function () {
mockFlush({ repeat: 3 })
mockLatestChunk()
mockChanges()
mockBlobs()
// Initialize
await snapshot.refresh()
// Two concurrent load changes
await Promise.all([snapshot.refresh(), snapshot.refresh()])
// One init, two load changes
expect(fetchMock.calls('flush')).to.have.length(3)
expect(fetchMock.calls('latest-chunk')).to.have.length(1)
expect(fetchMock.calls('changes-1')).to.have.length(1)
expect(fetchMock.calls('changes-2')).to.have.length(1)
})
specify('three concurrent load changes', async function () {
mockFlush({ repeat: 3 })
mockLatestChunk()
mockChanges()
mockBlobs()
// Initialize
await snapshot.refresh()
// Three concurrent load changes
await Promise.all([
snapshot.refresh(),
snapshot.refresh(),
snapshot.refresh(),
])
// One init, two load changes (the two last are queued and combined)
expect(fetchMock.calls('flush')).to.have.length(3)
expect(fetchMock.calls('latest-chunk')).to.have.length(1)
expect(fetchMock.calls('changes-1')).to.have.length(1)
expect(fetchMock.calls('changes-2')).to.have.length(1)
})
specify('two concurrent load changes - first fails', async function () {
// Flush call 1 belongs to the init; call 2 is answered with a 500
mockFlush({ repeat: 3, failOnCall: call => call === 2 })
mockLatestChunk()
mockChanges()
mockBlobs()
// Initialize
await snapshot.refresh()
// Two concurrent load changes
const results = await Promise.allSettled([
snapshot.refresh(),
snapshot.refresh(),
])
// One init, one load changes fails, the second succeeds
expect(results.filter(r => r.status === 'fulfilled')).to.have.length(1)
expect(fetchMock.calls('flush')).to.have.length(3)
expect(fetchMock.calls('latest-chunk')).to.have.length(1)
expect(fetchMock.calls('changes-1')).to.have.length(1)
expect(fetchMock.calls('changes-2')).to.have.length(0)
})
specify('three concurrent load changes - second fails', async function () {
// Flush call 1 belongs to the init; call 3 is answered with a 500
mockFlush({ repeat: 4, failOnCall: call => call === 3 })
mockLatestChunk()
mockChanges()
mockBlobs()
// Initialize
await snapshot.refresh()
// Three concurrent load changes
const results = await Promise.allSettled([
snapshot.refresh(),
snapshot.refresh(),
snapshot.refresh(),
])
// Another request afterwards
await snapshot.refresh()
// One init, one load changes succeeds, the second and third are combined
// and fail, the last request succeeds
expect(results.filter(r => r.status === 'fulfilled')).to.have.length(1)
expect(fetchMock.calls('flush')).to.have.length(4)
expect(fetchMock.calls('latest-chunk')).to.have.length(1)
expect(fetchMock.calls('changes-1')).to.have.length(1)
expect(fetchMock.calls('changes-2')).to.have.length(1)
})
})
})