mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 09:09:36 +02:00
[real-time] Promisify DocumentUpdaterManager (#30322)
* move function outside of object * convert flushProjectToMongoAndDelete function * convert queueChange function * install fetch utils, remove private fn from export * promisify getDocument function * fix unit tests * preserve function order * try parsing error body * update tests on a new rebased file * remove fetch-utils from devDependencies GitOrigin-RevId: cc34d6f690b6b888214b32c2aee0efb6b585e159
This commit is contained in:
committed by
Copybot
parent
8eba220693
commit
41b8ccc69f
2
package-lock.json
generated
2
package-lock.json
generated
@@ -58898,6 +58898,7 @@
|
||||
"services/real-time": {
|
||||
"name": "@overleaf/real-time",
|
||||
"dependencies": {
|
||||
"@overleaf/fetch-utils": "*",
|
||||
"@overleaf/logger": "*",
|
||||
"@overleaf/metrics": "*",
|
||||
"@overleaf/o-error": "*",
|
||||
@@ -58920,7 +58921,6 @@
|
||||
"zod-validation-error": "^4.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@overleaf/fetch-utils": "*",
|
||||
"chai": "^4.3.6",
|
||||
"chai-as-promised": "^7.1.1",
|
||||
"cookie-signature": "^1.1.0",
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import request from 'request'
|
||||
import _ from 'lodash'
|
||||
import OError from '@overleaf/o-error'
|
||||
import logger from '@overleaf/logger'
|
||||
@@ -6,6 +5,12 @@ import settings from '@overleaf/settings'
|
||||
import metrics from '@overleaf/metrics'
|
||||
import RedisWrapper from '@overleaf/redis-wrapper'
|
||||
import Errors from './Errors.js'
|
||||
import {
|
||||
fetchJson,
|
||||
fetchNothing,
|
||||
RequestFailedError,
|
||||
} from '@overleaf/fetch-utils'
|
||||
import { callbackify } from 'node:util'
|
||||
|
||||
const {
|
||||
ClientRequestedMissingOpsError,
|
||||
@@ -13,145 +18,141 @@ const {
|
||||
NullBytesInOpError,
|
||||
UpdateTooLargeError,
|
||||
} = Errors
|
||||
|
||||
const rclient = RedisWrapper.createClient(settings.redis.documentupdater)
|
||||
const Keys = settings.redis.documentupdater.key_schema
|
||||
|
||||
const DocumentUpdaterManager = {
|
||||
getDocument(projectId, docId, fromVersion, callback) {
|
||||
const timer = new metrics.Timer('get-document')
|
||||
const url = `${settings.apis.documentupdater.url}/project/${projectId}/doc/${docId}?fromVersion=${fromVersion}&historyOTSupport=true`
|
||||
logger.debug(
|
||||
{ projectId, docId, fromVersion },
|
||||
'getting doc from document updater'
|
||||
)
|
||||
request.get(url, function (err, res, body) {
|
||||
timer.done()
|
||||
if (err) {
|
||||
OError.tag(err, 'error getting doc from doc updater')
|
||||
return callback(err)
|
||||
async function getDocument(projectId, docId, fromVersion) {
|
||||
const timer = new metrics.Timer('get-document')
|
||||
const url = `${settings.apis.documentupdater.url}/project/${projectId}/doc/${docId}?fromVersion=${fromVersion}&historyOTSupport=true`
|
||||
logger.debug(
|
||||
{ projectId, docId, fromVersion },
|
||||
'getting doc from document updater'
|
||||
)
|
||||
try {
|
||||
const body = await fetchJson(url)
|
||||
timer.done()
|
||||
logger.debug({ projectId, docId }, 'got doc from document document updater')
|
||||
return {
|
||||
lines: body?.lines,
|
||||
version: body?.version,
|
||||
ranges: body?.ranges,
|
||||
ops: body?.ops,
|
||||
ttlInS: body?.ttlInS,
|
||||
type: body?.type,
|
||||
}
|
||||
} catch (err) {
|
||||
timer.done()
|
||||
if (err instanceof RequestFailedError) {
|
||||
const { response, body } = err
|
||||
let parsedErrBody = null
|
||||
try {
|
||||
parsedErrBody = JSON.parse(body)
|
||||
} catch (error) {
|
||||
// ignore parse error
|
||||
}
|
||||
if (res.statusCode >= 200 && res.statusCode < 300) {
|
||||
logger.debug(
|
||||
{ projectId, docId },
|
||||
'got doc from document document updater'
|
||||
)
|
||||
try {
|
||||
body = JSON.parse(body)
|
||||
} catch (error) {
|
||||
OError.tag(error, 'error parsing doc updater response')
|
||||
return callback(error)
|
||||
}
|
||||
body = body || {}
|
||||
callback(
|
||||
null,
|
||||
body.lines,
|
||||
body.version,
|
||||
body.ranges,
|
||||
body.ops,
|
||||
body.ttlInS,
|
||||
body.type
|
||||
)
|
||||
} else if (res.statusCode === 422 && body?.firstVersionInRedis) {
|
||||
callback(new ClientRequestedMissingOpsError(422, body))
|
||||
} else if ([404, 422].includes(res.statusCode)) {
|
||||
callback(new ClientRequestedMissingOpsError(res.statusCode))
|
||||
if (response.status === 422 && parsedErrBody?.firstVersionInRedis) {
|
||||
throw new ClientRequestedMissingOpsError(422, parsedErrBody)
|
||||
} else if ([404, 422].includes(response.status)) {
|
||||
throw new ClientRequestedMissingOpsError(response.status)
|
||||
} else {
|
||||
callback(
|
||||
new DocumentUpdaterRequestFailedError('getDocument', res.statusCode)
|
||||
throw new DocumentUpdaterRequestFailedError(
|
||||
'getDocument',
|
||||
response.status
|
||||
)
|
||||
}
|
||||
})
|
||||
},
|
||||
|
||||
checkDocument(projectId, docId, callback) {
|
||||
// in this call fromVersion = -1 means get document without docOps
|
||||
DocumentUpdaterManager.getDocument(projectId, docId, -1, callback)
|
||||
},
|
||||
|
||||
flushProjectToMongoAndDelete(projectId, callback) {
|
||||
// this method is called when the last connected user leaves the project
|
||||
logger.debug({ projectId }, 'deleting project from document updater')
|
||||
const timer = new metrics.Timer('delete.mongo.project')
|
||||
// flush the project in the background when all users have left
|
||||
const url =
|
||||
`${settings.apis.documentupdater.url}/project/${projectId}?background=true` +
|
||||
(settings.shutDownInProgress ? '&shutdown=true' : '')
|
||||
request.del(url, function (err, res) {
|
||||
timer.done()
|
||||
if (err) {
|
||||
OError.tag(err, 'error deleting project from document updater')
|
||||
callback(err)
|
||||
} else if (res.statusCode >= 200 && res.statusCode < 300) {
|
||||
logger.debug({ projectId }, 'deleted project from document updater')
|
||||
callback(null)
|
||||
} else {
|
||||
callback(
|
||||
new DocumentUpdaterRequestFailedError(
|
||||
'flushProjectToMongoAndDelete',
|
||||
res.statusCode
|
||||
)
|
||||
)
|
||||
}
|
||||
})
|
||||
},
|
||||
|
||||
_getPendingUpdateListKey() {
|
||||
const shard = _.random(0, settings.pendingUpdateListShardCount - 1)
|
||||
if (shard === 0) {
|
||||
return 'pending-updates-list'
|
||||
} else {
|
||||
return `pending-updates-list-${shard}`
|
||||
}
|
||||
},
|
||||
|
||||
queueChange(projectId, docId, change, callback) {
|
||||
const allowedKeys = [
|
||||
'doc',
|
||||
'op',
|
||||
'v',
|
||||
'dupIfSource',
|
||||
'meta',
|
||||
'lastV',
|
||||
'hash',
|
||||
]
|
||||
change = _.pick(change, allowedKeys)
|
||||
const jsonChange = JSON.stringify(change)
|
||||
if (jsonChange.indexOf('\u0000') !== -1) {
|
||||
// memory corruption check
|
||||
return callback(new NullBytesInOpError(jsonChange))
|
||||
}
|
||||
|
||||
const updateSize = jsonChange.length
|
||||
if (updateSize > settings.maxUpdateSize) {
|
||||
return callback(new UpdateTooLargeError(updateSize))
|
||||
}
|
||||
|
||||
// record metric for each update added to queue
|
||||
metrics.summary('redis.pendingUpdates', updateSize, { status: 'push' })
|
||||
|
||||
const docKey = `${projectId}:${docId}`
|
||||
// Push onto pendingUpdates for doc_id first, because once the doc updater
|
||||
// gets an entry on pending-updates-list, it starts processing.
|
||||
rclient.rpush(
|
||||
Keys.pendingUpdates({ doc_id: docId }),
|
||||
jsonChange,
|
||||
function (error) {
|
||||
if (error) {
|
||||
error = new OError('error pushing update into redis').withCause(error)
|
||||
return callback(error)
|
||||
}
|
||||
const queueKey = DocumentUpdaterManager._getPendingUpdateListKey()
|
||||
rclient.rpush(queueKey, docKey, function (error) {
|
||||
if (error) {
|
||||
error = new OError('error pushing doc_id into redis')
|
||||
.withInfo({ queueKey })
|
||||
.withCause(error)
|
||||
}
|
||||
callback(error)
|
||||
})
|
||||
}
|
||||
)
|
||||
},
|
||||
OError.tag(err, 'error getting doc from doc updater')
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
export default DocumentUpdaterManager
|
||||
async function checkDocument(projectId, docId) {
|
||||
// in this call fromVersion = -1 means get document without docOps
|
||||
return await getDocument(projectId, docId, -1)
|
||||
}
|
||||
|
||||
async function flushProjectToMongoAndDelete(projectId) {
|
||||
// this method is called when the last connected user leaves the project
|
||||
logger.debug({ projectId }, 'deleting project from document updater')
|
||||
const timer = new metrics.Timer('delete.mongo.project')
|
||||
// flush the project in the background when all users have left
|
||||
const url =
|
||||
`${settings.apis.documentupdater.url}/project/${projectId}?background=true` +
|
||||
(settings.shutDownInProgress ? '&shutdown=true' : '')
|
||||
|
||||
try {
|
||||
await fetchNothing(url, { method: 'DELETE' })
|
||||
logger.debug({ projectId }, 'deleted project from document updater')
|
||||
timer.done()
|
||||
} catch (err) {
|
||||
timer.done()
|
||||
if (err instanceof RequestFailedError) {
|
||||
throw new DocumentUpdaterRequestFailedError(
|
||||
'flushProjectToMongoAndDelete',
|
||||
err.response.status
|
||||
)
|
||||
}
|
||||
OError.tag(err, 'error deleting project from document updater')
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
function _getPendingUpdateListKey() {
|
||||
const shard = _.random(0, settings.pendingUpdateListShardCount - 1)
|
||||
if (shard === 0) {
|
||||
return 'pending-updates-list'
|
||||
} else {
|
||||
return `pending-updates-list-${shard}`
|
||||
}
|
||||
}
|
||||
|
||||
async function queueChange(projectId, docId, change) {
|
||||
const allowedKeys = ['doc', 'op', 'v', 'dupIfSource', 'meta', 'lastV', 'hash']
|
||||
change = _.pick(change, allowedKeys)
|
||||
const jsonChange = JSON.stringify(change)
|
||||
if (jsonChange.indexOf('\u0000') !== -1) {
|
||||
// memory corruption check
|
||||
throw new NullBytesInOpError(jsonChange)
|
||||
}
|
||||
|
||||
const updateSize = jsonChange.length
|
||||
if (updateSize > settings.maxUpdateSize) {
|
||||
throw new UpdateTooLargeError(updateSize)
|
||||
}
|
||||
|
||||
// record metric for each update added to queue
|
||||
metrics.summary('redis.pendingUpdates', updateSize, { status: 'push' })
|
||||
|
||||
const docKey = `${projectId}:${docId}`
|
||||
// Push onto pendingUpdates for doc_id first, because once the doc updater
|
||||
// gets an entry on pending-updates-list, it starts processing.
|
||||
try {
|
||||
await rclient.rpush(Keys.pendingUpdates({ doc_id: docId }), jsonChange)
|
||||
} catch (error) {
|
||||
throw new OError('error pushing update into redis').withCause(error)
|
||||
}
|
||||
|
||||
const queueKey = _getPendingUpdateListKey()
|
||||
try {
|
||||
await rclient.rpush(queueKey, docKey)
|
||||
} catch (error) {
|
||||
throw new OError('error pushing doc_id into redis')
|
||||
.withInfo({ queueKey })
|
||||
.withCause(error)
|
||||
}
|
||||
}
|
||||
|
||||
export default {
|
||||
getDocument: callbackify(getDocument),
|
||||
checkDocument: callbackify(checkDocument),
|
||||
flushProjectToMongoAndDelete: callbackify(flushProjectToMongoAndDelete),
|
||||
_getPendingUpdateListKey,
|
||||
queueChange: callbackify(queueChange),
|
||||
promises: {
|
||||
getDocument,
|
||||
checkDocument,
|
||||
flushProjectToMongoAndDelete,
|
||||
queueChange,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -287,7 +287,7 @@ export default WebsocketController = {
|
||||
projectId,
|
||||
docId,
|
||||
fromVersion,
|
||||
function (error, lines, version, ranges, ops, ttlInS, type) {
|
||||
function (error, { lines, version, ranges, ops, ttlInS, type }) {
|
||||
if (error) {
|
||||
if (error instanceof ClientRequestedMissingOpsError) {
|
||||
emitJoinDocCatchUpMetrics('missing', error.info)
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
"types:check": "tsc --noEmit"
|
||||
},
|
||||
"dependencies": {
|
||||
"@overleaf/fetch-utils": "*",
|
||||
"@overleaf/logger": "*",
|
||||
"@overleaf/metrics": "*",
|
||||
"@overleaf/o-error": "*",
|
||||
@@ -40,7 +41,6 @@
|
||||
"zod-validation-error": "^4.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@overleaf/fetch-utils": "*",
|
||||
"chai": "^4.3.6",
|
||||
"chai-as-promised": "^7.1.1",
|
||||
"cookie-signature": "^1.1.0",
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
|
||||
*/
|
||||
import sinon from 'sinon'
|
||||
import { vi, describe, beforeEach, it, afterEach } from 'vitest'
|
||||
import { expect, vi, describe, beforeEach, it, afterEach } from 'vitest'
|
||||
import _ from 'lodash'
|
||||
const modulePath = '../../../app/js/DocumentUpdaterManager'
|
||||
|
||||
@@ -31,13 +31,26 @@ describe('DocumentUpdaterManager', function () {
|
||||
pendingUpdateListShardCount: 10,
|
||||
}
|
||||
ctx.rclient = { auth() {} }
|
||||
ctx.fetchJson = sinon.stub()
|
||||
ctx.fetchNothing = sinon.stub()
|
||||
ctx.RequestFailedError = class RequestFailedError extends Error {
|
||||
constructor(message, url, options, response, body) {
|
||||
super(message)
|
||||
this.url = url
|
||||
this.options = options
|
||||
this.response = response
|
||||
this.body = body
|
||||
}
|
||||
}
|
||||
|
||||
vi.doMock('@overleaf/settings', () => ({
|
||||
default: ctx.settings,
|
||||
}))
|
||||
|
||||
vi.doMock('request', () => ({
|
||||
default: (ctx.request = {}),
|
||||
vi.doMock('@overleaf/fetch-utils', () => ({
|
||||
fetchJson: ctx.fetchJson,
|
||||
fetchNothing: ctx.fetchNothing,
|
||||
RequestFailedError: ctx.RequestFailedError,
|
||||
}))
|
||||
|
||||
vi.doMock('@overleaf/redis-wrapper', () => ({
|
||||
@@ -59,88 +72,75 @@ describe('DocumentUpdaterManager', function () {
|
||||
}) // avoid modifying JSON object directly
|
||||
|
||||
describe('getDocument', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.callback = sinon.stub()
|
||||
})
|
||||
|
||||
describe('successfully', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.body = JSON.stringify({
|
||||
beforeEach(async function (ctx) {
|
||||
ctx.body = {
|
||||
lines: ctx.lines,
|
||||
version: ctx.version,
|
||||
ops: (ctx.ops = ['mock-op-1', 'mock-op-2']),
|
||||
ranges: (ctx.ranges = { mock: 'ranges' }),
|
||||
})
|
||||
}
|
||||
ctx.fromVersion = 2
|
||||
ctx.request.get = sinon
|
||||
.stub()
|
||||
.callsArgWith(1, null, { statusCode: 200 }, ctx.body)
|
||||
ctx.DocumentUpdaterManager.getDocument(
|
||||
ctx.fetchJson.resolves(ctx.body)
|
||||
ctx.result = await ctx.DocumentUpdaterManager.promises.getDocument(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.fromVersion,
|
||||
ctx.callback
|
||||
ctx.fromVersion
|
||||
)
|
||||
})
|
||||
|
||||
it('should get the document from the document updater', function (ctx) {
|
||||
const url = `${ctx.settings.apis.documentupdater.url}/project/${ctx.project_id}/doc/${ctx.doc_id}?fromVersion=${ctx.fromVersion}&historyOTSupport=true`
|
||||
ctx.request.get.calledWith(url).should.equal(true)
|
||||
ctx.fetchJson.calledWith(url).should.equal(true)
|
||||
})
|
||||
|
||||
it('should call the callback with the lines, version, ranges and ops', function (ctx) {
|
||||
ctx.callback
|
||||
.calledWith(null, ctx.lines, ctx.version, ctx.ranges, ctx.ops)
|
||||
.should.equal(true)
|
||||
it('should return the lines, version, ranges and ops', function (ctx) {
|
||||
ctx.result.lines.should.deep.equal(ctx.lines)
|
||||
ctx.result.version.should.equal(ctx.version)
|
||||
ctx.result.ranges.should.deep.equal(ctx.ranges)
|
||||
ctx.result.ops.should.deep.equal(ctx.ops)
|
||||
})
|
||||
})
|
||||
|
||||
describe('when the document updater API returns an error', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.request.get = sinon
|
||||
.stub()
|
||||
.callsArgWith(
|
||||
1,
|
||||
(ctx.error = new Error('something went wrong')),
|
||||
null,
|
||||
null
|
||||
)
|
||||
ctx.DocumentUpdaterManager.getDocument(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.fromVersion,
|
||||
ctx.callback
|
||||
)
|
||||
ctx.error = new Error('something went wrong')
|
||||
ctx.fetchJson.rejects(ctx.error)
|
||||
})
|
||||
|
||||
it('should return an error to the callback', function (ctx) {
|
||||
ctx.callback.calledWith(ctx.error).should.equal(true)
|
||||
it('should throw an error', async function (ctx) {
|
||||
await expect(
|
||||
ctx.DocumentUpdaterManager.promises.getDocument(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.fromVersion
|
||||
)
|
||||
).to.be.rejectedWith(ctx.error)
|
||||
})
|
||||
})
|
||||
;[404, 422].forEach(statusCode =>
|
||||
describe(`when the document updater returns a ${statusCode} status code`, function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.request.get = sinon
|
||||
.stub()
|
||||
.callsArgWith(1, null, { statusCode }, '')
|
||||
ctx.DocumentUpdaterManager.getDocument(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.fromVersion,
|
||||
ctx.callback
|
||||
const error = new ctx.RequestFailedError(
|
||||
'error',
|
||||
'url',
|
||||
{},
|
||||
{ status: statusCode },
|
||||
null
|
||||
)
|
||||
ctx.fetchJson.rejects(error)
|
||||
})
|
||||
|
||||
it('should return the callback with an error', function (ctx) {
|
||||
ctx.callback.called.should.equal(true)
|
||||
ctx.callback
|
||||
.calledWith(
|
||||
sinon.match({
|
||||
message: 'doc updater could not load requested ops',
|
||||
info: { statusCode },
|
||||
})
|
||||
it('should throw an error with the status code', async function (ctx) {
|
||||
const error = await expect(
|
||||
ctx.DocumentUpdaterManager.promises.getDocument(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.fromVersion
|
||||
)
|
||||
.should.equal(true)
|
||||
).to.be.rejected
|
||||
error.message.should.equal('doc updater could not load requested ops')
|
||||
error.info.statusCode.should.equal(statusCode)
|
||||
ctx.logger.error.called.should.equal(false)
|
||||
ctx.logger.warn.called.should.equal(false)
|
||||
})
|
||||
@@ -149,106 +149,89 @@ describe('DocumentUpdaterManager', function () {
|
||||
|
||||
describe('when the document updater returns a failure error code', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.request.get = sinon
|
||||
.stub()
|
||||
.callsArgWith(1, null, { statusCode: 500 }, '')
|
||||
ctx.DocumentUpdaterManager.getDocument(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.fromVersion,
|
||||
ctx.callback
|
||||
const error = new ctx.RequestFailedError(
|
||||
'error',
|
||||
'url',
|
||||
{},
|
||||
{ status: 500 },
|
||||
null
|
||||
)
|
||||
ctx.fetchJson.rejects(error)
|
||||
})
|
||||
|
||||
it('should return the callback with an error', function (ctx) {
|
||||
ctx.callback.called.should.equal(true)
|
||||
ctx.callback
|
||||
.calledWith(
|
||||
sinon.match({
|
||||
message: 'doc updater returned a non-success status code',
|
||||
info: {
|
||||
action: 'getDocument',
|
||||
statusCode: 500,
|
||||
},
|
||||
})
|
||||
it('should throw an error', async function (ctx) {
|
||||
const error = await expect(
|
||||
ctx.DocumentUpdaterManager.promises.getDocument(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.fromVersion
|
||||
)
|
||||
.should.equal(true)
|
||||
).to.be.rejected
|
||||
error.message.should.equal(
|
||||
'doc updater returned a non-success status code'
|
||||
)
|
||||
error.info.action.should.equal('getDocument')
|
||||
error.info.statusCode.should.equal(500)
|
||||
ctx.logger.error.called.should.equal(false)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('flushProjectToMongoAndDelete', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.callback = sinon.stub()
|
||||
})
|
||||
|
||||
describe('successfully', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.request.del = sinon
|
||||
.stub()
|
||||
.callsArgWith(1, null, { statusCode: 204 }, '')
|
||||
ctx.DocumentUpdaterManager.flushProjectToMongoAndDelete(
|
||||
ctx.project_id,
|
||||
ctx.callback
|
||||
beforeEach(async function (ctx) {
|
||||
ctx.fetchNothing.resolves()
|
||||
await ctx.DocumentUpdaterManager.promises.flushProjectToMongoAndDelete(
|
||||
ctx.project_id
|
||||
)
|
||||
})
|
||||
|
||||
it('should delete the project from the document updater', function (ctx) {
|
||||
const url = `${ctx.settings.apis.documentupdater.url}/project/${ctx.project_id}?background=true`
|
||||
ctx.request.del.calledWith(url).should.equal(true)
|
||||
})
|
||||
|
||||
it('should call the callback with no error', function (ctx) {
|
||||
ctx.callback.calledWith(null).should.equal(true)
|
||||
ctx.fetchNothing
|
||||
.calledWith(url, { method: 'DELETE' })
|
||||
.should.equal(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('when the document updater API returns an error', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.request.del = sinon
|
||||
.stub()
|
||||
.callsArgWith(
|
||||
1,
|
||||
(ctx.error = new Error('something went wrong')),
|
||||
null,
|
||||
null
|
||||
)
|
||||
ctx.DocumentUpdaterManager.flushProjectToMongoAndDelete(
|
||||
ctx.project_id,
|
||||
ctx.callback
|
||||
)
|
||||
ctx.error = new Error('something went wrong')
|
||||
ctx.fetchNothing.rejects(ctx.error)
|
||||
})
|
||||
|
||||
it('should return an error to the callback', function (ctx) {
|
||||
ctx.callback.calledWith(ctx.error).should.equal(true)
|
||||
it('should throw an error', async function (ctx) {
|
||||
await expect(
|
||||
ctx.DocumentUpdaterManager.promises.flushProjectToMongoAndDelete(
|
||||
ctx.project_id
|
||||
)
|
||||
).to.be.rejectedWith(ctx.error)
|
||||
})
|
||||
})
|
||||
|
||||
describe('when the document updater returns a failure error code', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.request.del = sinon
|
||||
.stub()
|
||||
.callsArgWith(1, null, { statusCode: 500 }, '')
|
||||
ctx.DocumentUpdaterManager.flushProjectToMongoAndDelete(
|
||||
ctx.project_id,
|
||||
ctx.callback
|
||||
const error = new ctx.RequestFailedError(
|
||||
'error',
|
||||
'url',
|
||||
{},
|
||||
{ status: 500 },
|
||||
null
|
||||
)
|
||||
ctx.fetchNothing.rejects(error)
|
||||
})
|
||||
|
||||
it('should return the callback with an error', function (ctx) {
|
||||
ctx.callback.called.should.equal(true)
|
||||
ctx.callback
|
||||
.calledWith(
|
||||
sinon.match({
|
||||
message: 'doc updater returned a non-success status code',
|
||||
info: {
|
||||
action: 'flushProjectToMongoAndDelete',
|
||||
statusCode: 500,
|
||||
},
|
||||
})
|
||||
it('should throw an error', async function (ctx) {
|
||||
const error = await expect(
|
||||
ctx.DocumentUpdaterManager.promises.flushProjectToMongoAndDelete(
|
||||
ctx.project_id
|
||||
)
|
||||
.should.equal(true)
|
||||
).to.be.rejected
|
||||
error.message.should.equal(
|
||||
'doc updater returned a non-success status code'
|
||||
)
|
||||
error.info.action.should.equal('flushProjectToMongoAndDelete')
|
||||
error.info.statusCode.should.equal(500)
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -260,22 +243,15 @@ describe('DocumentUpdaterManager', function () {
|
||||
op: [{ d: 'test', p: 345 }],
|
||||
v: 789,
|
||||
}
|
||||
ctx.rclient.rpush = sinon.stub().yields()
|
||||
ctx.callback = sinon.stub()
|
||||
ctx.rclient.rpush = sinon.stub().resolves()
|
||||
})
|
||||
|
||||
describe('successfully', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.pendingUpdateListKey = `pending-updates-list-key-${Math.random()}`
|
||||
|
||||
ctx.DocumentUpdaterManager._getPendingUpdateListKey = sinon
|
||||
.stub()
|
||||
.returns(ctx.pendingUpdateListKey)
|
||||
ctx.DocumentUpdaterManager.queueChange(
|
||||
beforeEach(async function (ctx) {
|
||||
await ctx.DocumentUpdaterManager.promises.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change,
|
||||
ctx.callback
|
||||
ctx.change
|
||||
)
|
||||
})
|
||||
|
||||
@@ -289,12 +265,12 @@ describe('DocumentUpdaterManager', function () {
|
||||
})
|
||||
|
||||
it('should notify the doc updater of the change via the pending-updates-list queue', function (ctx) {
|
||||
ctx.rclient.rpush
|
||||
.calledWith(
|
||||
ctx.pendingUpdateListKey,
|
||||
`${ctx.project_id}:${ctx.doc_id}`
|
||||
)
|
||||
.should.equal(true)
|
||||
// The second call should be to a pending-updates-list key (either base or sharded)
|
||||
const secondCall = ctx.rclient.rpush.secondCall
|
||||
secondCall.should.exist
|
||||
const queueKey = secondCall.args[0]
|
||||
queueKey.should.match(/^pending-updates-list(-\d+)?$/)
|
||||
secondCall.args[1].should.equal(`${ctx.project_id}:${ctx.doc_id}`)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -302,17 +278,17 @@ describe('DocumentUpdaterManager', function () {
|
||||
beforeEach(function (ctx) {
|
||||
ctx.rclient.rpush = sinon
|
||||
.stub()
|
||||
.yields(new Error('something went wrong'))
|
||||
ctx.DocumentUpdaterManager.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change,
|
||||
ctx.callback
|
||||
)
|
||||
.rejects(new Error('something went wrong'))
|
||||
})
|
||||
|
||||
it('should return an error', function (ctx) {
|
||||
ctx.callback.calledWithExactly(sinon.match(Error)).should.equal(true)
|
||||
it('should throw an error', async function (ctx) {
|
||||
await expect(
|
||||
ctx.DocumentUpdaterManager.promises.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change
|
||||
)
|
||||
).to.be.rejected
|
||||
})
|
||||
})
|
||||
|
||||
@@ -321,23 +297,30 @@ describe('DocumentUpdaterManager', function () {
|
||||
ctx.stringifyStub = sinon
|
||||
.stub(JSON, 'stringify')
|
||||
.callsFake(() => '["bad bytes! \u0000 <- here"]')
|
||||
ctx.DocumentUpdaterManager.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change,
|
||||
ctx.callback
|
||||
)
|
||||
})
|
||||
|
||||
afterEach(function (ctx) {
|
||||
ctx.stringifyStub.restore()
|
||||
})
|
||||
|
||||
it('should return an error', function (ctx) {
|
||||
ctx.callback.calledWithExactly(sinon.match(Error)).should.equal(true)
|
||||
it('should throw an error', async function (ctx) {
|
||||
await expect(
|
||||
ctx.DocumentUpdaterManager.promises.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change
|
||||
)
|
||||
).to.be.rejected
|
||||
})
|
||||
|
||||
it('should not push the change onto the pending-updates-list queue', function (ctx) {
|
||||
it('should not push the change onto the pending-updates-list queue', async function (ctx) {
|
||||
await expect(
|
||||
ctx.DocumentUpdaterManager.promises.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change
|
||||
)
|
||||
).to.be.rejected
|
||||
ctx.rclient.rpush.called.should.equal(false)
|
||||
})
|
||||
})
|
||||
@@ -347,38 +330,51 @@ describe('DocumentUpdaterManager', function () {
|
||||
ctx.change = {
|
||||
op: { p: 12, t: 'update is too large'.repeat(1024 * 400) },
|
||||
}
|
||||
ctx.DocumentUpdaterManager.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change,
|
||||
ctx.callback
|
||||
)
|
||||
})
|
||||
|
||||
it('should return an error', function (ctx) {
|
||||
ctx.callback.calledWithExactly(sinon.match(Error)).should.equal(true)
|
||||
it('should throw an error', async function (ctx) {
|
||||
await expect(
|
||||
ctx.DocumentUpdaterManager.promises.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change
|
||||
)
|
||||
).to.be.rejected
|
||||
})
|
||||
|
||||
it('should add the size to the error', function (ctx) {
|
||||
ctx.callback.args[0][0].info.updateSize.should.equal(7782422)
|
||||
it('should add the size to the error', async function (ctx) {
|
||||
const error = await expect(
|
||||
ctx.DocumentUpdaterManager.promises.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change
|
||||
)
|
||||
).to.be.rejected
|
||||
error.info.updateSize.should.equal(7782422)
|
||||
})
|
||||
|
||||
it('should not push the change onto the pending-updates-list queue', function (ctx) {
|
||||
it('should not push the change onto the pending-updates-list queue', async function (ctx) {
|
||||
await expect(
|
||||
ctx.DocumentUpdaterManager.promises.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change
|
||||
)
|
||||
).to.be.rejected
|
||||
ctx.rclient.rpush.called.should.equal(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('with invalid keys', function () {
|
||||
beforeEach(function (ctx) {
|
||||
beforeEach(async function (ctx) {
|
||||
ctx.change = {
|
||||
op: [{ d: 'test', p: 345 }],
|
||||
version: 789, // not a valid key
|
||||
}
|
||||
ctx.DocumentUpdaterManager.queueChange(
|
||||
await ctx.DocumentUpdaterManager.promises.queueChange(
|
||||
ctx.project_id,
|
||||
ctx.doc_id,
|
||||
ctx.change,
|
||||
ctx.callback
|
||||
ctx.change
|
||||
)
|
||||
})
|
||||
|
||||
|
||||
@@ -550,7 +550,12 @@ describe('WebsocketController', function () {
|
||||
.callsArgWith(2, null)
|
||||
ctx.DocumentUpdaterManager.getDocument = sinon
|
||||
.stub()
|
||||
.callsArgWith(3, null, ctx.doc_lines, ctx.version, ctx.ranges, ctx.ops)
|
||||
.callsArgWith(3, null, {
|
||||
lines: ctx.doc_lines,
|
||||
version: ctx.version,
|
||||
ranges: ctx.ranges,
|
||||
ops: ctx.ops,
|
||||
})
|
||||
ctx.RoomManager.joinDoc = sinon.stub().callsArg(2)
|
||||
})
|
||||
|
||||
@@ -937,7 +942,12 @@ describe('WebsocketController', function () {
|
||||
callback
|
||||
) => {
|
||||
ctx.client.disconnected = true
|
||||
callback(null, ctx.doc_lines, ctx.version, ctx.ranges, ctx.ops)
|
||||
return callback(null, {
|
||||
lines: ctx.doc_lines,
|
||||
version: ctx.version,
|
||||
ranges: ctx.ranges,
|
||||
ops: ctx.ops,
|
||||
})
|
||||
}
|
||||
|
||||
ctx.WebsocketController.joinDoc(
|
||||
|
||||
Reference in New Issue
Block a user