[document-updater] check for flushed docs when fetching historyId (#30838)

* [document-updater] check for flushed docs when fetching historyId

* [document-updater] use doc version key for discovering all docs

GitOrigin-RevId: 122c6786b473c0836a7215ded4fae2819f908cd7
This commit is contained in:
Jakob Ackermann
2026-01-19 12:54:34 +00:00
committed by Copybot
parent 187ffbf108
commit 935801eabc

View File

@@ -34,7 +34,7 @@ const batchSize = parseInt(args.batchSize, 10)
* @return {string|void}
*/
function extractDocId(key) {
const matches = key.match(/ProjectHistoryId:\{(.*?)\}/)
const matches = key.match(/DocVersion:\{(.*?)\}/)
if (matches) {
return matches[1]
}
@@ -91,19 +91,28 @@ async function getHistoryId(docId) {
* @return {Promise<Array<UpdateableDoc>>}
*/
async function findDocsWithMissingHistoryIds(node, docIds) {
const historyIds = await node.mget(
docIds.map(docId => docUpdaterKeys.projectHistoryId({ doc_id: docId }))
const fromRedis = await node.mget(
docIds
.map(docId => [
docUpdaterKeys.docVersion({ doc_id: docId }),
docUpdaterKeys.projectHistoryId({ doc_id: docId }),
])
.flat()
)
const results = []
for (const index in docIds) {
const historyId = historyIds[index]
const docId = docIds[index]
for (const [index, docId] of docIds.entries()) {
const docVersion = fromRedis[index * 2]
const historyId = fromRedis[index * 2 + 1]
if (!docVersion) {
// Already removed from redis.
continue
}
if (!historyId) {
try {
const { projectId, historyId } = await getHistoryId(docId)
results.push({ projectId, historyId, docId })
results.push({ projectId, historyId, docId, docVersion })
} catch (error) {
logger.warn(
{ error },
@@ -157,7 +166,7 @@ async function scanNodes(nodes, batchSize = 1000) {
for (const node of nodes) {
const stream = node.scanStream({
match: docUpdaterKeys.projectHistoryId({ doc_id: '*' }),
match: docUpdaterKeys.docVersion({ doc_id: '*' }),
count: batchSize,
})