Merge pull request #12824 from overleaf/jpa-clsi-pipe

[clsi] rewrite UrlFetcher in async/await and fetch/pipeline

GitOrigin-RevId: a2a90fd886252f06c818f807e85e566f3fc1f841
This commit is contained in:
Jakob Ackermann
2023-04-27 11:16:41 +01:00
committed by Copybot
parent efa68fce61
commit 77da9fd0cd
4 changed files with 45 additions and 364 deletions

2
package-lock.json generated
View File

@@ -33484,6 +33484,7 @@
"fs-extra": "^10.0.0",
"lockfile": "^1.0.4",
"lodash": "^4.17.21",
"node-fetch": "^2.6.7",
"p-limit": "^3.1.0",
"request": "^2.88.2",
"send": "^0.17.1",
@@ -43534,6 +43535,7 @@
"lodash": "^4.17.21",
"mocha": "^10.2.0",
"mock-fs": "^5.1.2",
"node-fetch": "^2.6.7",
"p-limit": "^3.1.0",
"request": "^2.88.2",
"sandboxed-module": "^2.0.4",

View File

@@ -1,142 +1,55 @@
/* eslint-disable
no-return-assign,
no-unused-vars,
n/no-deprecated-api,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let UrlFetcher
const request = require('request').defaults({ jar: false })
const fs = require('fs')
const logger = require('@overleaf/logger')
const settings = require('@overleaf/settings')
const async = require('async')
const Settings = require('@overleaf/settings')
const { URL } = require('url')
const { promisify } = require('util')
const fetch = require('node-fetch')
const pipeline = promisify(require('stream').pipeline)
const oneMinute = 60 * 1000
module.exports = UrlFetcher = {
pipeUrlToFileWithRetry(url, filePath, callback) {
const doDownload = function (cb) {
UrlFetcher.pipeUrlToFile(url, filePath, cb)
}
async.retry(3, doDownload, callback)
},
pipeUrlToFile(url, filePath, _callback) {
if (_callback == null) {
_callback = function () {}
}
const callbackOnce = function (error) {
if (timeoutHandler != null) {
clearTimeout(timeoutHandler)
}
_callback(error)
return (_callback = function () {})
}
const u = new URL(url)
if (
settings.filestoreDomainOveride &&
u.host !== settings.apis.clsiPerf.host
) {
url = `${settings.filestoreDomainOveride}${u.pathname}${u.search}`
}
let timeoutHandler = setTimeout(
function () {
timeoutHandler = null
logger.error({ url, filePath }, 'Timed out downloading file to cache')
return callbackOnce(
new Error(`Timed out downloading file to cache ${url}`)
)
},
// FIXME: maybe need to close fileStream here
3 * oneMinute
)
logger.debug({ url, filePath }, 'started downloading url to cache')
const urlStream = request.get({ url, timeout: oneMinute })
urlStream.pause() // stop data flowing until we are ready
// attach handlers before setting up pipes
urlStream.on('error', function (error) {
logger.error({ err: error, url, filePath }, 'error downloading url')
return callbackOnce(
error || new Error(`Something went wrong downloading the URL ${url}`)
async function pipeUrlToFileWithRetry(url, filePath) {
let remainingAttempts = 3
let lastErr
while (remainingAttempts-- > 0) {
try {
await pipeUrlToFile(url, filePath)
return
} catch (err) {
logger.warn(
{ err, url, filePath, remainingAttempts },
'error downloading url'
)
})
lastErr = err
}
}
throw lastErr
}
urlStream.on('end', () =>
logger.debug({ url, filePath }, 'finished downloading file into cache')
)
async function pipeUrlToFile(url, filePath) {
const u = new URL(url)
if (
Settings.filestoreDomainOveride &&
u.host !== Settings.apis.clsiPerf.host
) {
url = `${Settings.filestoreDomainOveride}${u.pathname}${u.search}`
}
return urlStream.on('response', function (res) {
if (res.statusCode >= 200 && res.statusCode < 300) {
const atomicWrite = filePath + '~'
const fileStream = fs.createWriteStream(atomicWrite)
const res = await fetch(url, { signal: AbortSignal.timeout(60 * 1000) })
if (res.status !== 200) {
throw new Error('non success response: ' + res.statusText)
}
// attach handlers before setting up pipes
fileStream.on('error', function (error) {
logger.error(
{ err: error, url, filePath },
'error writing file into cache'
)
return fs.unlink(atomicWrite, function (err) {
if (err != null) {
logger.err({ err, filePath }, 'error deleting file from cache')
}
return callbackOnce(error)
})
})
fileStream.on('finish', function () {
logger.debug({ url, filePath }, 'finished writing file into cache')
fs.rename(atomicWrite, filePath, error => {
if (error) {
fs.unlink(atomicWrite, () => callbackOnce(error))
} else {
callbackOnce()
}
})
})
fileStream.on('pipe', () =>
logger.debug({ url, filePath }, 'piping into filestream')
)
urlStream.pipe(fileStream)
return urlStream.resume() // now we are ready to handle the data
} else {
logger.error(
{ statusCode: res.statusCode, url, filePath },
'unexpected status code downloading url to cache'
)
// https://nodejs.org/api/http.html#http_class_http_clientrequest
// If you add a 'response' event handler, then you must consume
// the data from the response object, either by calling
// response.read() whenever there is a 'readable' event, or by
// adding a 'data' handler, or by calling the .resume()
// method. Until the data is consumed, the 'end' event will not
// fire. Also, until the data is read it will consume memory
// that can eventually lead to a 'process out of memory' error.
urlStream.resume() // discard the data
return callbackOnce(
new Error(
`URL returned non-success status code: ${res.statusCode} ${url}`
)
)
}
})
},
const atomicWrite = filePath + '~'
try {
await pipeline(res.body, fs.createWriteStream(atomicWrite))
await fs.promises.rename(atomicWrite, filePath)
} catch (err) {
try {
await fs.promises.unlink(atomicWrite)
} catch (e) {}
throw err
}
}
module.exports.promises = {
pipeUrlToFileWithRetry: promisify(UrlFetcher.pipeUrlToFileWithRetry),
pipeUrlToFileWithRetry,
}

View File

@@ -29,6 +29,7 @@
"fs-extra": "^10.0.0",
"lockfile": "^1.0.4",
"lodash": "^4.17.21",
"node-fetch": "^2.6.7",
"p-limit": "^3.1.0",
"request": "^2.88.2",
"send": "^0.17.1",

View File

@@ -1,235 +0,0 @@
/* eslint-disable
no-return-assign,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
const SandboxedModule = require('sandboxed-module')
const sinon = require('sinon')
const { expect } = require('chai')
const modulePath = require('path').join(__dirname, '../../../app/js/UrlFetcher')
const { EventEmitter } = require('events')
describe('UrlFetcher', function () {
beforeEach(function () {
this.callback = sinon.stub()
this.url = 'https://www.example.com/file/here?query=string'
return (this.UrlFetcher = SandboxedModule.require(modulePath, {
requires: {
request: {
defaults: (this.defaults = sinon.stub().returns((this.request = {}))),
},
fs: (this.fs = {
rename: sinon.stub().yields(),
unlink: sinon.stub().yields(),
}),
'@overleaf/settings': (this.settings = {
apis: {
clsiPerf: {
host: 'localhost:3043',
},
},
}),
},
}))
})
describe('pipeUrlToFileWithRetry', function () {
this.beforeEach(function () {
this.UrlFetcher.pipeUrlToFile = sinon.stub()
})
it('should call pipeUrlToFile', function (done) {
this.UrlFetcher.pipeUrlToFile.callsArgWith(2)
this.UrlFetcher.pipeUrlToFileWithRetry(this.url, this.path, err => {
expect(err).to.equal(undefined)
this.UrlFetcher.pipeUrlToFile.called.should.equal(true)
done()
})
})
it('should call pipeUrlToFile multiple times on error', function (done) {
const error = new Error("couldn't download file")
this.UrlFetcher.pipeUrlToFile.callsArgWith(2, error)
this.UrlFetcher.pipeUrlToFileWithRetry(this.url, this.path, err => {
expect(err).to.equal(error)
this.UrlFetcher.pipeUrlToFile.callCount.should.equal(3)
done()
})
})
it('should call pipeUrlToFile twice if only 1 error', function (done) {
this.UrlFetcher.pipeUrlToFile.onCall(0).callsArgWith(2, 'error')
this.UrlFetcher.pipeUrlToFile.onCall(1).callsArgWith(2)
this.UrlFetcher.pipeUrlToFileWithRetry(this.url, this.path, err => {
expect(err).to.equal(undefined)
this.UrlFetcher.pipeUrlToFile.callCount.should.equal(2)
done()
})
})
})
describe('pipeUrlToFile', function () {
it('should turn off the cookie jar in request', function () {
return this.defaults.calledWith({ jar: false }).should.equal(true)
})
describe('rewrite url domain if filestoreDomainOveride is set', function () {
beforeEach(function () {
this.path = '/path/to/file/on/disk'
this.request.get = sinon
.stub()
.returns((this.urlStream = new EventEmitter()))
this.urlStream.pipe = sinon.stub()
this.urlStream.pause = sinon.stub()
this.urlStream.resume = sinon.stub()
this.fs.createWriteStream = sinon
.stub()
.returns((this.fileStream = new EventEmitter()))
return (this.fs.unlink = (file, callback) => callback())
})
it('should use the normal domain when override not set', function (done) {
this.UrlFetcher.pipeUrlToFile(this.url, this.path, () => {
this.request.get.args[0][0].url.should.equal(this.url)
return done()
})
this.res = { statusCode: 200 }
this.urlStream.emit('response', this.res)
this.urlStream.emit('end')
return this.fileStream.emit('finish')
})
it('should not use override clsiPerf domain when filestoreDomainOveride is set', function (done) {
this.settings.filestoreDomainOveride = '192.11.11.11'
const url = 'http://localhost:3043/file/here?query=string'
this.UrlFetcher.pipeUrlToFile(url, this.path, () => {
this.request.get.args[0][0].url.should.equal(url)
done()
})
this.res = { statusCode: 200 }
this.urlStream.emit('response', this.res)
this.urlStream.emit('end')
this.fileStream.emit('finish')
})
return it('should use override domain when filestoreDomainOveride is set', function (done) {
this.settings.filestoreDomainOveride = '192.11.11.11'
this.UrlFetcher.pipeUrlToFile(this.url, this.path, () => {
this.request.get.args[0][0].url.should.equal(
'192.11.11.11/file/here?query=string'
)
return done()
})
this.res = { statusCode: 200 }
this.urlStream.emit('response', this.res)
this.urlStream.emit('end')
return this.fileStream.emit('finish')
})
})
return describe('pipeUrlToFile', function () {
beforeEach(function (done) {
this.path = '/path/to/file/on/disk'
this.request.get = sinon
.stub()
.returns((this.urlStream = new EventEmitter()))
this.urlStream.pipe = sinon.stub()
this.urlStream.pause = sinon.stub()
this.urlStream.resume = sinon.stub()
this.fs.createWriteStream = sinon
.stub()
.returns((this.fileStream = new EventEmitter()))
this.fs.unlink = (file, callback) => callback()
return done()
})
describe('successfully', function () {
beforeEach(function (done) {
this.UrlFetcher.pipeUrlToFile(this.url, this.path, () => {
this.callback()
return done()
})
this.res = { statusCode: 200 }
this.urlStream.emit('response', this.res)
this.urlStream.emit('end')
return this.fileStream.emit('finish')
})
it('should request the URL', function () {
return this.request.get
.calledWith(sinon.match({ url: this.url }))
.should.equal(true)
})
it('should open the atomic file for writing', function () {
return this.fs.createWriteStream
.calledWith(this.path + '~')
.should.equal(true)
})
it('should move the atomic file to the target', function () {
return this.fs.rename
.calledWith(this.path + '~', this.path)
.should.equal(true)
})
it('should pipe the URL to the file', function () {
return this.urlStream.pipe
.calledWith(this.fileStream)
.should.equal(true)
})
return it('should call the callback', function () {
return this.callback.called.should.equal(true)
})
})
describe('with non success status code', function () {
beforeEach(function (done) {
this.UrlFetcher.pipeUrlToFile(this.url, this.path, err => {
this.callback(err)
return done()
})
this.res = { statusCode: 404 }
this.urlStream.emit('response', this.res)
return this.urlStream.emit('end')
})
it('should call the callback with an error', function () {
this.callback.calledWith(sinon.match(Error)).should.equal(true)
const message = this.callback.args[0][0].message
expect(message).to.include(
'URL returned non-success status code: 404'
)
})
})
return describe('with error', function () {
beforeEach(function (done) {
this.UrlFetcher.pipeUrlToFile(this.url, this.path, err => {
this.callback(err)
return done()
})
return this.urlStream.emit(
'error',
(this.error = new Error('something went wrong'))
)
})
it('should call the callback with the error', function () {
return this.callback.calledWith(this.error).should.equal(true)
})
return it('should only call the callback once, even if end is called', function () {
this.urlStream.emit('end')
return this.callback.calledOnce.should.equal(true)
})
})
})
})
})