Merge pull request #12276 from overleaf/jpa-batched-update-window

[web] add time based window queries to batchedUpdate

GitOrigin-RevId: e56c01b888cd9749f39d42b77de09bc3fe2d0ec1
This commit is contained in:
Jakob Ackermann
2023-03-21 09:09:06 +00:00
committed by Copybot
parent c66784a99d
commit 537df2257c
6 changed files with 163 additions and 170 deletions
+113 -53
View File
@@ -1,12 +1,15 @@
const { ReadPreference, ObjectId } = require('mongodb')
const { db, waitForDb } = require('../../app/src/infrastructure/mongodb')
const ONE_MONTH_IN_MS = 1000 * 60 * 60 * 24 * 31
let ID_EDGE_PAST
const ID_EDGE_FUTURE = objectIdFromMs(Date.now() + 1000)
let BATCH_DESCENDING
let BATCH_SIZE
let VERBOSE_LOGGING
let BATCH_LAST_ID
let BATCH_RANGE_START
let BATCH_RANGE_END
refreshGlobalOptionsForBatchedUpdate()
let BATCH_MAX_TIME_SPAN_IN_MS
function refreshGlobalOptionsForBatchedUpdate(options = {}) {
options = Object.assign({}, options, process.env)
@@ -15,42 +18,54 @@ function refreshGlobalOptionsForBatchedUpdate(options = {}) {
BATCH_SIZE = parseInt(options.BATCH_SIZE, 10) || 1000
VERBOSE_LOGGING = options.VERBOSE_LOGGING === 'true'
if (options.BATCH_LAST_ID) {
BATCH_LAST_ID = ObjectId(options.BATCH_LAST_ID)
BATCH_RANGE_START = ObjectId(options.BATCH_LAST_ID)
} else if (options.BATCH_RANGE_START) {
BATCH_LAST_ID = ObjectId(options.BATCH_RANGE_START)
BATCH_RANGE_START = ObjectId(options.BATCH_RANGE_START)
} else {
if (BATCH_DESCENDING) {
BATCH_RANGE_START = ID_EDGE_FUTURE
} else {
BATCH_RANGE_START = ID_EDGE_PAST
}
}
BATCH_MAX_TIME_SPAN_IN_MS =
parseInt(options.BATCH_MAX_TIME_SPAN_IN_MS, 10) || ONE_MONTH_IN_MS
if (options.BATCH_RANGE_END) {
BATCH_RANGE_END = ObjectId(options.BATCH_RANGE_END)
} else {
if (BATCH_DESCENDING) {
BATCH_RANGE_END = ID_EDGE_PAST
} else {
BATCH_RANGE_END = ID_EDGE_FUTURE
}
}
}
async function getNextBatch(collection, query, maxId, projection, options) {
const queryIdField = {}
maxId = maxId || BATCH_LAST_ID
if (maxId) {
if (BATCH_DESCENDING) {
queryIdField.$lt = maxId
} else {
queryIdField.$gt = maxId
async function getNextBatch({
collection,
query,
start,
end,
projection,
findOptions,
}) {
if (BATCH_DESCENDING) {
query._id = {
$gt: end,
$lt: start,
}
} else {
query._id = {
$gt: start,
$lt: end,
}
}
if (BATCH_RANGE_END) {
if (BATCH_DESCENDING) {
queryIdField.$gt = BATCH_RANGE_END
} else {
queryIdField.$lt = BATCH_RANGE_END
}
}
if (queryIdField.$gt || queryIdField.$lt) {
query._id = queryIdField
}
const entries = await collection
.find(query, options)
return await collection
.find(query, findOptions)
.project(projection)
.sort({ _id: BATCH_DESCENDING ? -1 : 1 })
.limit(BATCH_SIZE)
.toArray()
return entries
}
async function performUpdate(collection, nextBatch, update) {
@@ -60,6 +75,42 @@ async function performUpdate(collection, nextBatch, update) {
)
}
/**
 * Build an ObjectId whose embedded timestamp corresponds to the given time.
 *
 * `ObjectId.createFromTime` takes a whole number of seconds, so the
 * millisecond value is floored to seconds explicitly instead of relying on
 * the driver's implicit truncation of a fractional value. Values before the
 * epoch are clamped to 0 so that a window computed past the epoch cannot
 * wrap around or be rejected by the driver.
 *
 * @param {number} ms - Milliseconds since the Unix epoch.
 * @returns {ObjectId} The id returned by `ObjectId.createFromTime`.
 */
function objectIdFromMs(ms) {
  const seconds = Math.max(0, Math.floor(ms / 1000))
  return ObjectId.createFromTime(seconds)
}
/**
 * Read the creation time carried by an ObjectId.
 *
 * @param {ObjectId} id - Any object exposing `getTimestamp()` returning a Date.
 * @returns {number} The timestamp in milliseconds, as reported by
 *   `Date#getTime()` on the value returned from `id.getTimestamp()`.
 */
function getMsFromObjectId(id) {
  const createdAt = id.getTimestamp()
  return createdAt.getTime()
}
/**
 * Compute the far edge of the next time window that begins at `start`.
 *
 * The window spans BATCH_MAX_TIME_SPAN_IN_MS, moving backwards in time when
 * BATCH_DESCENDING is set and forwards otherwise. When the computed edge
 * reaches or passes BATCH_RANGE_END (compared by timestamp), the
 * BATCH_RANGE_END object itself is returned.
 *
 * @param {ObjectId} start - Id marking the beginning of the window.
 * @returns {ObjectId} Either a freshly built id one time span away from
 *   `start`, or BATCH_RANGE_END.
 */
function getNextEnd(start) {
  // -1 walks towards the past, +1 towards the future.
  const direction = BATCH_DESCENDING ? -1 : 1
  const candidate = objectIdFromMs(
    getMsFromObjectId(start) + direction * BATCH_MAX_TIME_SPAN_IN_MS
  )
  // Signed distance beyond the range end, measured along the walk direction.
  const overshoot =
    (getMsFromObjectId(candidate) - getMsFromObjectId(BATCH_RANGE_END)) *
    direction
  return overshoot >= 0 ? BATCH_RANGE_END : candidate
}
/**
 * Determine the lower id edge for a collection: an ObjectId dated one second
 * before the collection's smallest `_id`.
 *
 * @param {Collection} collection - Collection exposing the cursor API used
 *   below (`find().project().sort().limit().toArray()`).
 * @returns {Promise<ObjectId|null>} The edge id, or null when the collection
 *   has no documents.
 */
async function getIdEdgePast(collection) {
  // Sort explicitly by _id: without a sort the server returns documents in
  // natural order, which is not guaranteed to start at the smallest _id, and
  // the edge could end up newer than some documents.
  const [first] = await collection
    .find({})
    .project({ _id: 1 })
    .sort({ _id: 1 })
    .limit(1)
    .toArray()
  if (!first) return null
  // Go 1s further into the past in order to include the first entry via
  // first._id > ID_EDGE_PAST
  return objectIdFromMs(Math.max(0, getMsFromObjectId(first._id) - 1000))
}
async function batchedUpdate(
collectionName,
query,
@@ -68,9 +119,14 @@ async function batchedUpdate(
findOptions,
batchedUpdateOptions
) {
refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)
await waitForDb()
const collection = db[collectionName]
ID_EDGE_PAST = await getIdEdgePast(collection)
if (!ID_EDGE_PAST) {
console.warn(`The collection ${collectionName} appears to be empty.`)
return 0
}
refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)
findOptions = findOptions || {}
findOptions.readPreference = ReadPreference.SECONDARY
@@ -78,35 +134,40 @@ async function batchedUpdate(
projection = projection || { _id: 1 }
let nextBatch
let updated = 0
let maxId
while (
(nextBatch = await getNextBatch(
let start = BATCH_RANGE_START
while (start !== BATCH_RANGE_END) {
let end = getNextEnd(start)
nextBatch = await getNextBatch({
collection,
query,
maxId,
start,
end,
projection,
findOptions
)).length
) {
maxId = nextBatch[nextBatch.length - 1]._id
updated += nextBatch.length
if (VERBOSE_LOGGING) {
console.log(
`Running update on batch with ids ${JSON.stringify(
nextBatch.map(entry => entry._id)
)}`
)
} else {
console.error(`Running update on batch ending ${maxId}`)
}
findOptions,
})
if (nextBatch.length > 0) {
end = nextBatch[nextBatch.length - 1]._id
updated += nextBatch.length
if (typeof update === 'function') {
await update(collection, nextBatch)
} else {
await performUpdate(collection, nextBatch, update)
}
if (VERBOSE_LOGGING) {
console.log(
`Running update on batch with ids ${JSON.stringify(
nextBatch.map(entry => entry._id)
)}`
)
} else {
console.error(`Running update on batch ending ${end}`)
}
console.error(`Completed batch ending ${maxId}`)
if (typeof update === 'function') {
await update(collection, nextBatch)
} else {
await performUpdate(collection, nextBatch, update)
}
}
console.error(`Completed batch ending ${end}`)
start = end
}
return updated
}
@@ -119,8 +180,8 @@ function batchedUpdateWithResultHandling(
options
) {
batchedUpdate(collection, query, update, projection, options)
.then(updated => {
console.error({ updated })
.then(processed => {
console.error({ processed })
process.exit(0)
})
.catch(error => {
@@ -130,7 +191,6 @@ function batchedUpdateWithResultHandling(
}
module.exports = {
getNextBatch,
batchedUpdate,
batchedUpdateWithResultHandling,
}
+9 -33
View File
@@ -1,5 +1,4 @@
const { getNextBatch } = require('./helpers/batchedUpdate')
const { db, waitForDb } = require('../app/src/infrastructure/mongodb')
const { batchedUpdateWithResultHandling } = require('./helpers/batchedUpdate')
const MODEL_NAME = process.argv.pop()
const Model = require(`../app/src/models/${MODEL_NAME}`)[MODEL_NAME]
@@ -14,34 +13,11 @@ function processBatch(batch) {
}
}
async function main() {
await waitForDb()
const collection = db[Model.collection.name]
const query = {}
const projection = {}
let nextBatch
let processed = 0
let maxId
while (
(nextBatch = await getNextBatch(collection, query, maxId, projection))
.length
) {
processBatch(nextBatch)
maxId = nextBatch[nextBatch.length - 1]._id
processed += nextBatch.length
console.error(maxId, processed)
}
console.error('done')
}
main()
.then(() => {
process.exit(0)
})
.catch(error => {
console.error({ error })
process.exit(1)
})
batchedUpdateWithResultHandling(
Model.collection.name,
{},
async (_, nextBatch) => {
await processBatch(nextBatch)
},
{}
)