From 10c012d85619b6cbb6a064f17e84effa6dc247a2 Mon Sep 17 00:00:00 2001 From: Henry Oswald Date: Wed, 21 May 2014 12:28:31 +0100 Subject: [PATCH] removed tpdsworker --- services/web/Gruntfile.coffee | 4 -- services/web/TpdsWorker.coffee | 122 --------------------------------- 2 files changed, 126 deletions(-) delete mode 100644 services/web/TpdsWorker.coffee diff --git a/services/web/Gruntfile.coffee b/services/web/Gruntfile.coffee index b35738611f..da02c27a63 100644 --- a/services/web/Gruntfile.coffee +++ b/services/web/Gruntfile.coffee @@ -27,10 +27,6 @@ module.exports = (grunt) -> app: src: 'app.coffee' dest: 'app.js' - - TpdsWorker: - src: 'TpdsWorker.coffee' - dest: 'TpdsWorker.js' BackgroundJobsWorker: src: 'BackgroundJobsWorker.coffee' diff --git a/services/web/TpdsWorker.coffee b/services/web/TpdsWorker.coffee deleted file mode 100644 index cfc5dcd875..0000000000 --- a/services/web/TpdsWorker.coffee +++ /dev/null @@ -1,122 +0,0 @@ -async = require('async') -request = require('request') -keys = require('./app/js/infrastructure/Keys') -settings = require('settings-sharelatex') -logger = require('logger-sharelatex') -_ = require('underscore') -childProcess = require("child_process") -metrics = require("./app/js/infrastructure/Metrics") - -fiveMinutes = 5 * 60 * 1000 - - -processingFuncs = - - sendDoc : (options, callback)-> - if !options.docLines? || options.docLines.length == 0 - logger.err options:options, "doc lines not added to options for processing" - return callback() - docLines = options.docLines.reduce (singleLine, line)-> "#{singleLine}\n#{line}" - post = request(options) - post.on 'error', (err)-> - if err? - callback(err) - else - callback() - post.on 'end', callback - post.write(docLines, 'utf-8') - - standardHttpRequest: (options, callback)-> - request options, (err, reponse, body)-> - if err? - callback(err) - else - callback() - - pipeStreamFrom: (options, _callback)-> - callback = (args...) -> - _callback(args...) - _callback = () -> - - if options.filePath == "/droppy/main.tex" - request options.streamOrigin, (err,res, body)-> - logger.log options:options, body:body - - origin = request(options.streamOrigin) - - cancelled = false - gotResponse = false - origin.on 'response', (res) -> - return if cancelled - gotResponse = true - if 200 <= res.statusCode < 300 - dest = request(options) - origin.pipe(dest) - - dest.on "error", (err)-> - logger.error err:err, options:options, "something went wrong in pipeStreamFrom dest" - callback(err) - - dest.on 'end', callback - else - error = new Error("received non-success status code: #{res.statusCode}") - logger.error err: error, options: options, "something went wrong connecting to origin" - callback(error) - - origin.on 'error', (err)-> - return if cancelled - gotResponse = true - logger.error err:err, options:options, "something went wrong in pipeStreamFrom origin" - callback(err) - - setTimeout () -> - return if gotResponse - cancelled = true - error = new Error("timeout") - logger.error err: error, options: options, "timeout" - callback(error) - , 2000 - - - -workerRegistration = (groupKey, method, options, callback)-> - callback = _.once callback - setTimeout callback, fiveMinutes - metrics.inc "tpds-worker-processing" - logger.log groupKey:groupKey, method:method, options:options, "processing http request from queue" - processingFuncs[method] options, (err)-> - if err? - logger.err err:err, user_id:groupKey, method:method, options:options, "something went wrong processing tpdsUpdateSender update" - return callback("skip-after-retry") - callback() - - -setupWebToTpdsWorkers = (queueName)-> - logger.log worker_count:worker_count, queueName:queueName, "fairy workers" - worker_count = 4 - while worker_count-- > 0 - workerQueueRef = require('fairy').connect(settings.redis.fairy).queue(queueName) - workerQueueRef.polling_interval = 100 - workerQueueRef.regist workerRegistration - - -cleanupPreviousQueues = (queueName, callback)-> - #cleanup queues then setup workers - fairy = require('fairy').connect(settings.redis.fairy) - queuePrefix = "FAIRY:QUEUED:#{queueName}:" - fairy.redis.keys "#{queuePrefix}*", (err, keys)-> - logger.log "#{keys.length} fairy queues need cleanup" - queueNames = keys.map (key)-> - key.replace queuePrefix, "" - cleanupJobs = queueNames.map (projectQueueName)-> - return (cb)-> - cleanup = childProcess.fork(__dirname + '/cleanup.js', [queueName, projectQueueName]) - cleanup.on 'exit', cb - async.series cleanupJobs, callback - - -cleanupPreviousQueues keys.queue.web_to_tpds_http_requests, -> - setupWebToTpdsWorkers keys.queue.web_to_tpds_http_requests - -cleanupPreviousQueues keys.queue.tpds_to_web_http_requests, -> - setupWebToTpdsWorkers keys.queue.tpds_to_web_http_requests