From 7aeeb5a5a9be028d78bea0b32706d7ca3c8de922 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 18 May 2021 18:06:15 +0100 Subject: [PATCH] [ContentCacheManager] finish tracking of ranges across builds --- services/clsi/app/js/ContentCacheManager.js | 93 ++++++-- services/clsi/app/js/ContentCacheMetrics.js | 5 +- services/clsi/app/js/OutputCacheManager.js | 5 +- .../test/unit/js/ContentCacheManagerTests.js | 203 ++++++++++++++++-- 4 files changed, 272 insertions(+), 34 deletions(-) diff --git a/services/clsi/app/js/ContentCacheManager.js b/services/clsi/app/js/ContentCacheManager.js index f5ccc503b5..60c68b9ba6 100644 --- a/services/clsi/app/js/ContentCacheManager.js +++ b/services/clsi/app/js/ContentCacheManager.js @@ -7,6 +7,7 @@ const fs = require('fs') const crypto = require('crypto') const Path = require('path') const Settings = require('settings-sharelatex') +const pLimit = require('p-limit') const MIN_CHUNK_SIZE = Settings.pdfCachingMinChunkSize @@ -27,9 +28,7 @@ async function update(contentDir, filePath) { const newRanges = [] const seenHashes = new Set() // keep track of hashes expire old ones when they reach a generation > N. - const tracker = new HashFileTracker() - await loadState(contentDir, tracker) - + const tracker = await HashFileTracker.from(contentDir) for await (const chunk of stream) { const pdfStreams = extractor.consume(chunk) for (const pdfStream of pdfStreams) { @@ -48,43 +47,98 @@ async function update(contentDir, filePath) { } } } - const expiredHashes = tracker.update(ranges).findStale(5) - await deleteHashFiles(expiredHashes) - return [ranges, newRanges] + tracker.update(ranges, newRanges) + const reclaimedSpace = await tracker.deleteStaleHashes(5) + await tracker.flush() + return [ranges, newRanges, reclaimedSpace] +} + +function getStatePath(contentDir) { + return Path.join(contentDir, '.state.v0.json') } class HashFileTracker { - constructor(contentDir) { - this.hashAge = new Map() + constructor(contentDir, { hashAge = [], hashSize = [] }) { + this.contentDir = contentDir + this.hashAge = new Map(hashAge) + this.hashSize = new Map(hashSize) } - update(ranges) { + static async from(contentDir) { + const statePath = getStatePath(contentDir) + let state = {} + try { + const blob = await fs.promises.readFile(statePath) + state = JSON.parse(blob) + } catch (e) {} + return new HashFileTracker(contentDir, state) + } + + update(ranges, newRanges) { for (const [hash, age] of this.hashAge) { this.hashAge.set(hash, age + 1) } - for (const range in ranges) { + for (const range of ranges) { this.hashAge.set(range.hash, 0) } + for (const range of newRanges) { + this.hashSize.set(range.hash, range.end - range.start) + } + return this } findStale(maxAge) { - var stale = [] + const stale = [] for (const [hash, age] of this.hashAge) { if (age > maxAge) { stale.push(hash) - this.hashAge.delete(hash) } } return stale } -} -async function loadState(contentDir, tracker) { + async flush() { + const statePath = getStatePath(this.contentDir) + const blob = JSON.stringify({ + hashAge: Array.from(this.hashAge.entries()), + hashSize: Array.from(this.hashSize.entries()) + }) + const atomicWrite = statePath + '~' + try { + await fs.promises.writeFile(atomicWrite, blob) + } catch (err) { + try { + await fs.promises.unlink(atomicWrite) + } catch (e) {} + throw err + } + try { + await fs.promises.rename(atomicWrite, statePath) + } catch (err) { + try { + await fs.promises.unlink(atomicWrite) + } catch (e) {} + throw err + } + } -} + async deleteStaleHashes(n) { + // delete any hash file older than N generations + const hashes = this.findStale(n) -async function deleteHashFiles(n) { - // delete any hash file older than N generations + let reclaimedSpace = 0 + if (hashes.length === 0) { + return reclaimedSpace + } + + await promiseMapWithLimit(10, hashes, async (hash) => { + await fs.promises.unlink(Path.join(this.contentDir, hash)) + this.hashAge.delete(hash) + reclaimedSpace += this.hashSize.get(hash) + this.hashSize.delete(hash) + }) + return reclaimedSpace + } } class PdfStreamsExtractor { @@ -193,6 +247,11 @@ async function writePdfStream(dir, hash, buffers) { return true } +function promiseMapWithLimit(concurrency, array, fn) { + const limit = pLimit(concurrency) + return Promise.all(array.map((x) => limit(() => fn(x)))) +} + module.exports = { HASH_REGEX: /^[0-9a-f]{64}$/, update: callbackify(update) diff --git a/services/clsi/app/js/ContentCacheMetrics.js b/services/clsi/app/js/ContentCacheMetrics.js index 04b475f827..f5f9188ddc 100644 --- a/services/clsi/app/js/ContentCacheMetrics.js +++ b/services/clsi/app/js/ContentCacheMetrics.js @@ -72,7 +72,10 @@ function emitPdfCachingStats(stats, timings) { // How much space do the ranges use? // This will accumulate the ranges size over time, skipping already written ranges. - Metrics.summary('pdf-ranges-disk-size', stats['pdf-caching-new-ranges-size']) + Metrics.summary( + 'pdf-ranges-disk-size', + stats['pdf-caching-new-ranges-size'] - stats['pdf-caching-reclaimed-space'] + ) } module.exports = { diff --git a/services/clsi/app/js/OutputCacheManager.js b/services/clsi/app/js/OutputCacheManager.js index fe23ae3e3c..e6167570f8 100644 --- a/services/clsi/app/js/OutputCacheManager.js +++ b/services/clsi/app/js/OutputCacheManager.js @@ -278,10 +278,10 @@ module.exports = OutputCacheManager = { const timer = new Metrics.Timer('compute-pdf-ranges') ContentCacheManager.update(contentDir, outputFilePath, function ( err, - ranges + result ) { if (err) return callback(err, outputFiles) - const [contentRanges, newContentRanges] = ranges + const [contentRanges, newContentRanges, reclaimedSpace] = result if (Settings.enablePdfCachingDark) { // In dark mode we are doing the computation only and do not emit @@ -302,6 +302,7 @@ module.exports = OutputCacheManager = { (sum, next) => sum + (next.end - next.start), 0 ) + stats['pdf-caching-reclaimed-space'] = reclaimedSpace callback(null, outputFiles) }) } else { diff --git a/services/clsi/test/unit/js/ContentCacheManagerTests.js b/services/clsi/test/unit/js/ContentCacheManagerTests.js index a7b25eacbc..35f22fffae 100644 --- a/services/clsi/test/unit/js/ContentCacheManagerTests.js +++ b/services/clsi/test/unit/js/ContentCacheManagerTests.js @@ -48,16 +48,19 @@ describe('ContentCacheManager', function () { } }) } - let contentRanges, newContentRanges + let contentRanges, newContentRanges, reclaimed function run(filePath, done) { ContentCacheManager.update(contentDir, filePath, (err, ranges) => { if (err) return done(err) - ;[contentRanges, newContentRanges] = ranges + let newlyReclaimed + ;[contentRanges, newContentRanges, newlyReclaimed] = ranges + reclaimed += newlyReclaimed done() }) } beforeEach(function () { + reclaimed = 0 contentDir = '/app/output/602cee6f6460fca0ba7921e6/content/1797a7f48f9-5abc1998509dea1f' pdfPath = @@ -70,6 +73,18 @@ describe('ContentCacheManager', function () { fs = { createReadStream: sinon.stub().returns(Readable.from([])), promises: { + async writeFile(name, blob) { + const file = new FakeFile() + await file.write(Buffer.from(blob)) + await file.close() + files[name] = file + }, + async readFile(name) { + if (!files[name]) { + throw new Error() + } + return files[name].toJSON().contents + }, async open(name) { files[name] = new FakeFile() return files[name] @@ -86,7 +101,12 @@ describe('ContentCacheManager', function () { files[newName] = files[oldName] delete files[oldName] }, - unlink: sinon.stub().resolves() + async unlink(name) { + if (!files[name]) { + throw new Error() + } + delete files[name] + } } } }) @@ -99,9 +119,12 @@ describe('ContentCacheManager', function () { describe('when the ranges are split across chunks', function () { const RANGE_1 = 'stream123endstream' - const RANGE_2 = 'stream(|)endstream' - const RANGE_3 = 'stream!$%endstream' - beforeEach(function (done) { + const RANGE_2 = 'stream(||)endstream' + const RANGE_3 = 'stream!$%/=endstream' + const h1 = hash(RANGE_1) + const h2 = hash(RANGE_2) + const h3 = hash(RANGE_3) + function runWithSplitStream(done) { fs.createReadStream .withArgs(pdfPath) .returns( @@ -109,12 +132,15 @@ describe('ContentCacheManager', function () { Buffer.from('abcstr'), Buffer.from('eam123endstreamABC'), Buffer.from('str'), - Buffer.from('eam(|'), + Buffer.from('eam(||'), Buffer.from(')end'), - Buffer.from('stream-_~stream!$%endstream') + Buffer.from('stream-_~stream!$%/=endstream') ]) ) run(pdfPath, done) + } + beforeEach(function (done) { + runWithSplitStream(done) }) it('should produce three ranges', function () { @@ -130,12 +156,12 @@ describe('ContentCacheManager', function () { }, { start: 24, - end: 42, + end: 43, hash: hash(RANGE_2) }, { - start: 45, - end: 63, + start: 46, + end: 66, hash: hash(RANGE_3) } ]) @@ -143,17 +169,32 @@ describe('ContentCacheManager', function () { it('should store the contents', function () { expect(JSON.parse(JSON.stringify(files))).to.deep.equal({ - [Path.join(contentDir, hash(RANGE_1))]: { + [Path.join(contentDir, h1)]: { contents: RANGE_1, closed: true }, - [Path.join(contentDir, hash(RANGE_2))]: { + [Path.join(contentDir, h2)]: { contents: RANGE_2, closed: true }, - [Path.join(contentDir, hash(RANGE_3))]: { + [Path.join(contentDir, h3)]: { contents: RANGE_3, closed: true + }, + [Path.join(contentDir, '.state.v0.json')]: { + contents: JSON.stringify({ + hashAge: [ + [h1, 0], + [h2, 0], + [h3, 0] + ], + hashSize: [ + [h1, 18], + [h2, 19], + [h3, 20] + ] + }), + closed: true } }) }) @@ -161,6 +202,140 @@ describe('ContentCacheManager', function () { it('should mark all ranges as new', function () { expect(contentRanges).to.deep.equal(newContentRanges) }) + + describe('when re-running with one stream removed', function () { + function runWithOneSplitStreamRemoved(done) { + fs.createReadStream + .withArgs(pdfPath) + .returns( + Readable.from([ + Buffer.from('abcstr'), + Buffer.from('eam123endstreamABC'), + Buffer.from('stream!$%/=endstream') + ]) + ) + run(pdfPath, done) + } + beforeEach(function (done) { + runWithOneSplitStreamRemoved(done) + }) + + it('should produce two ranges', function () { + expect(contentRanges).to.have.length(2) + }) + + it('should find the correct offsets', function () { + expect(contentRanges).to.deep.equal([ + { + start: 3, + end: 21, + hash: hash(RANGE_1) + }, + { + start: 24, + end: 44, + hash: hash(RANGE_3) + } + ]) + }) + + it('should update the age of the 2nd range', function () { + expect(JSON.parse(JSON.stringify(files))).to.deep.equal({ + [Path.join(contentDir, h1)]: { + contents: RANGE_1, + closed: true + }, + [Path.join(contentDir, h2)]: { + contents: RANGE_2, + closed: true + }, + [Path.join(contentDir, h3)]: { + contents: RANGE_3, + closed: true + }, + [Path.join(contentDir, '.state.v0.json')]: { + contents: JSON.stringify({ + hashAge: [ + [h1, 0], + [h2, 1], + [h3, 0] + ], + hashSize: [ + [h1, 18], + [h2, 19], + [h3, 20] + ] + }), + closed: true + } + }) + }) + + it('should find no new ranges', function () { + expect(newContentRanges).to.deep.equal([]) + }) + + describe('when re-running 5 more times', function () { + for (let i = 0; i < 5; i++) { + beforeEach(function (done) { + runWithOneSplitStreamRemoved(done) + }) + } + + it('should still produce two ranges', function () { + expect(contentRanges).to.have.length(2) + }) + + it('should still find the correct offsets', function () { + expect(contentRanges).to.deep.equal([ + { + start: 3, + end: 21, + hash: hash(RANGE_1) + }, + { + start: 24, + end: 44, + hash: hash(RANGE_3) + } + ]) + }) + + it('should delete the 2nd range', function () { + expect(JSON.parse(JSON.stringify(files))).to.deep.equal({ + [Path.join(contentDir, h1)]: { + contents: RANGE_1, + closed: true + }, + [Path.join(contentDir, h3)]: { + contents: RANGE_3, + closed: true + }, + [Path.join(contentDir, '.state.v0.json')]: { + contents: JSON.stringify({ + hashAge: [ + [h1, 0], + [h3, 0] + ], + hashSize: [ + [h1, 18], + [h3, 20] + ] + }), + closed: true + } + }) + }) + + it('should find no new ranges', function () { + expect(newContentRanges).to.deep.equal([]) + }) + + it('should yield the reclaimed space', function () { + expect(reclaimed).to.equal(RANGE_2.length) + }) + }) + }) }) }) })