Merge pull request #5941 from overleaf/em-cleanup-redis-manager

Decaf cleanup docupdater RedisManager and DocumentManager

GitOrigin-RevId: 6a9634d1882a3a7328cac8cd88537dd9d204b281
This commit is contained in:
Eric Mc Sween
2021-11-30 08:26:20 -05:00
committed by Copybot
parent 40b087fee8
commit 7a0b40a4bf
4 changed files with 828 additions and 1067 deletions

View File

@@ -8,7 +8,7 @@
"prettier"
],
"parserOptions": {
"ecmaVersion": 2018
"ecmaVersion": 2020
},
"plugins": [
"mocha",

File diff suppressed because it is too large Load Diff

View File

@@ -1,17 +1,3 @@
/* eslint-disable
camelcase,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS103: Rewrite code to no longer use __guard__
* DS201: Simplify complex destructure assignments
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let RedisManager
const Settings = require('@overleaf/settings')
const rclient = require('@overleaf/redis-wrapper').createClient(
@@ -34,11 +20,7 @@ const MAX_REDIS_REQUEST_LENGTH = 5000 // 5 seconds
// Make times easy to read
const minutes = 60 // seconds for Redis expire
const logHashErrors =
Settings.documentupdater != null
? Settings.documentupdater.logHashErrors
: undefined
const logHashReadErrors = logHashErrors != null ? logHashErrors.read : undefined
const logHashReadErrors = Settings.documentupdater?.logHashErrors?.read
const MEGABYTES = 1024 * 1024
const MAX_RANGES_SIZE = 3 * MEGABYTES
@@ -50,8 +32,8 @@ module.exports = RedisManager = {
rclient,
putDocInMemory(
project_id,
doc_id,
projectId,
docId,
docLines,
version,
ranges,
@@ -60,9 +42,9 @@ module.exports = RedisManager = {
_callback
) {
const timer = new metrics.Timer('redis.put-doc')
const callback = function (error) {
const callback = error => {
timer.done()
return _callback(error)
_callback(error)
}
const docLinesArray = docLines
docLines = JSON.stringify(docLines)
@@ -70,7 +52,7 @@ module.exports = RedisManager = {
const error = new Error('null bytes found in doc lines')
// this check was added to catch memory corruption in JSON.stringify.
// It sometimes returned null bytes at the end of the string.
logger.error({ err: error, doc_id, docLines }, error.message)
logger.error({ err: error, docId, docLines }, error.message)
return callback(error)
}
// Do an optimised size check on the docLines using the serialised
@@ -79,151 +61,146 @@ module.exports = RedisManager = {
if (docIsTooLarge(sizeBound, docLinesArray, Settings.max_doc_length)) {
const docSize = docLines.length
const err = new Error('blocking doc insert into redis: doc is too large')
logger.error({ project_id, doc_id, err, docSize }, err.message)
logger.error({ projectId, docId, err, docSize }, err.message)
return callback(err)
}
const docHash = RedisManager._computeHash(docLines)
// record bytes sent to redis
metrics.summary('redis.docLines', docLines.length, { status: 'set' })
logger.debug(
{ project_id, doc_id, version, docHash, pathname, projectHistoryId },
{ projectId, docId, version, docHash, pathname, projectHistoryId },
'putting doc in redis'
)
return RedisManager._serializeRanges(ranges, function (error, ranges) {
if (error != null) {
logger.error({ err: error, doc_id, project_id }, error.message)
RedisManager._serializeRanges(ranges, (error, ranges) => {
if (error) {
logger.error({ err: error, docId, projectId }, error.message)
return callback(error)
}
// update docsInProject set before writing doc contents
rclient.sadd(keys.docsInProject({ project_id }), doc_id, error => {
if (error) return callback(error)
rclient.sadd(
keys.docsInProject({ project_id: projectId }),
docId,
error => {
if (error) return callback(error)
if (!pathname) {
metrics.inc('pathname', 1, {
path: 'RedisManager.setDoc',
status: pathname === '' ? 'zero-length' : 'undefined',
})
if (!pathname) {
metrics.inc('pathname', 1, {
path: 'RedisManager.setDoc',
status: pathname === '' ? 'zero-length' : 'undefined',
})
}
rclient.mset(
{
[keys.docLines({ doc_id: docId })]: docLines,
[keys.projectKey({ doc_id: docId })]: projectId,
[keys.docVersion({ doc_id: docId })]: version,
[keys.docHash({ doc_id: docId })]: docHash,
[keys.ranges({ doc_id: docId })]: ranges,
[keys.pathname({ doc_id: docId })]: pathname,
[keys.projectHistoryId({ doc_id: docId })]: projectHistoryId,
},
callback
)
}
rclient.mset(
{
[keys.docLines({ doc_id })]: docLines,
[keys.projectKey({ doc_id })]: project_id,
[keys.docVersion({ doc_id })]: version,
[keys.docHash({ doc_id })]: docHash,
[keys.ranges({ doc_id })]: ranges,
[keys.pathname({ doc_id })]: pathname,
[keys.projectHistoryId({ doc_id })]: projectHistoryId,
},
callback
)
})
)
})
},
removeDocFromMemory(project_id, doc_id, _callback) {
logger.debug({ project_id, doc_id }, 'removing doc from redis')
const callback = function (err) {
if (err != null) {
logger.err({ project_id, doc_id, err }, 'error removing doc from redis')
return _callback(err)
removeDocFromMemory(projectId, docId, _callback) {
logger.debug({ projectId, docId }, 'removing doc from redis')
const callback = err => {
if (err) {
logger.err({ projectId, docId, err }, 'error removing doc from redis')
_callback(err)
} else {
logger.debug({ project_id, doc_id }, 'removed doc from redis')
return _callback()
logger.debug({ projectId, docId }, 'removed doc from redis')
_callback()
}
}
let multi = rclient.multi()
multi.strlen(keys.docLines({ doc_id }))
multi.strlen(keys.docLines({ doc_id: docId }))
multi.del(
keys.docLines({ doc_id }),
keys.projectKey({ doc_id }),
keys.docVersion({ doc_id }),
keys.docHash({ doc_id }),
keys.ranges({ doc_id }),
keys.pathname({ doc_id }),
keys.projectHistoryId({ doc_id }),
keys.projectHistoryType({ doc_id }),
keys.unflushedTime({ doc_id }),
keys.lastUpdatedAt({ doc_id }),
keys.lastUpdatedBy({ doc_id })
keys.docLines({ doc_id: docId }),
keys.projectKey({ doc_id: docId }),
keys.docVersion({ doc_id: docId }),
keys.docHash({ doc_id: docId }),
keys.ranges({ doc_id: docId }),
keys.pathname({ doc_id: docId }),
keys.projectHistoryId({ doc_id: docId }),
keys.projectHistoryType({ doc_id: docId }),
keys.unflushedTime({ doc_id: docId }),
keys.lastUpdatedAt({ doc_id: docId }),
keys.lastUpdatedBy({ doc_id: docId })
)
return multi.exec(function (error, response) {
if (error != null) {
multi.exec((error, response) => {
if (error) {
return callback(error)
}
const length = response != null ? response[0] : undefined
const length = response?.[0]
if (length > 0) {
// record bytes freed in redis
metrics.summary('redis.docLines', length, { status: 'del' })
}
multi = rclient.multi()
multi.srem(keys.docsInProject({ project_id }), doc_id)
multi.del(keys.projectState({ project_id }))
return multi.exec(callback)
multi.srem(keys.docsInProject({ project_id: projectId }), docId)
multi.del(keys.projectState({ project_id: projectId }))
multi.exec(callback)
})
},
checkOrSetProjectState(project_id, newState, callback) {
if (callback == null) {
callback = function () {}
}
checkOrSetProjectState(projectId, newState, callback) {
const multi = rclient.multi()
multi.getset(keys.projectState({ project_id }), newState)
multi.expire(keys.projectState({ project_id }), 30 * minutes)
return multi.exec(function (error, response) {
if (error != null) {
multi.getset(keys.projectState({ project_id: projectId }), newState)
multi.expire(keys.projectState({ project_id: projectId }), 30 * minutes)
multi.exec((error, response) => {
if (error) {
return callback(error)
}
logger.debug(
{ project_id, newState, oldState: response[0] },
{ projectId, newState, oldState: response[0] },
'checking project state'
)
return callback(null, response[0] !== newState)
callback(null, response[0] !== newState)
})
},
clearProjectState(project_id, callback) {
if (callback == null) {
callback = function () {}
}
return rclient.del(keys.projectState({ project_id }), callback)
/**
 * Delete the project state key for the given project from Redis.
 *
 * The key removed here is the same one that checkOrSetProjectState writes
 * with GETSET and a 30 minute expiry.
 *
 * @param projectId - project id used to build the project state key
 * @param callback - handed directly to `rclient.del`; this method adds no
 *   error handling or logging of its own
 */
clearProjectState(projectId, callback) {
rclient.del(keys.projectState({ project_id: projectId }), callback)
},
getDoc(project_id, doc_id, callback) {
if (callback == null) {
callback = function () {}
}
getDoc(projectId, docId, callback) {
const timer = new metrics.Timer('redis.get-doc')
const collectKeys = [
keys.docLines({ doc_id }),
keys.docVersion({ doc_id }),
keys.docHash({ doc_id }),
keys.projectKey({ doc_id }),
keys.ranges({ doc_id }),
keys.pathname({ doc_id }),
keys.projectHistoryId({ doc_id }),
keys.unflushedTime({ doc_id }),
keys.lastUpdatedAt({ doc_id }),
keys.lastUpdatedBy({ doc_id }),
keys.docLines({ doc_id: docId }),
keys.docVersion({ doc_id: docId }),
keys.docHash({ doc_id: docId }),
keys.projectKey({ doc_id: docId }),
keys.ranges({ doc_id: docId }),
keys.pathname({ doc_id: docId }),
keys.projectHistoryId({ doc_id: docId }),
keys.unflushedTime({ doc_id: docId }),
keys.lastUpdatedAt({ doc_id: docId }),
keys.lastUpdatedBy({ doc_id: docId }),
]
rclient.mget(...collectKeys, (error, ...rest) => {
rclient.mget(...collectKeys, (error, result) => {
if (error) {
return callback(error)
}
let [
docLines,
version,
storedHash,
doc_project_id,
docProjectId,
ranges,
pathname,
projectHistoryId,
unflushedTime,
lastUpdatedAt,
lastUpdatedBy,
] = Array.from(rest[0])
] = result
const timeSpan = timer.done()
if (error != null) {
return callback(error)
}
// check if request took too long and bail out. only do this for
// get, because it is the first call in each update, so if this
// passes we'll assume others have a reasonable chance to succeed.
@@ -241,9 +218,9 @@ module.exports = RedisManager = {
if (logHashReadErrors && computedHash !== storedHash) {
logger.error(
{
project_id,
doc_id,
doc_project_id,
projectId,
docId,
docProjectId,
computedHash,
storedHash,
docLines,
@@ -262,11 +239,8 @@ module.exports = RedisManager = {
version = parseInt(version || 0, 10)
// check doc is in requested project
if (doc_project_id != null && doc_project_id !== project_id) {
logger.error(
{ project_id, doc_id, doc_project_id },
'doc not in project'
)
if (docProjectId != null && docProjectId !== projectId) {
logger.error({ projectId, docId, docProjectId }, 'doc not in project')
return callback(new Errors.NotFoundError('document not found'))
}
@@ -295,127 +269,109 @@ module.exports = RedisManager = {
})
},
getDocVersion(doc_id, callback) {
if (callback == null) {
callback = function () {}
}
return rclient.mget(
keys.docVersion({ doc_id }),
keys.projectHistoryType({ doc_id }),
function (error, result) {
if (error != null) {
getDocVersion(docId, callback) {
rclient.mget(
keys.docVersion({ doc_id: docId }),
keys.projectHistoryType({ doc_id: docId }),
(error, result) => {
if (error) {
return callback(error)
}
let [version, projectHistoryType] = Array.from(result || [])
let [version, projectHistoryType] = result || []
version = parseInt(version, 10)
return callback(null, version, projectHistoryType)
callback(null, version, projectHistoryType)
}
)
},
getDocLines(doc_id, callback) {
if (callback == null) {
callback = function () {}
}
return rclient.get(keys.docLines({ doc_id }), function (error, docLines) {
if (error != null) {
getDocLines(docId, callback) {
rclient.get(keys.docLines({ doc_id: docId }), (error, docLines) => {
if (error) {
return callback(error)
}
return callback(null, docLines)
callback(null, docLines)
})
},
getPreviousDocOps(doc_id, start, end, callback) {
if (callback == null) {
callback = function () {}
}
getPreviousDocOps(docId, start, end, callback) {
const timer = new metrics.Timer('redis.get-prev-docops')
return rclient.llen(keys.docOps({ doc_id }), function (error, length) {
if (error != null) {
rclient.llen(keys.docOps({ doc_id: docId }), (error, length) => {
if (error) {
return callback(error)
}
return rclient.get(
keys.docVersion({ doc_id }),
function (error, version) {
if (error != null) {
return callback(error)
}
version = parseInt(version, 10)
const first_version_in_redis = version - length
if (start < first_version_in_redis || end > version) {
error = new Errors.OpRangeNotAvailableError(
'doc ops range is not loaded in redis'
)
logger.debug(
{ err: error, doc_id, length, version, start, end },
'doc ops range is not loaded in redis'
)
return callback(error)
}
start = start - first_version_in_redis
if (end > -1) {
end = end - first_version_in_redis
}
if (isNaN(start) || isNaN(end)) {
error = new Error('inconsistent version or lengths')
logger.error(
{ err: error, doc_id, length, version, start, end },
'inconsistent version or length'
)
return callback(error)
}
return rclient.lrange(
keys.docOps({ doc_id }),
start,
end,
function (error, jsonOps) {
let ops
if (error != null) {
return callback(error)
}
try {
ops = jsonOps.map(jsonOp => JSON.parse(jsonOp))
} catch (e) {
return callback(e)
}
const timeSpan = timer.done()
if (timeSpan > MAX_REDIS_REQUEST_LENGTH) {
error = new Error('redis getPreviousDocOps exceeded timeout')
return callback(error)
}
return callback(null, ops)
}
)
}
)
})
},
getHistoryType(doc_id, callback) {
if (callback == null) {
callback = function () {}
}
return rclient.get(
keys.projectHistoryType({ doc_id }),
function (error, projectHistoryType) {
if (error != null) {
rclient.get(keys.docVersion({ doc_id: docId }), (error, version) => {
if (error) {
return callback(error)
}
return callback(null, projectHistoryType)
version = parseInt(version, 10)
const firstVersionInRedis = version - length
if (start < firstVersionInRedis || end > version) {
error = new Errors.OpRangeNotAvailableError(
'doc ops range is not loaded in redis'
)
logger.debug(
{ err: error, docId, length, version, start, end },
'doc ops range is not loaded in redis'
)
return callback(error)
}
start = start - firstVersionInRedis
if (end > -1) {
end = end - firstVersionInRedis
}
if (isNaN(start) || isNaN(end)) {
error = new Error('inconsistent version or lengths')
logger.error(
{ err: error, docId, length, version, start, end },
'inconsistent version or length'
)
return callback(error)
}
rclient.lrange(
keys.docOps({ doc_id: docId }),
start,
end,
(error, jsonOps) => {
let ops
if (error) {
return callback(error)
}
try {
ops = jsonOps.map(jsonOp => JSON.parse(jsonOp))
} catch (e) {
return callback(e)
}
const timeSpan = timer.done()
if (timeSpan > MAX_REDIS_REQUEST_LENGTH) {
error = new Error('redis getPreviousDocOps exceeded timeout')
return callback(error)
}
callback(null, ops)
}
)
})
})
},
/**
 * Read the value stored under the doc's projectHistoryType key.
 *
 * @param docId - doc id used to build the projectHistoryType key
 * @param callback - called with `(error)` when the Redis read fails,
 *   otherwise with `(null, projectHistoryType)` where the second argument
 *   is whatever `rclient.get` returned for the key
 */
getHistoryType(docId, callback) {
rclient.get(
keys.projectHistoryType({ doc_id: docId }),
(error, projectHistoryType) => {
if (error) {
return callback(error)
}
callback(null, projectHistoryType)
}
)
},
setHistoryType(doc_id, projectHistoryType, callback) {
if (callback == null) {
callback = function () {}
}
return rclient.set(
keys.projectHistoryType({ doc_id }),
setHistoryType(docId, projectHistoryType, callback) {
rclient.set(
keys.projectHistoryType({ doc_id: docId }),
projectHistoryType,
callback
)
@@ -424,8 +380,8 @@ module.exports = RedisManager = {
DOC_OPS_TTL: 60 * minutes,
DOC_OPS_MAX_LENGTH: 100,
updateDocument(
project_id,
doc_id,
projectId,
docId,
docLines,
newVersion,
appliedOps,
@@ -436,21 +392,18 @@ module.exports = RedisManager = {
if (appliedOps == null) {
appliedOps = []
}
if (callback == null) {
callback = function () {}
}
return RedisManager.getDocVersion(
doc_id,
function (error, currentVersion, projectHistoryType) {
if (error != null) {
RedisManager.getDocVersion(
docId,
(error, currentVersion, projectHistoryType) => {
if (error) {
return callback(error)
}
if (currentVersion + appliedOps.length !== newVersion) {
error = new Error(`Version mismatch. '${doc_id}' is corrupted.`)
error = new Error(`Version mismatch. '${docId}' is corrupted.`)
logger.error(
{
err: error,
doc_id,
docId,
currentVersion,
newVersion,
opsLength: appliedOps.length,
@@ -461,11 +414,11 @@ module.exports = RedisManager = {
}
const jsonOps = appliedOps.map(op => JSON.stringify(op))
for (const op of Array.from(jsonOps)) {
for (const op of jsonOps) {
if (op.indexOf('\u0000') !== -1) {
error = new Error('null bytes found in jsonOps')
// this check was added to catch memory corruption in JSON.stringify
logger.error({ err: error, doc_id, jsonOps }, error.message)
logger.error({ err: error, docId, jsonOps }, error.message)
return callback(error)
}
}
@@ -474,7 +427,7 @@ module.exports = RedisManager = {
if (newDocLines.indexOf('\u0000') !== -1) {
error = new Error('null bytes found in doc lines')
// this check was added to catch memory corruption in JSON.stringify
logger.error({ err: error, doc_id, newDocLines }, error.message)
logger.error({ err: error, docId, newDocLines }, error.message)
return callback(error)
}
// Do an optimised size check on the docLines using the serialised
@@ -483,15 +436,15 @@ module.exports = RedisManager = {
if (docIsTooLarge(sizeBound, docLines, Settings.max_doc_length)) {
const err = new Error('blocking doc update: doc is too large')
const docSize = newDocLines.length
logger.error({ project_id, doc_id, err, docSize }, err.message)
logger.error({ projectId, docId, err, docSize }, err.message)
return callback(err)
}
const newHash = RedisManager._computeHash(newDocLines)
const opVersions = appliedOps.map(op => (op != null ? op.v : undefined))
const opVersions = appliedOps.map(op => op?.v)
logger.debug(
{
doc_id,
docId,
version: newVersion,
hash: newHash,
op_versions: opVersions,
@@ -502,61 +455,65 @@ module.exports = RedisManager = {
metrics.summary('redis.docLines', newDocLines.length, {
status: 'update',
})
return RedisManager._serializeRanges(ranges, function (error, ranges) {
if (error != null) {
logger.error({ err: error, doc_id }, error.message)
RedisManager._serializeRanges(ranges, (error, ranges) => {
if (error) {
logger.error({ err: error, docId }, error.message)
return callback(error)
}
if (ranges != null && ranges.indexOf('\u0000') !== -1) {
if (ranges && ranges.indexOf('\u0000') !== -1) {
error = new Error('null bytes found in ranges')
// this check was added to catch memory corruption in JSON.stringify
logger.error({ err: error, doc_id, ranges }, error.message)
logger.error({ err: error, docId, ranges }, error.message)
return callback(error)
}
const multi = rclient.multi()
multi.mset({
[keys.docLines({ doc_id })]: newDocLines,
[keys.docVersion({ doc_id })]: newVersion,
[keys.docHash({ doc_id })]: newHash,
[keys.ranges({ doc_id })]: ranges,
[keys.lastUpdatedAt({ doc_id })]: Date.now(),
[keys.lastUpdatedBy({ doc_id })]: updateMeta && updateMeta.user_id,
[keys.docLines({ doc_id: docId })]: newDocLines,
[keys.docVersion({ doc_id: docId })]: newVersion,
[keys.docHash({ doc_id: docId })]: newHash,
[keys.ranges({ doc_id: docId })]: ranges,
[keys.lastUpdatedAt({ doc_id: docId })]: Date.now(),
[keys.lastUpdatedBy({ doc_id: docId })]:
updateMeta && updateMeta.user_id,
})
multi.ltrim(
keys.docOps({ doc_id }),
keys.docOps({ doc_id: docId }),
-RedisManager.DOC_OPS_MAX_LENGTH,
-1
) // index 3
// push the ops last so we can get the lengths at fixed index position 7
if (jsonOps.length > 0) {
multi.rpush(keys.docOps({ doc_id }), ...Array.from(jsonOps)) // index 5
multi.rpush(keys.docOps({ doc_id: docId }), ...jsonOps) // index 5
// expire must come after rpush since before it will be a no-op if the list is empty
multi.expire(keys.docOps({ doc_id }), RedisManager.DOC_OPS_TTL) // index 6
multi.expire(
keys.docOps({ doc_id: docId }),
RedisManager.DOC_OPS_TTL
) // index 6
if (projectHistoryType === 'project-history') {
metrics.inc('history-queue', 1, { status: 'skip-track-changes' })
logger.debug(
{ doc_id },
{ docId },
'skipping push of uncompressed ops for project using project-history'
)
} else {
// project is using old track-changes history service
metrics.inc('history-queue', 1, { status: 'track-changes' })
multi.rpush(
historyKeys.uncompressedHistoryOps({ doc_id }),
...Array.from(jsonOps)
historyKeys.uncompressedHistoryOps({ doc_id: docId }),
...jsonOps
) // index 7
}
// Set the unflushed timestamp to the current time if the doc
// hasn't been modified before (the content in mongo has been
// valid up to this point). Otherwise leave it alone ("NX" flag).
multi.set(keys.unflushedTime({ doc_id }), Date.now(), 'NX')
multi.set(keys.unflushedTime({ doc_id: docId }), Date.now(), 'NX')
}
return multi.exec(function (error, result) {
let docUpdateCount
if (error != null) {
multi.exec((error, result) => {
if (error) {
return callback(error)
}
let docUpdateCount
if (projectHistoryType === 'project-history') {
docUpdateCount = undefined // only using project history, don't bother with track-changes
} else {
@@ -564,19 +521,11 @@ module.exports = RedisManager = {
docUpdateCount = result[4]
}
if (
jsonOps.length > 0 &&
__guard__(
Settings.apis != null
? Settings.apis.project_history
: undefined,
x => x.enabled
)
) {
if (jsonOps.length > 0 && Settings.apis?.project_history?.enabled) {
metrics.inc('history-queue', 1, { status: 'project-history' })
return ProjectHistoryRedisManager.queueOps(
project_id,
...Array.from(jsonOps),
ProjectHistoryRedisManager.queueOps(
projectId,
...jsonOps,
(error, projectUpdateCount) => {
if (error) {
// The full project history can re-sync a project in case
@@ -588,7 +537,7 @@ module.exports = RedisManager = {
}
)
} else {
return callback(null, docUpdateCount)
callback(null, docUpdateCount)
}
})
})
@@ -596,86 +545,74 @@ module.exports = RedisManager = {
)
},
renameDoc(project_id, doc_id, user_id, update, projectHistoryId, callback) {
if (callback == null) {
callback = function () {}
}
return RedisManager.getDoc(
project_id,
doc_id,
function (error, lines, version) {
if (error != null) {
return callback(error)
}
if (lines != null && version != null) {
if (!update.newPathname) {
logger.warn(
{ project_id, doc_id, update },
'missing pathname in RedisManager.renameDoc'
)
metrics.inc('pathname', 1, {
path: 'RedisManager.renameDoc',
status: update.newPathname === '' ? 'zero-length' : 'undefined',
})
}
return rclient.set(
keys.pathname({ doc_id }),
update.newPathname,
callback
)
} else {
return callback()
}
renameDoc(projectId, docId, userId, update, projectHistoryId, callback) {
RedisManager.getDoc(projectId, docId, (error, lines, version) => {
if (error) {
return callback(error)
}
)
if (lines != null && version != null) {
if (!update.newPathname) {
logger.warn(
{ projectId, docId, update },
'missing pathname in RedisManager.renameDoc'
)
metrics.inc('pathname', 1, {
path: 'RedisManager.renameDoc',
status: update.newPathname === '' ? 'zero-length' : 'undefined',
})
}
rclient.set(
keys.pathname({ doc_id: docId }),
update.newPathname,
callback
)
} else {
callback()
}
})
},
clearUnflushedTime(doc_id, callback) {
if (callback == null) {
callback = function () {}
}
return rclient.del(keys.unflushedTime({ doc_id }), callback)
/**
 * Delete the doc's unflushedTime key from Redis.
 *
 * updateDocument sets this key (with the "NX" flag) when ops are applied;
 * this method removes it again.
 *
 * @param docId - doc id used to build the unflushedTime key
 * @param callback - handed directly to `rclient.del`
 */
clearUnflushedTime(docId, callback) {
rclient.del(keys.unflushedTime({ doc_id: docId }), callback)
},
getDocIdsInProject(project_id, callback) {
if (callback == null) {
callback = function () {}
}
return rclient.smembers(keys.docsInProject({ project_id }), callback)
/**
 * List the members of the project's docsInProject set.
 *
 * putDocInMemory adds doc ids to this set and removeDocFromMemory removes
 * them, so the members are the ids of docs recorded for the project.
 *
 * @param projectId - project id used to build the docsInProject key
 * @param callback - handed directly to `rclient.smembers`
 */
getDocIdsInProject(projectId, callback) {
rclient.smembers(keys.docsInProject({ project_id: projectId }), callback)
},
getDocTimestamps(doc_ids, callback) {
// get lastupdatedat timestamps for an array of doc_ids
if (callback == null) {
callback = function () {}
}
return async.mapSeries(
doc_ids,
(doc_id, cb) => rclient.get(keys.lastUpdatedAt({ doc_id }), cb),
/**
 * Get the value stored under the lastUpdatedAt key of each doc in docIds.
 *
 * updateDocument writes `Date.now()` to that key, so each value is the
 * timestamp of the doc's last applied update as stored in Redis.
 *
 * The keys are read one at a time through `async.mapSeries` rather than in
 * a single multi-key command.
 * NOTE(review): assuming `async` is the caolan/async library, results are
 * returned in the same order as docIds and the first error aborts the
 * series - confirm against the file's imports.
 *
 * @param docIds - array of doc ids to look up
 * @param callback - handed directly to `async.mapSeries` as its final
 *   callback
 */
getDocTimestamps(docIds, callback) {
async.mapSeries(
docIds,
(docId, cb) => rclient.get(keys.lastUpdatedAt({ doc_id: docId }), cb),
callback
)
},
queueFlushAndDeleteProject(project_id, callback) {
// store the project id in a sorted set ordered by time with a random offset to smooth out spikes
/**
* Store the project id in a sorted set ordered by time with a random offset
* to smooth out spikes
*/
queueFlushAndDeleteProject(projectId, callback) {
const SMOOTHING_OFFSET =
Settings.smoothingOffset > 0
? Math.round(Settings.smoothingOffset * Math.random())
: 0
return rclient.zadd(
rclient.zadd(
keys.flushAndDeleteQueue(),
Date.now() + SMOOTHING_OFFSET,
project_id,
projectId,
callback
)
},
/**
* Find the oldest queued flush that is before the cutoff time
*/
getNextProjectToFlushAndDelete(cutoffTime, callback) {
// find the oldest queued flush that is before the cutoff time
if (callback == null) {
callback = function () {}
}
return rclient.zrangebyscore(
rclient.zrangebyscore(
keys.flushAndDeleteQueue(),
0,
cutoffTime,
@@ -683,47 +620,45 @@ module.exports = RedisManager = {
'LIMIT',
0,
1,
function (err, reply) {
if (err != null) {
(err, reply) => {
if (err) {
return callback(err)
}
if (!(reply != null ? reply.length : undefined)) {
// return if no projects ready to be processed
if (!reply || reply.length === 0) {
return callback()
} // return if no projects ready to be processed
}
// pop the oldest entry (get and remove in a multi)
const multi = rclient.multi()
// Poor man's version of ZPOPMIN, which is only available in Redis 5.
multi.zrange(keys.flushAndDeleteQueue(), 0, 0, 'WITHSCORES')
multi.zremrangebyrank(keys.flushAndDeleteQueue(), 0, 0)
multi.zcard(keys.flushAndDeleteQueue()) // the total length of the queue (for metrics)
return multi.exec(function (err, reply) {
if (err != null) {
multi.exec((err, reply) => {
if (err) {
return callback(err)
}
if (!(reply != null ? reply.length : undefined)) {
if (!reply || reply.length === 0) {
return callback()
}
const [key, timestamp] = Array.from(reply[0])
const [key, timestamp] = reply[0]
const queueLength = reply[2]
return callback(null, key, timestamp, queueLength)
callback(null, key, timestamp, queueLength)
})
}
)
},
_serializeRanges(ranges, callback) {
if (callback == null) {
callback = function () {}
}
let jsonRanges = JSON.stringify(ranges)
if (jsonRanges != null && jsonRanges.length > MAX_RANGES_SIZE) {
if (jsonRanges && jsonRanges.length > MAX_RANGES_SIZE) {
return callback(new Error('ranges are too large'))
}
if (jsonRanges === '{}') {
// Most doc will have empty ranges so don't fill redis with lots of '{}' keys
jsonRanges = null
}
return callback(null, jsonRanges)
callback(null, jsonRanges)
},
_deserializeRanges(ranges) {
@@ -742,9 +677,3 @@ module.exports = RedisManager = {
return crypto.createHash('sha1').update(docLines, 'utf8').digest('hex')
},
}
function __guard__(value, transform) {
return typeof value !== 'undefined' && value !== null
? transform(value)
: undefined
}