Merge pull request #12831 from overleaf/jpa-project-history-pipe

[project-history] migrate to fetch/stream.pipeline

GitOrigin-RevId: 9224eab6a0ad9cbb0c80e443a1c0d4abdd9bbd0f
This commit is contained in:
Jakob Ackermann
2023-05-08 11:58:38 +01:00
committed by Copybot
parent 5714deaa08
commit 51af6558cc
8 changed files with 170 additions and 191 deletions

2
package-lock.json generated
View File

@@ -34395,6 +34395,7 @@
"lodash": "^4.17.20",
"mongo-uri": "^0.1.2",
"mongodb": "^4.11.0",
"node-fetch": "^2.6.0",
"overleaf-editor-core": "*",
"redis": "~0.10.1",
"request": "^2.88.2",
@@ -44209,6 +44210,7 @@
"mongodb": "^4.11.0",
"multer": "^1.4.2",
"nock": "^12.0.3",
"node-fetch": "^2.6.0",
"overleaf-editor-core": "*",
"redis": "~0.10.1",
"request": "^2.88.2",

View File

@@ -12,9 +12,8 @@
*/
import fs from 'fs'
import crypto from 'crypto'
import _ from 'lodash'
import logger from '@overleaf/logger'
import OError from '@overleaf/o-error'
import { pipeline } from 'stream'
export function _getBlobHashFromString(string) {
const byteLength = Buffer.byteLength(string)
@@ -26,12 +25,7 @@ export function _getBlobHashFromString(string) {
return hash.read()
}
export function _getBlobHash(fsPath, _callback) {
if (_callback == null) {
_callback = function () {}
}
const callback = _.once(_callback)
export function _getBlobHash(fsPath, callback) {
return fs.stat(fsPath, function (err, stats) {
if (err != null) {
OError.tag(err, 'failed to stat file in _getBlobHash', { fsPath })
@@ -42,22 +36,18 @@ export function _getBlobHash(fsPath, _callback) {
hash.setEncoding('hex')
hash.update('blob ' + byteLength + '\x00')
const stream = fs.createReadStream(fsPath)
stream.on('error', function (err) {
return callback(
OError.tag(err, 'error streaming file from disk', {
fsPath,
byteLength,
})
)
pipeline(fs.createReadStream(fsPath), hash, err => {
if (err) {
callback(
OError.tag(err, 'error streaming file from disk', {
fsPath,
byteLength,
})
)
} else {
hash.end()
callback(null, hash.read(), byteLength)
}
})
stream.on('end', function () {
hash.end()
return callback(null, hash.read(), byteLength)
})
return stream.pipe(hash)
})
}

View File

@@ -11,6 +11,7 @@ import * as Versions from './Versions.js'
import * as Errors from './Errors.js'
import * as LocalFileWriter from './LocalFileWriter.js'
import * as HashManager from './HashManager.js'
import fetch from 'node-fetch'
const HTTP_REQUEST_TIMEOUT = 300 * 1000 // 5 minutes
@@ -168,14 +169,24 @@ export function getProjectBlob(historyId, blobHash, callback) {
}
export function getProjectBlobStream(historyId, blobHash, callback) {
const url = `${Settings.overleaf.history.host}/projects/${historyId}/blobs/${blobHash}`
logger.debug(
{ historyId, blobHash },
'getting blob stream from history service'
)
_requestHistoryServiceStream(
{ path: `projects/${historyId}/blobs/${blobHash}` },
callback
)
fetch(url, getHistoryFetchOptions())
.then(res => {
if (!res.ok) {
const err = new OError(
`history store a non-success status code: ${res.status}`
)
err.statusCode = res.status
logger.warn({ err, url }, 'cannot get project blob')
return callback(err)
}
callback(null, res.body)
})
.catch(err => callback(OError.tag(err)))
}
export function sendChanges(
@@ -224,6 +235,7 @@ export function createBlobForUpdate(projectId, historyId, update, callback) {
LocalFileWriter.bufferOnDisk(
stringStream,
'',
`project-${projectId}-doc-${update.doc}`,
(fsPath, cb) => {
_createBlob(historyId, fsPath, cb)
@@ -245,50 +257,47 @@ export function createBlobForUpdate(projectId, historyId, update, callback) {
return callback(new OError('invalid project for blob creation'))
}
const fileId = urlMatch[2]
const fileStoreStream = request.get({
url: `${Settings.apis.filestore.url}/project/${projectId}/file/${fileId}`,
timeout: HTTP_REQUEST_TIMEOUT,
})
fileStoreStream.pause()
fileStoreStream.on('error', err => {
callback(OError.tag(err, 'error from filestore', { url: update.url }))
})
fileStoreStream.on('response', response => {
if (response.statusCode >= 200 && response.statusCode < 300) {
LocalFileWriter.bufferOnDisk(
fileStoreStream,
`project-${projectId}-file-${fileId}`,
(fsPath, cb) => {
_createBlob(historyId, fsPath, cb)
},
callback
)
fileStoreStream.resume() // start data flowing when ready
} else if (response.statusCode === 404) {
logger.warn(
{ projectId, historyId, fileStoreUrl: update.url },
'File contents not found in filestore. Storing in history as an empty file'
)
const emptyStream = new StringStream()
LocalFileWriter.bufferOnDisk(
emptyStream,
`project-${projectId}-file-${fileId}`,
(fsPath, cb) => {
_createBlob(historyId, fsPath, cb)
},
callback
)
fileStoreStream.resume() // Drain the filestore stream
emptyStream.push(null) // send an EOF signal
} else {
const error = new OError(
`bad response from filestore: ${response.statusCode}`,
{ url: update.url, statusCode: response.statusCode }
)
fileStoreStream.resume() // See https://github.com/overleaf/write_latex/wiki/Streams-and-pipes-in-Node.js#discard-data-if-necessary-in-the-response-handler
callback(error)
}
})
const filestoreURL = `${Settings.apis.filestore.url}/project/${projectId}/file/${fileId}`
fetch(filestoreURL, { signal: AbortSignal.timeout(HTTP_REQUEST_TIMEOUT) })
.then(response => {
const statusCode = response.status
if (statusCode >= 200 && statusCode < 300) {
LocalFileWriter.bufferOnDisk(
response.body,
filestoreURL,
`project-${projectId}-file-${fileId}`,
(fsPath, cb) => {
_createBlob(historyId, fsPath, cb)
},
callback
)
} else if (statusCode === 404) {
logger.warn(
{ projectId, historyId, filestoreURL },
'File contents not found in filestore. Storing in history as an empty file'
)
const emptyStream = new StringStream()
LocalFileWriter.bufferOnDisk(
emptyStream,
filestoreURL,
`project-${projectId}-file-${fileId}`,
(fsPath, cb) => {
_createBlob(historyId, fsPath, cb)
},
callback
)
emptyStream.push(null) // send an EOF signal
} else {
const error = new OError(
`bad response from filestore: ${statusCode}`,
{ filestoreURL, statusCode }
)
callback(error)
}
})
.catch(err =>
callback(OError.tag(err, 'error from filestore', { filestoreURL }))
)
} else {
const error = new OError('invalid update for blob creation')
callback(error)
@@ -304,33 +313,24 @@ function _createBlob(historyId, fsPath, _callback) {
}
const outStream = fs.createReadStream(fsPath)
outStream.on('error', err => {
callback(
OError.tag(err, 'error streaming file from disk', {
fsPath,
hash,
byteLength,
})
)
})
logger.debug(
{ fsPath, hash, byteLength },
{ fsPath, historyId, hash, byteLength },
'sending blob to history service'
)
_requestHistoryService(
{
method: 'PUT',
path: `projects/${historyId}/blobs/${hash}`,
body: outStream,
},
error => {
if (error) {
return callback(OError.tag(error))
const url = `${Settings.overleaf.history.host}/projects/${historyId}/blobs/${hash}`
fetch(url, { method: 'PUT', body: outStream, ...getHistoryFetchOptions() })
.then(res => {
if (!res.ok) {
const err = new OError(
`history store a non-success status code: ${res.status}`
)
err.statusCode = res.status
logger.warn({ err, url }, 'cannot create project blob')
return callback(err)
}
callback(null, hash)
}
)
})
.catch(err => callback(OError.tag(err)))
})
}
@@ -407,6 +407,22 @@ function _requestOptions(options) {
return requestOptions
}
/**
* @return {RequestInit}
*/
function getHistoryFetchOptions() {
return {
signal: AbortSignal.timeout(HTTP_REQUEST_TIMEOUT),
headers: {
Authorization:
'Basic ' +
Buffer.from(
`${Settings.overleaf.history.user}:${Settings.overleaf.history.pass}`
).toString('base64'),
},
}
}
function _requestHistoryService(options, callback) {
const requestOptions = _requestOptions(options)
request(requestOptions, (error, res, body) => {
@@ -427,21 +443,3 @@ function _requestHistoryService(options, callback) {
}
})
}
function _requestHistoryServiceStream(options, callback) {
callback = _.once(callback)
const requestOptions = _requestOptions(options)
const stream = request(requestOptions)
stream.on('error', callback)
stream.on('response', res => {
if (res.statusCode >= 200 && res.statusCode < 300) {
callback(null, stream)
} else {
const error = new OError(
`history store a non-success status code: ${res.statusCode}`
)
logger.warn({ err: error, options }, error.message)
callback(error)
}
})
}

View File

@@ -15,6 +15,7 @@ import * as LabelsManager from './LabelsManager.js'
import * as HistoryApiManager from './HistoryApiManager.js'
import * as RetryManager from './RetryManager.js'
import * as FlushManager from './FlushManager.js'
import { pipeline } from 'stream'
export function getProjectBlob(req, res, next) {
const projectId = req.params.project_id
@@ -26,7 +27,10 @@ export function getProjectBlob(req, res, next) {
if (err != null) {
return next(OError.tag(err))
}
stream.pipe(res)
pipeline(stream, res, err => {
if (err) next(err)
// res.end() is already called via 'end' event by pipeline.
})
}
)
}
@@ -189,7 +193,10 @@ export function getFileSnapshot(req, res, next) {
if (error != null) {
return next(OError.tag(error))
}
stream.pipe(res)
pipeline(stream, res, err => {
if (err) next(err)
// res.end() is already called via 'end' event by pipeline.
})
}
)
}

View File

@@ -10,9 +10,9 @@
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
import fs from 'fs'
import { pipeline } from 'stream'
import { randomUUID } from 'crypto'
import path from 'path'
import Url from 'url'
import _ from 'lodash'
import logger from '@overleaf/logger'
import metrics from '@overleaf/metrics'
@@ -30,26 +30,21 @@ import * as LargeFileManager from './LargeFileManager.js'
// stream can be passed to this method, the data will then be held on disk
// rather than in memory and will be cleaned up once it has been consumed.
//
export function bufferOnDisk(inStream, fileId, consumeOutStream, callback) {
if (consumeOutStream == null) {
consumeOutStream = function (fsPath, done) {}
}
if (callback == null) {
callback = function () {}
}
export function bufferOnDisk(
inStream,
url,
fileId,
consumeOutStream,
callback
) {
const timer = new metrics.Timer('LocalFileWriter.writeStream')
// capture the stream url for logging
const url = inStream.uri && Url.format(inStream.uri)
const fsPath = path.join(
Settings.path.uploadFolder,
randomUUID() + `-${fileId}`
)
let cleaningUp = false
const cleanup = _.once((streamError, res) => {
cleaningUp = true
return deleteFile(fsPath, function (cleanupError) {
if (streamError) {
OError.tag(streamError, 'error deleting temporary file', {
@@ -70,48 +65,37 @@ export function bufferOnDisk(inStream, fileId, consumeOutStream, callback) {
logger.debug({ fsPath, url }, 'writing file locally')
inStream.on('error', function (err) {
OError.tag(err, 'problem writing file locally, with read stream', {
fsPath,
url,
})
return cleanup(err)
})
const writeStream = fs.createWriteStream(fsPath)
writeStream.on('error', function (err) {
OError.tag(err, 'problem writing file locally, with write stream', {
fsPath,
url,
})
return cleanup(err)
})
writeStream.on('finish', function () {
pipeline(inStream, writeStream, err => {
if (err) {
OError.tag(err, 'problem writing file locally', {
fsPath,
url,
})
return cleanup(err)
}
timer.done()
// in future check inStream.response.headers for hash value here
logger.debug({ fsPath, url }, 'stream closed after writing file locally')
if (!cleaningUp) {
const fileSize = writeStream.bytesWritten
return LargeFileManager.replaceWithStubIfNeeded(
fsPath,
fileId,
fileSize,
function (err, newFsPath) {
if (err != null) {
OError.tag(err, 'problem in large file manager', {
newFsPath,
fsPath,
fileId,
fileSize,
})
return cleanup(err)
}
return consumeOutStream(newFsPath, cleanup)
const fileSize = writeStream.bytesWritten
return LargeFileManager.replaceWithStubIfNeeded(
fsPath,
fileId,
fileSize,
function (err, newFsPath) {
if (err != null) {
OError.tag(err, 'problem in large file manager', {
newFsPath,
fsPath,
fileId,
fileSize,
})
return cleanup(err)
}
)
}
return consumeOutStream(newFsPath, cleanup)
}
)
})
return inStream.pipe(writeStream)
}
export function deleteFile(fsPath, callback) {

View File

@@ -39,6 +39,7 @@
"lodash": "^4.17.20",
"mongo-uri": "^0.1.2",
"mongodb": "^4.11.0",
"node-fetch": "^2.6.0",
"overleaf-editor-core": "*",
"redis": "~0.10.1",
"request": "^2.88.2",

View File

@@ -47,8 +47,10 @@ describe('HistoryStoreManager', function () {
.yields(null, this.historyId)
this.request = sinon.stub()
this.request.get = sinon.stub()
this.fetch = sinon.stub().resolves()
this.HistoryStoreManager = await esmock(MODULE_PATH, {
'node-fetch': this.fetch,
request: this.request,
'@overleaf/settings': this.settings,
'../../../../app/js/LocalFileWriter.js': this.LocalFileWriter,
@@ -361,17 +363,13 @@ describe('HistoryStoreManager', function () {
describe('createBlobForUpdate', function () {
beforeEach(function () {
this.fileStream = {
pause: sinon.stub(),
resume: sinon.stub(),
on: sinon.stub(),
}
this.fileStream = {}
this.hash = 'random-hash'
this.fileStream.on
.withArgs('response')
.callsArgWith(1, { statusCode: 200 })
this.LocalFileWriter.bufferOnDisk.callsArgWith(3, null, this.hash)
this.request.get.returns(this.fileStream)
this.LocalFileWriter.bufferOnDisk.callsArgWith(4, null, this.hash)
this.fetch.resolves({
status: 200,
body: this.fileStream,
})
})
describe('for a file update with any filestore location', function () {
@@ -396,9 +394,9 @@ describe('HistoryStoreManager', function () {
})
it('should request the file from the filestore in settings', function () {
expect(this.request.get).to.have.been.calledWithMatch({
url: `${this.settings.apis.filestore.url}/project/${this.projectId}/file/${this.file_id}`,
})
expect(this.fetch).to.have.been.calledWithMatch(
`${this.settings.apis.filestore.url}/project/${this.projectId}/file/${this.file_id}`
)
})
it('should call the callback with the blob', function () {
@@ -469,7 +467,11 @@ describe('HistoryStoreManager', function () {
this.historyResponse = new EventEmitter()
this.blobHash = 'test hash'
this.request.returns(this.historyResponse)
this.fetch.resolves({
status: 200,
ok: true,
body: this.historyResponse,
})
this.HistoryStoreManager.getProjectBlobStream(
this.historyId,
this.blobHash,
@@ -481,19 +483,12 @@ describe('HistoryStoreManager', function () {
done()
}
)
this.historyResponse.emit('response', { statusCode: 200 })
})
it('should get the blob from the overleaf history service', function () {
expect(this.request).to.have.been.calledWithMatch({
method: 'GET',
url: `${this.settings.overleaf.history.host}/projects/${this.historyId}/blobs/${this.blobHash}`,
auth: {
user: this.settings.overleaf.history.user,
pass: this.settings.overleaf.history.pass,
sendImmediately: true,
},
})
expect(this.fetch).to.have.been.calledWithMatch(
`${this.settings.overleaf.history.host}/projects/${this.historyId}/blobs/${this.blobHash}`
)
})
it('should return a stream of the blob contents', function () {

View File

@@ -52,8 +52,10 @@ describe('HttpController', function () {
this.RetryManager = {}
this.FlushManager = {}
this.request = {}
this.pipeline = sinon.stub()
this.HttpController = await esmock(MODULE_PATH, {
request: this.request,
stream: { pipeline: this.pipeline },
'../../../../app/js/UpdatesProcessor.js': this.UpdatesProcessor,
'../../../../app/js/SummarizedUpdatesManager.js':
this.SummarizedUpdatesManager,
@@ -85,7 +87,7 @@ describe('HttpController', function () {
describe('getProjectBlob', function () {
beforeEach(function () {
this.blobHash = 'abcd'
this.stream = { pipe: sinon.stub() }
this.stream = {}
this.HistoryStoreManager.getProjectBlobStream.yields(null, this.stream)
this.HttpController.getProjectBlob(
{ params: { project_id: this.projectId, hash: this.blobHash } },
@@ -98,7 +100,7 @@ describe('HttpController', function () {
this.HistoryStoreManager.getProjectBlobStream
.calledWith(this.projectId, this.blobHash)
.should.equal(true)
this.stream.pipe.calledWith(this.res).should.equal(true)
this.pipeline.should.have.been.calledWith(this.stream, this.res)
})
})
@@ -308,7 +310,7 @@ describe('HttpController', function () {
},
}
this.res = { mock: 'res' }
this.stream = { pipe: sinon.stub() }
this.stream = {}
this.SnapshotManager.getFileSnapshotStream.yields(null, this.stream)
this.HttpController.getFileSnapshot(this.req, this.res, this.next)
})
@@ -322,7 +324,7 @@ describe('HttpController', function () {
})
it('should pipe the returned stream into the response', function () {
this.stream.pipe.calledWith(this.res).should.equal(true)
this.pipeline.should.have.been.calledWith(this.stream, this.res)
})
})