diff --git a/package-lock.json b/package-lock.json index 0af1d0cb1b..70f8885ff3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -34395,6 +34395,7 @@ "lodash": "^4.17.20", "mongo-uri": "^0.1.2", "mongodb": "^4.11.0", + "node-fetch": "^2.6.0", "overleaf-editor-core": "*", "redis": "~0.10.1", "request": "^2.88.2", @@ -44209,6 +44210,7 @@ "mongodb": "^4.11.0", "multer": "^1.4.2", "nock": "^12.0.3", + "node-fetch": "^2.6.0", "overleaf-editor-core": "*", "redis": "~0.10.1", "request": "^2.88.2", diff --git a/services/project-history/app/js/HashManager.js b/services/project-history/app/js/HashManager.js index 46a476f899..898ad2f43c 100644 --- a/services/project-history/app/js/HashManager.js +++ b/services/project-history/app/js/HashManager.js @@ -12,9 +12,8 @@ */ import fs from 'fs' import crypto from 'crypto' -import _ from 'lodash' -import logger from '@overleaf/logger' import OError from '@overleaf/o-error' +import { pipeline } from 'stream' export function _getBlobHashFromString(string) { const byteLength = Buffer.byteLength(string) @@ -26,12 +25,7 @@ export function _getBlobHashFromString(string) { return hash.read() } -export function _getBlobHash(fsPath, _callback) { - if (_callback == null) { - _callback = function () {} - } - const callback = _.once(_callback) - +export function _getBlobHash(fsPath, callback) { return fs.stat(fsPath, function (err, stats) { if (err != null) { OError.tag(err, 'failed to stat file in _getBlobHash', { fsPath }) @@ -42,22 +36,18 @@ export function _getBlobHash(fsPath, _callback) { hash.setEncoding('hex') hash.update('blob ' + byteLength + '\x00') - const stream = fs.createReadStream(fsPath) - - stream.on('error', function (err) { - return callback( - OError.tag(err, 'error streaming file from disk', { - fsPath, - byteLength, - }) - ) + pipeline(fs.createReadStream(fsPath), hash, err => { + if (err) { + callback( + OError.tag(err, 'error streaming file from disk', { + fsPath, + byteLength, + }) + ) + } else { + hash.end() + callback(null, hash.read(), byteLength) + } }) - - stream.on('end', function () { - hash.end() - return callback(null, hash.read(), byteLength) - }) - - return stream.pipe(hash) }) } diff --git a/services/project-history/app/js/HistoryStoreManager.js b/services/project-history/app/js/HistoryStoreManager.js index 6a40284e12..72f71205ab 100644 --- a/services/project-history/app/js/HistoryStoreManager.js +++ b/services/project-history/app/js/HistoryStoreManager.js @@ -11,6 +11,7 @@ import * as Versions from './Versions.js' import * as Errors from './Errors.js' import * as LocalFileWriter from './LocalFileWriter.js' import * as HashManager from './HashManager.js' +import fetch from 'node-fetch' const HTTP_REQUEST_TIMEOUT = 300 * 1000 // 5 minutes @@ -168,14 +169,24 @@ export function getProjectBlob(historyId, blobHash, callback) { } export function getProjectBlobStream(historyId, blobHash, callback) { + const url = `${Settings.overleaf.history.host}/projects/${historyId}/blobs/${blobHash}` logger.debug( { historyId, blobHash }, 'getting blob stream from history service' ) - _requestHistoryServiceStream( - { path: `projects/${historyId}/blobs/${blobHash}` }, - callback - ) + fetch(url, getHistoryFetchOptions()) + .then(res => { + if (!res.ok) { + const err = new OError( + `history store a non-success status code: ${res.status}` + ) + err.statusCode = res.status + logger.warn({ err, url }, 'cannot get project blob') + return callback(err) + } + callback(null, res.body) + }) + .catch(err => callback(OError.tag(err))) } export function sendChanges( @@ -224,6 +235,7 @@ export function createBlobForUpdate(projectId, historyId, update, callback) { LocalFileWriter.bufferOnDisk( stringStream, + '', `project-${projectId}-doc-${update.doc}`, (fsPath, cb) => { _createBlob(historyId, fsPath, cb) @@ -245,50 +257,47 @@ export function createBlobForUpdate(projectId, historyId, update, callback) { return callback(new OError('invalid project for blob creation')) } const fileId = urlMatch[2] - const fileStoreStream = request.get({ - url: `${Settings.apis.filestore.url}/project/${projectId}/file/${fileId}`, - timeout: HTTP_REQUEST_TIMEOUT, - }) - fileStoreStream.pause() - fileStoreStream.on('error', err => { - callback(OError.tag(err, 'error from filestore', { url: update.url })) - }) - fileStoreStream.on('response', response => { - if (response.statusCode >= 200 && response.statusCode < 300) { - LocalFileWriter.bufferOnDisk( - fileStoreStream, - `project-${projectId}-file-${fileId}`, - (fsPath, cb) => { - _createBlob(historyId, fsPath, cb) - }, - callback - ) - fileStoreStream.resume() // start data flowing when ready - } else if (response.statusCode === 404) { - logger.warn( - { projectId, historyId, fileStoreUrl: update.url }, - 'File contents not found in filestore. Storing in history as an empty file' - ) - const emptyStream = new StringStream() - LocalFileWriter.bufferOnDisk( - emptyStream, - `project-${projectId}-file-${fileId}`, - (fsPath, cb) => { - _createBlob(historyId, fsPath, cb) - }, - callback - ) - fileStoreStream.resume() // Drain the filestore stream - emptyStream.push(null) // send an EOF signal - } else { - const error = new OError( - `bad response from filestore: ${response.statusCode}`, - { url: update.url, statusCode: response.statusCode } - ) - fileStoreStream.resume() // See https://github.com/overleaf/write_latex/wiki/Streams-and-pipes-in-Node.js#discard-data-if-necessary-in-the-response-handler - callback(error) - } - }) + const filestoreURL = `${Settings.apis.filestore.url}/project/${projectId}/file/${fileId}` + fetch(filestoreURL, { signal: AbortSignal.timeout(HTTP_REQUEST_TIMEOUT) }) + .then(response => { + const statusCode = response.status + if (statusCode >= 200 && statusCode < 300) { + LocalFileWriter.bufferOnDisk( + response.body, + filestoreURL, + `project-${projectId}-file-${fileId}`, + (fsPath, cb) => { + _createBlob(historyId, fsPath, cb) + }, + callback + ) + } else if (statusCode === 404) { + logger.warn( + { projectId, historyId, filestoreURL }, + 'File contents not found in filestore. Storing in history as an empty file' + ) + const emptyStream = new StringStream() + LocalFileWriter.bufferOnDisk( + emptyStream, + filestoreURL, + `project-${projectId}-file-${fileId}`, + (fsPath, cb) => { + _createBlob(historyId, fsPath, cb) + }, + callback + ) + emptyStream.push(null) // send an EOF signal + } else { + const error = new OError( + `bad response from filestore: ${statusCode}`, + { filestoreURL, statusCode } + ) + callback(error) + } + }) + .catch(err => + callback(OError.tag(err, 'error from filestore', { filestoreURL })) + ) } else { const error = new OError('invalid update for blob creation') callback(error) @@ -304,33 +313,24 @@ function _createBlob(historyId, fsPath, _callback) { } const outStream = fs.createReadStream(fsPath) - outStream.on('error', err => { - callback( - OError.tag(err, 'error streaming file from disk', { - fsPath, - hash, - byteLength, - }) - ) - }) - logger.debug( - { fsPath, hash, byteLength }, + { fsPath, historyId, hash, byteLength }, 'sending blob to history service' ) - _requestHistoryService( - { - method: 'PUT', - path: `projects/${historyId}/blobs/${hash}`, - body: outStream, - }, - error => { - if (error) { - return callback(OError.tag(error)) + const url = `${Settings.overleaf.history.host}/projects/${historyId}/blobs/${hash}` + fetch(url, { method: 'PUT', body: outStream, ...getHistoryFetchOptions() }) + .then(res => { + if (!res.ok) { + const err = new OError( + `history store a non-success status code: ${res.status}` + ) + err.statusCode = res.status + logger.warn({ err, url }, 'cannot create project blob') + return callback(err) } callback(null, hash) - } - ) + }) + .catch(err => callback(OError.tag(err))) }) } @@ -407,6 +407,22 @@ function _requestOptions(options) { return requestOptions } +/** + * @return {RequestInit} + */ +function getHistoryFetchOptions() { + return { + signal: AbortSignal.timeout(HTTP_REQUEST_TIMEOUT), + headers: { + Authorization: + 'Basic ' + + Buffer.from( + `${Settings.overleaf.history.user}:${Settings.overleaf.history.pass}` + ).toString('base64'), + }, + } +} + function _requestHistoryService(options, callback) { const requestOptions = _requestOptions(options) request(requestOptions, (error, res, body) => { @@ -427,21 +443,3 @@ function _requestHistoryService(options, callback) { } }) } - -function _requestHistoryServiceStream(options, callback) { - callback = _.once(callback) - const requestOptions = _requestOptions(options) - const stream = request(requestOptions) - stream.on('error', callback) - stream.on('response', res => { - if (res.statusCode >= 200 && res.statusCode < 300) { - callback(null, stream) - } else { - const error = new OError( - `history store a non-success status code: ${res.statusCode}` - ) - logger.warn({ err: error, options }, error.message) - callback(error) - } - }) -} diff --git a/services/project-history/app/js/HttpController.js b/services/project-history/app/js/HttpController.js index bb08a7a4ef..3f80694f0a 100644 --- a/services/project-history/app/js/HttpController.js +++ b/services/project-history/app/js/HttpController.js @@ -15,6 +15,7 @@ import * as LabelsManager from './LabelsManager.js' import * as HistoryApiManager from './HistoryApiManager.js' import * as RetryManager from './RetryManager.js' import * as FlushManager from './FlushManager.js' +import { pipeline } from 'stream' export function getProjectBlob(req, res, next) { const projectId = req.params.project_id @@ -26,7 +27,10 @@ export function getProjectBlob(req, res, next) { if (err != null) { return next(OError.tag(err)) } - stream.pipe(res) + pipeline(stream, res, err => { + if (err) next(err) + // res.end() is already called via 'end' event by pipeline. + }) } ) } @@ -189,7 +193,10 @@ export function getFileSnapshot(req, res, next) { if (error != null) { return next(OError.tag(error)) } - stream.pipe(res) + pipeline(stream, res, err => { + if (err) next(err) + // res.end() is already called via 'end' event by pipeline. + }) } ) } diff --git a/services/project-history/app/js/LocalFileWriter.js b/services/project-history/app/js/LocalFileWriter.js index a37c0905b0..967dde3acd 100644 --- a/services/project-history/app/js/LocalFileWriter.js +++ b/services/project-history/app/js/LocalFileWriter.js @@ -10,9 +10,9 @@ * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ import fs from 'fs' +import { pipeline } from 'stream' import { randomUUID } from 'crypto' import path from 'path' -import Url from 'url' import _ from 'lodash' import logger from '@overleaf/logger' import metrics from '@overleaf/metrics' @@ -30,26 +30,21 @@ import * as LargeFileManager from './LargeFileManager.js' // stream can be passed to this method, the data will then be held on disk // rather than in memory and will be cleaned up once it has been consumed. // -export function bufferOnDisk(inStream, fileId, consumeOutStream, callback) { - if (consumeOutStream == null) { - consumeOutStream = function (fsPath, done) {} - } - if (callback == null) { - callback = function () {} - } +export function bufferOnDisk( + inStream, + url, + fileId, + consumeOutStream, + callback +) { const timer = new metrics.Timer('LocalFileWriter.writeStream') - // capture the stream url for logging - const url = inStream.uri && Url.format(inStream.uri) - const fsPath = path.join( Settings.path.uploadFolder, randomUUID() + `-${fileId}` ) - let cleaningUp = false const cleanup = _.once((streamError, res) => { - cleaningUp = true return deleteFile(fsPath, function (cleanupError) { if (streamError) { OError.tag(streamError, 'error deleting temporary file', { @@ -70,48 +65,37 @@ export function bufferOnDisk(inStream, fileId, consumeOutStream, callback) { logger.debug({ fsPath, url }, 'writing file locally') - inStream.on('error', function (err) { - OError.tag(err, 'problem writing file locally, with read stream', { - fsPath, - url, - }) - return cleanup(err) - }) - const writeStream = fs.createWriteStream(fsPath) - writeStream.on('error', function (err) { - OError.tag(err, 'problem writing file locally, with write stream', { - fsPath, - url, - }) - return cleanup(err) - }) - writeStream.on('finish', function () { + pipeline(inStream, writeStream, err => { + if (err) { + OError.tag(err, 'problem writing file locally', { + fsPath, + url, + }) + return cleanup(err) + } timer.done() // in future check inStream.response.headers for hash value here logger.debug({ fsPath, url }, 'stream closed after writing file locally') - if (!cleaningUp) { - const fileSize = writeStream.bytesWritten - return LargeFileManager.replaceWithStubIfNeeded( - fsPath, - fileId, - fileSize, - function (err, newFsPath) { - if (err != null) { - OError.tag(err, 'problem in large file manager', { - newFsPath, - fsPath, - fileId, - fileSize, - }) - return cleanup(err) - } - return consumeOutStream(newFsPath, cleanup) + const fileSize = writeStream.bytesWritten + return LargeFileManager.replaceWithStubIfNeeded( + fsPath, + fileId, + fileSize, + function (err, newFsPath) { + if (err != null) { + OError.tag(err, 'problem in large file manager', { + newFsPath, + fsPath, + fileId, + fileSize, + }) + return cleanup(err) } - ) - } + return consumeOutStream(newFsPath, cleanup) + } + ) }) - return inStream.pipe(writeStream) } export function deleteFile(fsPath, callback) { diff --git a/services/project-history/package.json b/services/project-history/package.json index 9f02ae59c1..027385400e 100644 --- a/services/project-history/package.json +++ b/services/project-history/package.json @@ -39,6 +39,7 @@ "lodash": "^4.17.20", "mongo-uri": "^0.1.2", "mongodb": "^4.11.0", + "node-fetch": "^2.6.0", "overleaf-editor-core": "*", "redis": "~0.10.1", "request": "^2.88.2", diff --git a/services/project-history/test/unit/js/HistoryStoreManager/HistoryStoreManagerTests.js b/services/project-history/test/unit/js/HistoryStoreManager/HistoryStoreManagerTests.js index 416df5a234..174476cc6a 100644 --- a/services/project-history/test/unit/js/HistoryStoreManager/HistoryStoreManagerTests.js +++ b/services/project-history/test/unit/js/HistoryStoreManager/HistoryStoreManagerTests.js @@ -47,8 +47,10 @@ describe('HistoryStoreManager', function () { .yields(null, this.historyId) this.request = sinon.stub() this.request.get = sinon.stub() + this.fetch = sinon.stub().resolves() this.HistoryStoreManager = await esmock(MODULE_PATH, { + 'node-fetch': this.fetch, request: this.request, '@overleaf/settings': this.settings, '../../../../app/js/LocalFileWriter.js': this.LocalFileWriter, @@ -361,17 +363,13 @@ describe('HistoryStoreManager', function () { describe('createBlobForUpdate', function () { beforeEach(function () { - this.fileStream = { - pause: sinon.stub(), - resume: sinon.stub(), - on: sinon.stub(), - } + this.fileStream = {} this.hash = 'random-hash' - this.fileStream.on - .withArgs('response') - .callsArgWith(1, { statusCode: 200 }) - this.LocalFileWriter.bufferOnDisk.callsArgWith(3, null, this.hash) - this.request.get.returns(this.fileStream) + this.LocalFileWriter.bufferOnDisk.callsArgWith(4, null, this.hash) + this.fetch.resolves({ + status: 200, + body: this.fileStream, + }) }) describe('for a file update with any filestore location', function () { @@ -396,9 +394,9 @@ describe('HistoryStoreManager', function () { }) it('should request the file from the filestore in settings', function () { - expect(this.request.get).to.have.been.calledWithMatch({ - url: `${this.settings.apis.filestore.url}/project/${this.projectId}/file/${this.file_id}`, - }) + expect(this.fetch).to.have.been.calledWithMatch( + `${this.settings.apis.filestore.url}/project/${this.projectId}/file/${this.file_id}` + ) }) it('should call the callback with the blob', function () { @@ -469,7 +467,11 @@ describe('HistoryStoreManager', function () { this.historyResponse = new EventEmitter() this.blobHash = 'test hash' - this.request.returns(this.historyResponse) + this.fetch.resolves({ + status: 200, + ok: true, + body: this.historyResponse, + }) this.HistoryStoreManager.getProjectBlobStream( this.historyId, this.blobHash, @@ -481,19 +483,12 @@ describe('HistoryStoreManager', function () { done() } ) - this.historyResponse.emit('response', { statusCode: 200 }) }) it('should get the blob from the overleaf history service', function () { - expect(this.request).to.have.been.calledWithMatch({ - method: 'GET', - url: `${this.settings.overleaf.history.host}/projects/${this.historyId}/blobs/${this.blobHash}`, - auth: { - user: this.settings.overleaf.history.user, - pass: this.settings.overleaf.history.pass, - sendImmediately: true, - }, - }) + expect(this.fetch).to.have.been.calledWithMatch( + `${this.settings.overleaf.history.host}/projects/${this.historyId}/blobs/${this.blobHash}` + ) }) it('should return a stream of the blob contents', function () { diff --git a/services/project-history/test/unit/js/HttpController/HttpControllerTests.js b/services/project-history/test/unit/js/HttpController/HttpControllerTests.js index a74e342c2a..7d427d7afd 100644 --- a/services/project-history/test/unit/js/HttpController/HttpControllerTests.js +++ b/services/project-history/test/unit/js/HttpController/HttpControllerTests.js @@ -52,8 +52,10 @@ describe('HttpController', function () { this.RetryManager = {} this.FlushManager = {} this.request = {} + this.pipeline = sinon.stub() this.HttpController = await esmock(MODULE_PATH, { request: this.request, + stream: { pipeline: this.pipeline }, '../../../../app/js/UpdatesProcessor.js': this.UpdatesProcessor, '../../../../app/js/SummarizedUpdatesManager.js': this.SummarizedUpdatesManager, @@ -85,7 +87,7 @@ describe('HttpController', function () { describe('getProjectBlob', function () { beforeEach(function () { this.blobHash = 'abcd' - this.stream = { pipe: sinon.stub() } + this.stream = {} this.HistoryStoreManager.getProjectBlobStream.yields(null, this.stream) this.HttpController.getProjectBlob( { params: { project_id: this.projectId, hash: this.blobHash } }, @@ -98,7 +100,7 @@ describe('HttpController', function () { this.HistoryStoreManager.getProjectBlobStream .calledWith(this.projectId, this.blobHash) .should.equal(true) - this.stream.pipe.calledWith(this.res).should.equal(true) + this.pipeline.should.have.been.calledWith(this.stream, this.res) }) }) @@ -308,7 +310,7 @@ describe('HttpController', function () { }, } this.res = { mock: 'res' } - this.stream = { pipe: sinon.stub() } + this.stream = {} this.SnapshotManager.getFileSnapshotStream.yields(null, this.stream) this.HttpController.getFileSnapshot(this.req, this.res, this.next) }) @@ -322,7 +324,7 @@ describe('HttpController', function () { }) it('should pipe the returned stream into the response', function () { - this.stream.pipe.calledWith(this.res).should.equal(true) + this.pipeline.should.have.been.calledWith(this.stream, this.res) }) })