remove pack worker

remove the op-specific code

remove tests for ops, now only packing

remove unused packing code

work in progress

store index for completed packs only

support archiving and unarchiving of individual packs

remove support for archiving whole document history

split out ArchiveManager, IndexManager

remove old DocArchive code

remove docHistoryStats collection

comment about archiving

added method to look at index when last pack has been archived

added start of iterator for project results

use a proper iterator

added heap module

getting it working

increase pack size since bulk operations no longer needed

remove unused MongoAWSexternal

cleanup

added doc iterator

remove old query code

added missing files

cleanup

clean upclean up

started adding pack worker for archiving

work in progress

work in progress

getting pack worker working

updating worker

getting packworker working

added lock

use correct key name for track changes aws access

use correct key name for track changes aws access

always send back users array

fix up comparison of retrieved objects

handle op ids inside packs

log when s3 download completes

comments

cleanup, remove finalisation ideacleanup, remove finalisation idea

remove logging
This commit is contained in:
Brian Gough
2016-02-08 16:22:42 +00:00
parent a23ddf31c0
commit 3d9dfeccc3
18 changed files with 627 additions and 1321 deletions

View File

@@ -48,12 +48,6 @@ app.post "/project/:project_id/flush", HttpController.flushProject
app.post "/project/:project_id/doc/:doc_id/version/:version/restore", HttpController.restore
app.post "/doc/:doc_id/pack", HttpController.packDoc
app.get "/doc/list", HttpController.listDocs
app.post '/project/:project_id/archive', HttpController.archiveProject
app.post '/project/:project_id/unarchive', HttpController.unArchiveProject
packWorker = null # use a single packing worker
app.post "/pack", (req, res, next) ->

View File

@@ -14,7 +14,6 @@ module.exports = DiffManager =
callback(null, content, version, updates)
getDiff: (project_id, doc_id, fromVersion, toVersion, callback = (error, diff) ->) ->
logger.log project_id: project_id, doc_id: doc_id, from: fromVersion, to: toVersion, "getting diff"
DiffManager.getDocumentBeforeVersion project_id, doc_id, fromVersion, (error, startingContent, updates) ->
if error?
if error.message == "broken-history"

View File

@@ -1,88 +0,0 @@
MongoManager = require "./MongoManager"
MongoAWS = require "./MongoAWS"
LockManager = require "./LockManager"
DocstoreHandler = require "./DocstoreHandler"
logger = require "logger-sharelatex"
_ = require "underscore"
async = require "async"
settings = require("settings-sharelatex")
module.exports = DocArchiveManager =
archiveAllDocsChanges: (project_id, callback = (error, docs) ->) ->
DocstoreHandler.getAllDocs project_id, (error, docs) ->
if error?
return callback(error)
else if !docs?
return callback new Error("No docs for project #{project_id}")
jobs = _.map docs, (doc) ->
(cb)-> DocArchiveManager.archiveDocChangesWithLock project_id, doc._id, cb
async.series jobs, callback
archiveDocChangesWithLock: (project_id, doc_id, callback = (error) ->) ->
job = (releaseLock) ->
DocArchiveManager.archiveDocChanges project_id, doc_id, releaseLock
LockManager.runWithLock("HistoryLock:#{doc_id}", job, callback)
archiveDocChanges: (project_id, doc_id, callback)->
MongoManager.getArchivedDocStatus doc_id, (error, result) ->
return callback(error) if error?
if result?.inS3 is true
logger.log {project_id, doc_id}, "document history is already archived"
return callback()
if result?.inS3?
logger.log {project_id, doc_id}, "document history archive is already in progress"
return callback()
MongoManager.getDocChangesCount doc_id, (error, count) ->
return callback(error) if error?
if count == 0
logger.log {project_id, doc_id}, "document history is empty, not archiving"
return callback()
MongoManager.peekLastCompressedUpdate doc_id, (error, update, lastVersion) ->
return callback(error) if error?
logger.log {doc_id, project_id}, "archiving got last compressed update"
MongoManager.markDocHistoryAsArchiveInProgress doc_id, lastVersion, (error) ->
return callback(error) if error?
logger.log {doc_id, project_id}, "marked doc history as archive in progress"
MongoAWS.archiveDocHistory project_id, doc_id, update, (error) ->
if error?
logger.log {doc_id, project_id, error}, "error exporting document to S3"
MongoManager.clearDocHistoryAsArchiveInProgress doc_id, update, (err) ->
return callback(err) if err?
logger.log {doc_id, project_id}, "cleared archive in progress flag"
callback(error)
else
logger.log doc_id:doc_id, project_id:project_id, "exported document to S3"
MongoManager.markDocHistoryAsArchived doc_id, lastVersion, (error) ->
return callback(error) if error?
logger.log {doc_id, project_id}, "marked doc history as archived"
callback()
unArchiveAllDocsChanges: (project_id, callback = (error, docs) ->) ->
DocstoreHandler.getAllDocs project_id, (error, docs) ->
if error?
return callback(error)
else if !docs?
return callback new Error("No docs for project #{project_id}")
jobs = _.map docs, (doc) ->
(cb)-> DocArchiveManager.unArchiveDocChangesWithLock project_id, doc._id, cb
async.parallelLimit jobs, 4, callback
unArchiveDocChangesWithLock: (project_id, doc_id, callback = (error) ->) ->
job = (releaseLock) ->
DocArchiveManager.unArchiveDocChanges project_id, doc_id, releaseLock
LockManager.runWithLock("HistoryLock:#{doc_id}", job, callback)
unArchiveDocChanges: (project_id, doc_id, callback)->
MongoManager.getArchivedDocStatus doc_id, (error, result) ->
return callback(error) if error?
if result?.inS3 isnt true
logger.log {project_id, doc_id}, "no changes marked as in s3, not unarchiving"
return callback()
else
MongoAWS.unArchiveDocHistory project_id, doc_id, (error) ->
return callback(error) if error?
logger.log doc_id:doc_id, project_id:project_id, "imported document from S3"
MongoManager.markDocHistoryAsUnarchived doc_id, (error) ->
return callback(error) if error?
callback()

View File

@@ -1,21 +0,0 @@
request = require("request").defaults(jar: false)
logger = require "logger-sharelatex"
settings = require "settings-sharelatex"
module.exports = DocstoreHandler =
getAllDocs: (project_id, callback = (error) ->) ->
logger.log project_id: project_id, "getting all docs for project in docstore api"
url = "#{settings.apis.docstore.url}/project/#{project_id}/doc"
request.get {
url: url
json: true
}, (error, res, docs) ->
return callback(error) if error?
logger.log {error, res, docs: if docs?.length then docs.map (d) -> d._id else []}, "docstore response"
if 200 <= res.statusCode < 300
callback(null, docs)
else
error = new Error("docstore api responded with non-success code: #{res.statusCode}")
logger.error err: error, project_id: project_id, "error getting all docs from docstore"
callback(error)

View File

@@ -3,7 +3,6 @@ DiffManager = require "./DiffManager"
PackManager = require "./PackManager"
RestoreManager = require "./RestoreManager"
logger = require "logger-sharelatex"
DocArchiveManager = require "./DocArchiveManager"
HealthChecker = require "./HealthChecker"
_ = require "underscore"
@@ -23,23 +22,6 @@ module.exports = HttpController =
return next(error) if error?
res.send 204
listDocs: (req, res, next = (error) ->) ->
logger.log "listing packing doc history"
limit = +req.query?.limit || 100
doc_id = req.query?.doc_id if req.query?.doc_id?.match(/^[0-9a-f]{24}$/)
PackManager.listDocs {limit, doc_id}, (error, doc_ids) ->
return next(error) if error?
ids = (doc.doc_id.toString() for doc in doc_ids)
output = _.uniq(ids).join("\n") + "\n"
res.send output
packDoc: (req, res, next = (error) ->) ->
doc_id = req.params.doc_id
logger.log doc_id: doc_id, "packing doc history"
PackManager.packDocHistory doc_id, (error) ->
return next(error) if error?
res.send 204
checkDoc: (req, res, next = (error) ->) ->
doc_id = req.params.doc_id
project_id = req.params.project_id
@@ -68,7 +50,7 @@ module.exports = HttpController =
else
to = null
logger.log project_id, doc_id: doc_id, from: from, to: to, "getting diff"
logger.log {project_id, doc_id, from, to}, "getting diff"
DiffManager.getDiff project_id, doc_id, from, to, (error, diff) ->
return next(error) if error?
res.send JSON.stringify(diff: diff)
@@ -95,20 +77,6 @@ module.exports = HttpController =
return next(error) if error?
res.send 204
archiveProject: (req, res, next = (error) ->) ->
project_id = req.params.project_id
logger.log project_id: project_id, "archiving all track changes to s3"
DocArchiveManager.archiveAllDocsChanges project_id, (error) ->
return next(error) if error?
res.send 204
unArchiveProject: (req, res, next = (error) ->) ->
project_id = req.params.project_id
logger.log project_id: project_id, "unarchiving all track changes from s3"
DocArchiveManager.unArchiveAllDocsChanges project_id, (error) ->
return next(error) if error?
res.send 204
healthCheck: (req, res)->
HealthChecker.check (err)->
if err?

View File

@@ -5,110 +5,106 @@ S3S = require 's3-streams'
{db, ObjectId} = require "./mongojs"
JSONStream = require "JSONStream"
ReadlineStream = require "byline"
zlib = require "zlib"
DAYS = 24 * 3600 * 1000 # one day in milliseconds
AWS_CONFIG = {
accessKeyId: settings.trackchanges.s3.key
secretAccessKey: settings.trackchanges.s3.secret
}
createStream = (streamConstructor, project_id, doc_id, pack_id) ->
return streamConstructor new AWS.S3(AWS_CONFIG), {
"Bucket": settings.trackchanges.stores.doc_history,
"Key": project_id+"/changes-"+doc_id+"/pack-"+pack_id
}
module.exports = MongoAWS =
MAX_SIZE: 1024*1024 # almost max size
MAX_COUNT: 512 # almost max count
archiveDocHistory: (project_id, doc_id, update, _callback = (error) ->) ->
archivePack: (project_id, doc_id, pack_id, _callback = (error) ->) ->
callback = (args...) ->
_callback(args...)
_callback = () ->
query = {
_id: ObjectId(pack_id)
doc_id: ObjectId(doc_id)
v: {$lte: update.v}
expiresAt: {$exists : false}
}
AWS.config.update {
accessKeyId: settings.filestore.s3.key
secretAccessKey: settings.filestore.s3.secret
}
return callback new Error("invalid project id") if not project_id?
return callback new Error("invalid doc id") if not doc_id?
return callback new Error("invalid pack id") if not pack_id?
logger.log {project_id, doc_id}, "uploading data to s3"
logger.log {project_id, doc_id, pack_id}, "uploading data to s3"
upload = S3S.WriteStream new AWS.S3(), {
"Bucket": settings.filestore.stores.user_files,
"Key": project_id+"/changes-"+doc_id
}
upload = createStream S3S.WriteStream, project_id, doc_id, pack_id
db.docHistory.find(query)
.on 'error', (err) ->
callback(err)
.pipe JSONStream.stringify()
.pipe upload
.on 'error', (err) ->
callback(err)
.on 'finish', () ->
return callback(null)
unArchiveDocHistory: (project_id, doc_id, _callback = (error) ->) ->
db.docHistory.findOne query, (err, result) ->
return callback(err) if err?
return callback new Error("cannot find pack to send to s3") if not result?
return callback new Error("refusing to send pack with TTL to s3") if result.expiresAt?
uncompressedData = JSON.stringify(result)
zlib.gzip uncompressedData, (err, buf) ->
logger.log {project_id, doc_id, pack_id, origSize: uncompressedData.length, newSize: buf.length}, "compressed pack"
return callback(err) if err?
upload.on 'error', (err) ->
callback(err)
upload.on 'finish', () ->
logger.log {project_id, doc_id, pack_id}, "upload to s3 completed"
callback(null)
upload.write buf
upload.end()
readArchivedPack: (project_id, doc_id, pack_id, _callback = (error, result) ->) ->
callback = (args...) ->
_callback(args...)
_callback = () ->
AWS.config.update {
accessKeyId: settings.filestore.s3.key
secretAccessKey: settings.filestore.s3.secret
}
return callback new Error("invalid project id") if not project_id?
return callback new Error("invalid doc id") if not doc_id?
return callback new Error("invalid pack id") if not pack_id?
logger.log {project_id, doc_id}, "downloading data from s3"
logger.log {project_id, doc_id, pack_id}, "downloading data from s3"
download = S3S.ReadStream new AWS.S3(), {
"Bucket": settings.filestore.stores.user_files,
"Key": project_id+"/changes-"+doc_id
}, {
encoding: "utf8"
}
lineStream = new ReadlineStream();
ops = []
sz = 0
download = createStream S3S.ReadStream, project_id, doc_id, pack_id
inputStream = download
.on 'open', (obj) ->
return 1
.on 'error', (err) ->
callback(err)
.pipe lineStream
inputStream.on 'data', (line) ->
if line.length > 2
try
ops.push(JSON.parse(line))
catch err
return callback(err)
sz += line.length
if ops.length >= MongoAWS.MAX_COUNT || sz >= MongoAWS.MAX_SIZE
inputStream.pause()
MongoAWS.handleBulk ops.slice(0), sz, () ->
inputStream.resume()
ops.splice(0,ops.length)
sz = 0
.on 'end', () ->
MongoAWS.handleBulk ops, sz, callback
.on 'error', (err) ->
gunzip = zlib.createGunzip()
gunzip.setEncoding('utf8')
gunzip.on 'error', (err) ->
logger.log {project_id, doc_id, pack_id, err}, "error uncompressing gzip stream"
callback(err)
outputStream = inputStream.pipe gunzip
parts = []
outputStream.on 'error', (err) ->
return callback(err)
outputStream.on 'end', () ->
logger.log {project_id, doc_id, pack_id}, "download from s3 completed"
try
object = JSON.parse parts.join('')
catch e
return callback(e)
object._id = ObjectId(object._id)
object.doc_id = ObjectId(object.doc_id)
object.project_id = ObjectId(object.project_id)
for op in object.pack
op._id = ObjectId(op._id) if op._id?
callback null, object
outputStream.on 'data', (data) ->
parts.push data
handleBulk: (ops, size, cb) ->
bulk = db.docHistory.initializeUnorderedBulkOp();
for op in ops
op._id = ObjectId(op._id)
op.doc_id = ObjectId(op.doc_id)
op.project_id = ObjectId(op.project_id)
bulk.find({_id:op._id}).upsert().updateOne(op)
if ops.length > 0
bulk.execute (err, result) ->
if err?
logger.error err:err, "error bulking ReadlineStream"
else
logger.log count:ops.length, result:result, size: size, "bulked ReadlineStream"
cb(err)
else
cb()
unArchivePack: (project_id, doc_id, pack_id, callback = (error) ->) ->
MongoAWS.readArchivedPack project_id, doc_id, pack_id, (err, object) ->
return callback(err) if err?
# allow the object to expire, we can always retrieve it again
object.expiresAt = new Date(Date.now() + 7 * DAYS)
logger.log {project_id, doc_id, pack_id}, "inserting object from s3"
db.docHistory.insert object, callback

View File

@@ -1,123 +0,0 @@
settings = require "settings-sharelatex"
child_process = require "child_process"
mongoUri = require "mongo-uri";
logger = require "logger-sharelatex"
AWS = require 'aws-sdk'
fs = require 'fs'
S3S = require 's3-streams'
module.exports = MongoAWSexternal =
archiveDocHistory: (project_id, doc_id, callback = (error) ->) ->
MongoAWS.mongoExportDocHistory doc_id, (error, filepath) ->
MongoAWS.s3upStream project_id, doc_id, filepath, callback
#delete temp file?
unArchiveDocHistory: (project_id, doc_id, callback = (error) ->) ->
MongoAWS.s3downStream project_id, doc_id, (error, filepath) ->
if error == null
MongoAWS.mongoImportDocHistory filepath, callback
#delete temp file?
else
callback
mongoExportDocHistory: (doc_id, callback = (error, filepath) ->) ->
uriData = mongoUri.parse(settings.mongo.url);
filepath = settings.path.dumpFolder + '/' + doc_id + '.jsonUp'
args = []
args.push '-h'
args.push uriData.hosts[0]
args.push '-d'
args.push uriData.database
args.push '-c'
args.push 'docHistory'
args.push '-q'
args.push "{doc_id: ObjectId('#{doc_id}') , expiresAt: {$exists : false} }"
args.push '-o'
args.push filepath
proc = child_process.spawn "mongoexport", args
proc.on "error", callback
stderr = ""
proc.stderr.on "data", (chunk) -> stderr += chunk.toString()
proc.on "close", (code) ->
if code == 0
return callback(null,filepath)
else
return callback(new Error("mongodump failed: #{stderr}"),null)
mongoImportDocHistory: (filepath, callback = (error) ->) ->
uriData = mongoUri.parse(settings.mongo.url);
args = []
args.push '-h'
args.push uriData.hosts[0]
args.push '-d'
args.push uriData.database
args.push '-c'
args.push 'docHistory'
args.push '--file'
args.push filepath
proc = child_process.spawn "mongoimport", args
proc.on "error", callback
stderr = ""
proc.stderr.on "data", (chunk) -> stderr += chunk.toString()
proc.on "close", (code) ->
if code == 0
return callback(null,filepath)
else
return callback(new Error("mongodump failed: #{stderr}"),null)
s3upStream: (project_id, doc_id, filepath, callback = (error) ->) ->
AWS.config.update {
accessKeyId: settings.filestore.s3.key
secretAccessKey: settings.filestore.s3.secret
}
upload = S3S.WriteStream new AWS.S3(), {
"Bucket": settings.filestore.stores.user_files,
"Key": project_id+"/changes-"+doc_id
}
fs.createReadStream(filepath)
.on 'open', (obj) ->
return 1
.pipe(upload)
.on 'finish', () ->
return callback(null)
.on 'error', (err) ->
return callback(err)
s3downStream: (project_id, doc_id, callback = (error, filepath) ->) ->
filepath = settings.path.dumpFolder + '/' + doc_id + '.jsonDown'
AWS.config.update {
accessKeyId: settings.filestore.s3.key
secretAccessKey: settings.filestore.s3.secret
}
download = S3S.ReadStream new AWS.S3(), {
"Bucket": settings.filestore.stores.user_files,
"Key": project_id+"/changes-"+doc_id
}
download
.on 'open', (obj) ->
return 1
.pipe(fs.createWriteStream(filepath))
.on 'finish', () ->
return callback(null, filepath)
.on 'error', (err) ->
return callback(err, null)

View File

@@ -31,81 +31,11 @@ module.exports = MongoManager =
else
return callback null, update, update.v
else
MongoManager.getArchivedDocStatus doc_id, (error, status) ->
PackManager.getLastPackFromIndex doc_id, (error, pack) ->
return callback(error) if error?
return callback(null, null, status.lastVersion) if status?.inS3? and status?.lastVersion?
return callback(null, null, pack.v_end) if pack?.inS3? and pack?.v_end?
callback null, null
insertCompressedUpdates: (project_id, doc_id, updates, temporary, callback = (error) ->) ->
jobs = []
for update in updates
do (update) ->
jobs.push (callback) -> MongoManager.insertCompressedUpdate project_id, doc_id, update, temporary, callback
async.series jobs, (err, results) ->
if not temporary
# keep track of updates to be packed
db.docHistoryStats.update {doc_id:ObjectId(doc_id)}, {
$inc:{update_count:updates.length},
$currentDate:{last_update:true}
}, {upsert:true}, () ->
callback(err,results)
else
callback(err,results)
modifyCompressedUpdate: (lastUpdate, newUpdate, callback = (error) ->) ->
return callback() if not newUpdate?
db.docHistory.findAndModify
query: lastUpdate,
update:
$set :
op: newUpdate.op
meta: newUpdate.meta
v: newUpdate.v
new: true
, (err, result, lastErrorObject) ->
return callback(error) if error?
return new Error("could not modify existing op") if not result?
callback(err, result)
insertCompressedUpdate: (project_id, doc_id, update, temporary, callback = (error) ->) ->
update = {
doc_id: ObjectId(doc_id.toString())
project_id: ObjectId(project_id.toString())
op: update.op
meta: update.meta
v: update.v
}
if temporary
seconds = 1000
minutes = 60 * seconds
hours = 60 * minutes
days = 24 * hours
update.expiresAt = new Date(Date.now() + 7 * days)
# may need to roll over a pack here if we are inserting packs
db.docHistory.insert update, callback
getDocUpdates:(doc_id, options = {}, callback = (error, updates) ->) ->
query =
doc_id: ObjectId(doc_id.toString())
if options.from?
query["v"] ||= {}
query["v"]["$gte"] = options.from
if options.to?
query["v"] ||= {}
query["v"]["$lte"] = options.to
PackManager.findDocResults(db.docHistory, query, options.limit, callback)
getProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) ->
query =
project_id: ObjectId(project_id.toString())
if options.before?
query["meta.end_ts"] = { $lt: options.before }
PackManager.findProjectResults(db.docHistory, query, options.limit, callback)
backportProjectId: (project_id, doc_id, callback = (error) ->) ->
db.docHistory.update {
doc_id: ObjectId(doc_id.toString())
@@ -143,31 +73,3 @@ module.exports = MongoManager =
db.projectHistoryMetaData.ensureIndex { project_id: 1 }, { background: true }
# TTL index for auto deleting week old temporary ops
db.docHistory.ensureIndex { expiresAt: 1 }, { expireAfterSeconds: 0, background: true }
# For finding documents which need packing
db.docHistoryStats.ensureIndex { doc_id: 1 }, { background: true }
db.docHistoryStats.ensureIndex { updates: -1, doc_id: 1 }, { background: true }
getArchivedDocStatus: (doc_id, callback)->
db.docHistoryStats.findOne {doc_id: ObjectId(doc_id.toString()), inS3: {$exists:true}}, {inS3: true, lastVersion: true}, callback
getDocChangesCount: (doc_id, callback)->
db.docHistory.count { doc_id : ObjectId(doc_id.toString())}, callback
markDocHistoryAsArchiveInProgress: (doc_id, lastVersion, callback) ->
db.docHistoryStats.update {doc_id: ObjectId(doc_id.toString())}, {$set : {inS3: false, lastVersion: lastVersion}}, {upsert:true}, callback
clearDocHistoryAsArchiveInProgress: (doc_id, update, callback) ->
db.docHistoryStats.update {doc_id: ObjectId(doc_id.toString())}, {$unset : {inS3: true, lastVersion: true}}, callback
markDocHistoryAsArchived: (doc_id, lastVersion, callback)->
db.docHistoryStats.update {doc_id: ObjectId(doc_id.toString())}, {$set : {inS3: true}}, {upsert:true}, (error)->
return callback(error) if error?
# clear the archived entries from the docHistory now we have finally succeeded
db.docHistory.remove { doc_id : ObjectId(doc_id.toString()), v: {$lte : lastVersion}, expiresAt: {$exists : false} }, (error)->
return callback(error) if error?
callback(error)
markDocHistoryAsUnarchived: (doc_id, callback)->
# note this removes any inS3 field, regardless of its value (true/false/null)
db.docHistoryStats.update {doc_id: ObjectId(doc_id.toString())}, { $unset : { inS3: true, lastVersion: true} }, (error)->
callback(error)

View File

@@ -3,463 +3,49 @@ _ = require "underscore"
{db, ObjectId, BSON} = require "./mongojs"
logger = require "logger-sharelatex"
LockManager = require "./LockManager"
MongoAWS = require "./MongoAWS"
ProjectIterator = require "./ProjectIterator"
# Sharejs operations are stored in a 'pack' object
#
# e.g. a single sharejs update looks like
#
# {
# "doc_id" : 549dae9e0a2a615c0c7f0c98,
# "project_id" : 549dae9c0a2a615c0c7f0c8c,
# "op" : [ {"p" : 6981, "d" : "?" } ],
# "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 },
# "v" : 17082
# }
#
# and a pack looks like this
#
# {
# "doc_id" : 549dae9e0a2a615c0c7f0c98,
# "project_id" : 549dae9c0a2a615c0c7f0c8c,
# "pack" : [ U1, U2, U3, ...., UN],
# "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 },
# "v" : 17082
# "v_end" : ...
# }
#
# where U1, U2, U3, .... are single updates stripped of their
# doc_id and project_id fields (which are the same for all the
# updates in the pack).
#
# The pack itself has v and meta fields, this makes it possible to
# treat packs and single updates in a similar way.
#
# The v field of the pack itself is from the first entry U1, the
# v_end field from UN. The meta.end_ts field of the pack itself is
# from the last entry UN, the meta.start_ts field from U1.
DAYS = 24 * 3600 * 1000 # one day in milliseconds
module.exports = PackManager =
# The following functions implement methods like a mongo find, but
# expands any documents containing a 'pack' field into multiple
# values
#
# e.g. a single update looks like
#
# {
# "doc_id" : 549dae9e0a2a615c0c7f0c98,
# "project_id" : 549dae9c0a2a615c0c7f0c8c,
# "op" : [ {"p" : 6981, "d" : "?" } ],
# "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 },
# "v" : 17082
# }
#
# and a pack looks like this
#
# {
# "doc_id" : 549dae9e0a2a615c0c7f0c98,
# "project_id" : 549dae9c0a2a615c0c7f0c8c,
# "pack" : [ U1, U2, U3, ...., UN],
# "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 },
# "v" : 17082
# }
#
# where U1, U2, U3, .... are single updates stripped of their
# doc_id and project_id fields (which are the same for all the
# updates in the pack).
#
# The pack itself has v and meta fields, this makes it possible to
# treat packs and single updates in the same way.
#
# The v field of the pack itself is from the first entry U1
# The meta.end_ts field of the pack itself is from the last entry UN.
findDocResults: (collection, query, limit, callback) ->
# query - the mongo query selector, includes both the doc_id/project_id and
# the range on v
# limit - the mongo limit, we need to apply it after unpacking any
# packs
sort = {}
sort['v'] = -1;
cursor = collection
.find( query )
.sort( sort )
# if we have packs, we will trim the results more later after expanding them
if limit?
cursor.limit(limit)
# take the part of the query which selects the range over the parameter
rangeQuery = query['v']
# helper function to check if an item from a pack is inside the
# desired range
filterFn = (item) ->
return false if rangeQuery?['$gte']? && item['v'] < rangeQuery['$gte']
return false if rangeQuery?['$lte']? && item['v'] > rangeQuery['$lte']
return false if rangeQuery?['$lt']? && item['v'] >= rangeQuery['$lt']
return false if rangeQuery?['$gt']? && item['v'] <= rangeQuery['$gt']
return true
versionOrder = (a, b) ->
b.v - a.v
# create a query which can be used to select the entries BEFORE
# the range because we sometimes need to find extra ones (when the
# boundary falls in the middle of a pack)
extraQuery = _.clone(query)
# The pack uses its first entry for its metadata and v, so the
# only queries where we might not get all the packs are those for
# $gt and $gte (i.e. we need to find packs which start before our
# range but end in it)
if rangeQuery?['$gte']?
extraQuery['v'] = {'$lt' : rangeQuery['$gte']}
else if rangeQuery?['$gt']
extraQuery['v'] = {'$lte' : rangeQuery['$gt']}
else
delete extraQuery['v']
needMore = false # keep track of whether we need to load more data
updates = [] # used to accumulate the set of results
# FIXME: packs are big so we should accumulate the results
# incrementally instead of using .toArray() to avoid reading all
# of the changes into memory
cursor.toArray (err, result) ->
unpackedSet = PackManager._unpackResults(result)
updates = PackManager._filterAndLimit(updates, unpackedSet, filterFn, limit)
# check if we need to retrieve more data, because there is a
# pack that crosses into our range
last = if unpackedSet.length then unpackedSet[unpackedSet.length-1] else null
if limit? && updates.length == limit
needMore = false
else if extraQuery['v']? && last? && filterFn(last)
needMore = true
else if extraQuery['v']? && updates.length == 0
needMore = true
if needMore
# we do need an extra result set
extra = collection
.find(extraQuery)
.sort(sort)
.limit(1)
extra.toArray (err, result2) ->
if err?
return callback err, updates.sort versionOrder
else
extraSet = PackManager._unpackResults(result2)
updates = PackManager._filterAndLimit(updates, extraSet, filterFn, limit)
callback err, updates.sort versionOrder
return
if err?
callback err, result
else
callback err, updates.sort versionOrder
findProjectResults: (collection, query, limit, callback) ->
# query - the mongo query selector, includes both the doc_id/project_id and
# the range on meta.end_ts
# limit - the mongo limit, we need to apply it after unpacking any
# packs
sort = {}
sort['meta.end_ts'] = -1;
projection = {"op":false, "pack.op": false}
cursor = collection
.find( query, projection ) # no need to return the op only need version info
.sort( sort )
# if we have packs, we will trim the results more later after expanding them
if limit?
cursor.limit(limit)
# take the part of the query which selects the range over the parameter
before = query['meta.end_ts']?['$lt'] # may be null
updates = [] # used to accumulate the set of results
# FIXME: packs are big so we should accumulate the results
# incrementally instead of using .toArray() to avoid reading all
# of the changes into memory
cursor.toArray (err, result) ->
if err?
return callback err, result
if result.length == 0 && not before? # no results and no time range specified
return callback err, result
unpackedSet = PackManager._unpackResults(result)
if limit?
unpackedSet = unpackedSet.slice(0, limit)
# find the end time of the last result, we will take all the
# results up to this, and then all the changes at that time
# (without imposing a limit) and any overlapping packs
cutoff = if unpackedSet.length then unpackedSet[unpackedSet.length-1].meta.end_ts else null
filterFn = (item) ->
ts = item?.meta?.end_ts
return false if before? && ts >= before
return false if cutoff? && ts < cutoff
return true
timeOrder = (a, b) ->
(b.meta.end_ts - a.meta.end_ts) || documentOrder(a, b)
documentOrder = (a, b) ->
x = a.doc_id.valueOf()
y = b.doc_id.valueOf()
if x > y then 1 else if x < y then -1 else 0
updates = PackManager._filterAndLimit(updates, unpackedSet, filterFn, limit)
# get all elements on the lower bound (cutoff)
tailQuery = _.clone(query)
tailQuery['meta.end_ts'] = cutoff
tail = collection
.find(tailQuery, projection)
.sort(sort)
# now find any packs that overlap with the time window from outside
# cutoff before
# --|-----wanted-range--|------------------ time=>
# |-------------|pack(end_ts)
#
# these were not picked up by the original query because
# end_ts>before but the beginning of the pack may be in the time range
overlapQuery = _.clone(query)
if before? && cutoff?
overlapQuery['meta.end_ts'] = {"$gte": before}
overlapQuery['meta.start_ts'] = {"$lte": before }
else if before? && not cutoff?
overlapQuery['meta.end_ts'] = {"$gte": before}
overlapQuery['meta.start_ts'] = {"$lte": before }
else if not before? && cutoff?
overlapQuery['meta.end_ts'] = {"$gte": cutoff} # we already have these??
else if not before? && not cutoff?
overlapQuery['meta.end_ts'] = {"$gte": 0 } # shouldn't happen??
overlap = collection
.find(overlapQuery, projection)
.sort(sort)
# we don't specify a limit here, as there could be any number of overlaps
# NB. need to catch items in original query and followup query for duplicates
applyAndUpdate = (result) ->
extraSet = PackManager._unpackResults(result)
# note: final argument is null, no limit applied because we
# need all the updates at the final time to avoid breaking
# the changeset into parts
updates = PackManager._filterAndLimit(updates, extraSet, filterFn, null)
tail.toArray (err, result2) ->
if err?
return callback err, updates.sort timeOrder
else
applyAndUpdate result2
overlap.toArray (err, result3) ->
if err?
return callback err, updates.sort timeOrder
else
applyAndUpdate result3
callback err, updates.sort timeOrder
_unpackResults: (updates) ->
# iterate over the updates, if there's a pack, expand it into ops and
# insert it into the array at that point
result = []
updates.forEach (item) ->
if item.pack?
all = PackManager._explodePackToOps item
result = result.concat all
else
result.push item
return result
_explodePackToOps: (packObj) ->
# convert a pack into an array of ops
doc_id = packObj.doc_id
project_id = packObj.project_id
result = packObj.pack.map (item) ->
item.doc_id = doc_id
item.project_id = project_id
item
return result.reverse()
_filterAndLimit: (results, extra, filterFn, limit) ->
# update results with extra docs, after filtering and limiting
filtered = extra.filter(filterFn)
newResults = results.concat filtered
# remove duplicates
seen = {}
newResults = newResults.filter (item) ->
key = item.doc_id + ' ' + item.v
if seen[key]
return false
else
seen[key] = true
return true
newResults.slice(0, limit) if limit?
return newResults
MAX_SIZE: 1024*1024 # make these configurable parameters
MAX_COUNT: 512
convertDocsToPacks: (docs, callback) ->
packs = []
top = null
docs.forEach (d,i) ->
# skip existing packs
if d.pack?
top = null
return
sz = BSON.calculateObjectSize(d)
# decide if this doc can be added to the current pack
validLength = top? && (top.pack.length < PackManager.MAX_COUNT)
validSize = top? && (top.sz + sz < PackManager.MAX_SIZE)
bothPermanent = top? && (top.expiresAt? is false) && (d.expiresAt? is false)
bothTemporary = top? && (top.expiresAt? is true) && (d.expiresAt? is true)
within1Day = bothTemporary && (d.meta.start_ts - top.meta.start_ts < 24 * 3600 * 1000)
if top? && validLength && validSize && (bothPermanent || (bothTemporary && within1Day))
top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id}
top.sz += sz
top.n += 1
top.v_end = d.v
top.meta.end_ts = d.meta.end_ts
top.expiresAt = d.expiresAt if top.expiresAt?
return
else
# create a new pack
top = _.clone(d)
top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ]
top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts }
top.sz = sz
top.n = 1
top.v_end = d.v
delete top.op
delete top._id
packs.push top
callback(null, packs)
checkHistory: (docs, callback) ->
errors = []
prev = null
error = (args...) ->
errors.push args
docs.forEach (d,i) ->
if d.pack?
n = d.pack.length
last = d.pack[n-1]
error('bad pack v_end', d) if d.v_end != last.v
error('bad pack start_ts', d) if d.meta.start_ts != d.pack[0].meta.start_ts
error('bad pack end_ts', d) if d.meta.end_ts != last.meta.end_ts
d.pack.forEach (p, i) ->
prev = v
v = p.v
error('bad version', v, 'in', p) if v <= prev
#error('expired op', p, 'in pack') if p.expiresAt?
else
prev = v
v = d.v
error('bad version', v, 'in', d) if v <= prev
if errors.length
callback(errors)
else
callback()
insertPack: (packObj, callback) ->
bulk = db.docHistory.initializeOrderedBulkOp()
doc_id = packObj.doc_id
expect_nInserted = 1
expect_nRemoved = packObj.pack.length
logger.log {doc_id: doc_id}, "adding pack, removing #{expect_nRemoved} ops"
bulk.insert packObj
ids = (op._id for op in packObj.pack)
bulk.find({_id:{$in:ids}}).remove()
bulk.execute (err, result) ->
if err?
logger.error {doc_id: doc_id}, "error adding pack"
callback(err, result)
else if result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved
logger.error {doc_id: doc_id, result}, "unexpected result adding pack"
callback(new Error(
msg: 'unexpected result'
expected: {expect_nInserted, expect_nRemoved}
), result)
else
db.docHistoryStats.update {doc_id:doc_id}, {
$inc:{update_count:-expect_nRemoved},
$currentDate:{last_packed:true}
}, {upsert:true}, () ->
callback(err, result)
# retrieve document ops/packs and check them
getDocHistory: (doc_id, callback) ->
db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) ->
return callback(err) if err?
# for safety, do a consistency check of the history
logger.log {doc_id}, "checking history for document"
PackManager.checkHistory docs, (err) ->
return callback(err) if err?
callback(err, docs)
#PackManager.deleteExpiredPackOps docs, (err) ->
# return callback(err) if err?
# callback err, docs
packDocHistory: (doc_id, options, callback) ->
if typeof callback == "undefined" and typeof options == 'function'
callback = options
options = {}
LockManager.runWithLock(
"HistoryLock:#{doc_id}",
(releaseLock) ->
PackManager._packDocHistory(doc_id, options, releaseLock)
, callback
)
_packDocHistory: (doc_id, options, callback) ->
logger.log {doc_id},"starting pack operation for document history"
PackManager.getDocHistory doc_id, (err, docs) ->
return callback(err) if err?
origDocs = 0
origPacks = 0
for d in docs
if d.pack? then origPacks++ else origDocs++
PackManager.convertDocsToPacks docs, (err, packs) ->
return callback(err) if err?
total = 0
for p in packs
total = total + p.pack.length
logger.log {doc_id, origDocs, origPacks, newPacks: packs.length, totalOps: total}, "document stats"
if packs.length
if options['dry-run']
logger.log {doc_id}, 'dry-run, skipping write packs'
return callback()
PackManager.savePacks packs, (err) ->
return callback(err) if err?
# check the history again
PackManager.getDocHistory doc_id, callback
else
logger.log {doc_id}, "no packs to write"
# keep a record that we checked this one to avoid rechecking it
db.docHistoryStats.update {doc_id:doc_id}, {
$currentDate:{last_checked:true}
}, {upsert:true}, () ->
callback null, null
DB_WRITE_DELAY: 100
savePacks: (packs, callback) ->
async.eachSeries packs, PackManager.safeInsert, (err, result) ->
if err?
logger.log {err, result}, "error writing packs"
callback err, result
else
callback()
safeInsert: (packObj, callback) ->
PackManager.insertPack packObj, (err, result) ->
setTimeout () ->
callback(err,result)
, PackManager.DB_WRITE_DELAY
deleteExpiredPackOps: (docs, callback) ->
now = Date.now()
toRemove = []
toUpdate = []
docs.forEach (d,i) ->
if d.pack?
newPack = d.pack.filter (op) ->
if op.expiresAt? then op.expiresAt > now else true
if newPack.length == 0
toRemove.push d
else if newPack.length < d.pack.length
# adjust the pack properties
d.pack = newPack
first = d.pack[0]
last = d.pack[d.pack.length - 1]
d.v_end = last.v
d.meta.start_ts = first.meta.start_ts
d.meta.end_ts = last.meta.end_ts
toUpdate.push d
if toRemove.length or toUpdate.length
bulk = db.docHistory.initializeOrderedBulkOp()
toRemove.forEach (pack) ->
console.log "would remove", pack
#bulk.find({_id:pack._id}).removeOne()
toUpdate.forEach (pack) ->
console.log "would update", pack
#bulk.find({_id:pack._id}).updateOne(pack);
bulk.execute callback
else
callback()
MAX_COUNT: 1024
insertCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
return callback() if newUpdates.length == 0
@@ -509,7 +95,12 @@ module.exports = PackManager =
if temporary
newPack.expiresAt = new Date(Date.now() + 7 * DAYS)
logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack"
db.docHistory.insert newPack, callback
db.docHistory.save newPack, (err, result) ->
return callback(err) if err?
if temporary
return callback()
else
PackManager.updateIndex project_id, doc_id, callback
appendUpdatesToExistingPack: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) ->
first = newUpdates[0]
@@ -533,11 +124,330 @@ module.exports = PackManager =
if lastUpdate.expiresAt and temporary
update.$set.expiresAt = new Date(Date.now() + 7 * DAYS)
logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack"
db.docHistory.findAndModify {query, update}, callback
db.docHistory.findAndModify {query, update, new:true, fields:{meta:1,v_end:1}}, callback
listDocs: (options, callback) ->
query = {"op.p":{$exists:true}}
query.doc_id = {$gt: ObjectId(options.doc_id)} if options.doc_id?
db.docHistory.find(query, {doc_id:true}).sort({doc_id:1}).limit (options.limit||100), (err, docs) ->
# Retrieve all changes for a document
getOpsByVersionRange: (project_id, doc_id, fromVersion, toVersion, callback = (error, updates) ->) ->
PackManager.loadPacksByVersionRange project_id, doc_id, fromVersion, toVersion, (error) ->
query = {doc_id:ObjectId(doc_id.toString())}
query.v = {$lte:toVersion} if toVersion?
query.v_end = {$gte:fromVersion} if fromVersion?
#console.log "query:", query
db.docHistory.find(query).sort {v:-1}, (err, result) ->
return callback(err) if err?
#console.log "getOpsByVersionRange:", err, result
updates = []
opInRange = (op, from, to) ->
return false if fromVersion? and op.v < fromVersion
return false if toVersion? and op.v > toVersion
return true
for docHistory in result
#console.log 'adding', docHistory.pack
for op in docHistory.pack.reverse() when opInRange(op, fromVersion, toVersion)
op.project_id = docHistory.project_id
op.doc_id = docHistory.doc_id
#console.log "added op", op.v, fromVersion, toVersion
updates.push op
callback(null, updates)
loadPacksByVersionRange: (project_id, doc_id, fromVersion, toVersion, callback) ->
PackManager.getIndex doc_id, (err, indexResult) ->
return callback(err) if err?
callback(null, docs)
indexPacks = indexResult?.packs or []
packInRange = (pack, from, to) ->
return false if fromVersion? and pack.v_end < fromVersion
return false if toVersion? and pack.v > toVersion
return true
neededIds = (pack._id for pack in indexPacks when packInRange(pack, fromVersion, toVersion))
PackManager.fetchPacksIfNeeded project_id, doc_id, neededIds, callback
fetchPacksIfNeeded: (project_id, doc_id, pack_ids, callback) ->
db.docHistory.find {_id: {$in: (ObjectId(id) for id in pack_ids)}}, {_id:1}, (err, loadedPacks) ->
return callback(err) if err?
allPackIds = (id.toString() for id in pack_ids)
loadedPackIds = (pack._id.toString() for pack in loadedPacks)
packIdsToFetch = _.difference allPackIds, loadedPackIds
logger.log {loadedPackIds, allPackIds, packIdsToFetch}, "analysed packs"
async.eachLimit packIdsToFetch, 4, (pack_id, cb) ->
MongoAWS.unArchivePack project_id, doc_id, pack_id, cb
, (err) ->
return callback(err) if err?
logger.log "done unarchiving"
callback()
# Retrieve all changes across a project
makeProjectIterator: (project_id, before, callback) ->
# get all the docHistory Entries
db.docHistory.find({project_id: ObjectId(project_id)},{pack:false}).sort {"meta.end_ts":-1}, (err, packs) ->
return callback(err) if err?
allPacks = []
seenIds = {}
for pack in packs
allPacks.push pack
seenIds[pack._id] = true
db.docHistoryIndex.find {project_id: ObjectId(project_id)}, (err, indexes) ->
return callback(err) if err?
for index in indexes
for pack in index.packs when not seenIds[pack._id]
pack.project_id = index.project_id
pack.doc_id = index._id
pack.fromIndex = true
allPacks.push pack
seenIds[pack._id] = true
callback(null, new ProjectIterator(allPacks, before, PackManager.getPackById))
getPackById: (project_id, doc_id, pack_id, callback) ->
db.docHistory.findOne {_id: pack_id}, (err, pack) ->
return callback(err) if err?
if not pack?
MongoAWS.unArchivePack project_id, doc_id, pack_id, callback
else if pack.expiresAt?
# we only need to touch the TTL on the listing of changes in the project
# because diffs on individual documents are always done after that
PackManager.increaseTTL pack, callback
else
callback(null, pack)
increaseTTL: (pack, callback) ->
if pack.expiresAt < new Date(Date.now() + 6 * DAYS)
# update cache expiry since we are using this pack
db.docHistory.findAndModify {
query: {_id: pack._id}
update: {$set: {expiresAt: new Date(Date.now() + 7 * DAYS)}}
}, (err) ->
return callback(err, pack)
else
callback(null, pack)
# Manage docHistoryIndex collection
getIndex: (doc_id, callback) ->
db.docHistoryIndex.findOne {_id:ObjectId(doc_id.toString())}, callback
getPackFromIndex: (doc_id, pack_id, callback) ->
db.docHistoryIndex.findOne {_id:ObjectId(doc_id.toString()), "packs._id": pack_id}, {"packs.$":1}, callback
getLastPackFromIndex: (doc_id, callback) ->
db.docHistoryIndex.findOne {_id: ObjectId(doc_id.toString())}, {packs:{$slice:-1}}, (err, indexPack) ->
return callback(err) if err?
return callback() if not indexPack?
callback(null,indexPack[0])
getIndexWithKeys: (doc_id, callback) ->
PackManager.getIndex doc_id, (err, index) ->
return callback(err) if err?
return callback() if not index?
for pack in index?.packs or []
index[pack._id] = pack
callback(null, index)
initialiseIndex: (project_id, doc_id, callback) ->
PackManager.findCompletedPacks project_id, doc_id, (err, packs) ->
#console.log 'err', err, 'packs', packs, packs?.length
return callback(err) if err?
return callback() if not packs?
PackManager.insertPacksIntoIndexWithLock project_id, doc_id, packs, callback
updateIndex: (project_id, doc_id, callback) ->
# find all packs prior to current pack
PackManager.findUnindexedPacks project_id, doc_id, (err, newPacks) ->
return callback(err) if err?
return callback() if not newPacks?
PackManager.insertPacksIntoIndexWithLock project_id, doc_id, newPacks, (err) ->
return callback(err) if err?
logger.log {project_id, doc_id, newPacks}, "added new packs to index"
callback()
findCompletedPacks: (project_id, doc_id, callback) ->
query = { doc_id: ObjectId(doc_id.toString()) }
db.docHistory.find(query, {pack:false}).sort {v:1}, (err, packs) ->
return callback(err) if err?
return callback() if not packs?
return callback() if packs?.length <= 1
packs.pop() # discard the last pack, it's still in progress
callback(null, packs)
# findPacks: (project_id, doc_id, queryFilter, callback) ->
# query = { doc_id: ObjectId(doc_id.toString()) }
# query = _.defaults query, queryFilter if queryFilter?
# db.docHistory.find(query, {pack:false}).sort {v:1}, callback
findUnindexedPacks: (project_id, doc_id, callback) ->
PackManager.getIndexWithKeys doc_id, (err, indexResult) ->
return callback(err) if err?
PackManager.findCompletedPacks project_id, doc_id, (err, historyPacks) ->
return callback(err) if err?
return callback() if not historyPacks?
# select only the new packs not already in the index
newPacks = (pack for pack in historyPacks when not indexResult[pack._id]?)
newPacks = (_.omit(pack, 'doc_id', 'project_id', 'n', 'sz') for pack in newPacks)
logger.log {project_id, doc_id, n: newPacks.length}, "found new packs"
callback(null, newPacks)
insertPacksIntoIndexWithLock: (project_id, doc_id, newPacks, callback) ->
LockManager.runWithLock(
"HistoryIndexLock:#{doc_id}",
(releaseLock) ->
PackManager._insertPacksIntoIndex project_id, doc_id, newPacks, releaseLock
callback
)
_insertPacksIntoIndex: (project_id, doc_id, newPacks, callback) ->
db.docHistoryIndex.findAndModify {
query: {_id:ObjectId(doc_id.toString())}
update:
$setOnInsert: project_id: ObjectId(project_id.toString())
$push:
packs: {$each: newPacks, $sort: {v: 1}}
upsert: true
}, callback
# Archiving packs to S3
archivePack: (project_id, doc_id, pack_id, callback) ->
clearFlagOnError = (err, cb) ->
if err? # clear the inS3 flag on error
PackManager.clearPackAsArchiveInProgress project_id, doc_id, pack_id, (err2) ->
return cb(err2) if err2?
return cb(err)
else
cb()
async.series [
(cb) ->
PackManager.checkArchiveNotInProgress project_id, doc_id, pack_id, cb
(cb) ->
PackManager.markPackAsArchiveInProgress project_id, doc_id, pack_id, cb
(cb) ->
MongoAWS.archivePack project_id, doc_id, pack_id, (err) ->
clearFlagOnError(err, cb)
(cb) ->
PackManager.checkArchivedPack project_id, doc_id, pack_id, (err) ->
clearFlagOnError(err, cb)
(cb) ->
PackManager.markPackAsArchived project_id, doc_id, pack_id, cb
(cb) ->
PackManager.setTTLOnArchivedPack project_id, doc_id, pack_id, callback
], callback
checkArchivedPack: (project_id, doc_id, pack_id, callback) ->
db.docHistory.findOne {_id: pack_id}, (err, pack) ->
return callback(err) if err?
return callback new Error("pack not found") if not pack?
MongoAWS.readArchivedPack project_id, doc_id, pack_id, (err, result) ->
delete result.last_checked
delete pack.last_checked
# need to compare ids as ObjectIds with .equals()
for key in ['_id', 'project_id', 'doc_id']
result[key] = pack[key] if result[key].equals(pack[key])
for op, i in result.pack
op._id = pack.pack[i]._id if op._id? and op._id.equals(pack.pack[i]._id)
if _.isEqual pack, result
callback()
else
logger.err {pack, result, jsondiff: JSON.stringify(pack) is JSON.stringify(result)}, "difference when comparing packs"
callback new Error("pack retrieved from s3 does not match pack in mongo")
# Processing old packs via worker
processOldPack: (project_id, doc_id, pack_id, callback) ->
markAsChecked = (err) ->
PackManager.markPackAsChecked project_id, doc_id, pack_id, (err2) ->
return callback(err2) if err2?
callback(err)
logger.log {project_id, doc_id}, "processing old packs"
db.docHistory.findOne {_id:pack_id}, (err, pack) ->
return markAsChecked(err) if err?
return markAsChecked() if not pack?
return callback() if pack.expiresAt? # return directly
PackManager.updateIndexIfNeeded project_id, doc_id, (err) ->
return markAsChecked(err) if err?
PackManager.findUnarchivedPacks project_id, doc_id, (err, unarchivedPacks) ->
return markAsChecked(err) if err?
if not unarchivedPacks?.length
logger.log "no packs need archiving"
return markAsChecked()
async.eachSeries unarchivedPacks, (pack, cb) ->
PackManager.archivePack project_id, doc_id, pack._id, cb
, (err) ->
return markAsChecked(err) if err?
logger.log "done processing"
markAsChecked()
updateIndexIfNeeded: (project_id, doc_id, callback) ->
logger.log {project_id, doc_id}, "archiving old packs"
PackManager.getIndexWithKeys doc_id, (err, index) ->
return callback(err) if err?
if not index?
PackManager.initialiseIndex project_id, doc_id, callback
else
PackManager.updateIndex project_id, doc_id, callback
markPackAsChecked: (project_id, doc_id, pack_id, callback) ->
logger.log {project_id, doc_id, pack_id}, "marking pack as checked"
db.docHistory.findAndModify {
query: {_id: pack_id}
update: {$currentDate: {"last_checked":true}}
}, callback
findUnarchivedPacks: (project_id, doc_id, callback) ->
PackManager.getIndex doc_id, (err, indexResult) ->
return callback(err) if err?
indexPacks = indexResult?.packs or []
unArchivedPacks = (pack for pack in indexPacks when not pack.inS3?)
logger.log {project_id, doc_id, n: unArchivedPacks.length}, "find unarchived packs"
callback(null, unArchivedPacks)
# Archive locking flags
checkArchiveNotInProgress: (project_id, doc_id, pack_id, callback) ->
logger.log {project_id, doc_id, pack_id}, "checking if archive in progress"
PackManager.getPackFromIndex doc_id, pack_id, (err, result) ->
return callback(err) if err?
return callback new Error("pack not found in index") if not result?
if result.inS3?
callback new Error("pack archiving already in progress")
else
callback()
markPackAsArchiveInProgress: (project_id, doc_id, pack_id, callback) ->
logger.log {project_id, doc_id}, "marking pack as archive in progress status"
db.docHistoryIndex.findAndModify {
query: {_id:ObjectId(doc_id.toString()), packs: {$elemMatch: {"_id": pack_id, inS3: {$exists:false}}}}
fields: { "packs.$": 1 }
update: {$set: {"packs.$.inS3":false}}
}, (err, result) ->
return callback(err) if err?
return callback new Error("archive is already in progress") if not result?
logger.log {project_id, doc_id, pack_id}, "marked as archive in progress"
callback()
clearPackAsArchiveInProgress: (project_id, doc_id, pack_id, callback) ->
logger.log {project_id, doc_id, pack_id}, "clearing as archive in progress"
db.docHistoryIndex.findAndModify {
query: {_id:ObjectId(doc_id.toString()), "packs" : {$elemMatch: {"_id": pack_id, inS3: false}}}
fields: { "packs.$": 1 }
update: {$unset: {"packs.$.inS3":true}}
}, callback
markPackAsArchived: (project_id, doc_id, pack_id, callback) ->
logger.log {project_id, doc_id, pack_id}, "marking pack as archived"
db.docHistoryIndex.findAndModify {
query: {_id:ObjectId(doc_id.toString()), "packs" : {$elemMatch: {"_id": pack_id, inS3: false}}}
fields: { "packs.$": 1 }
update: {$set: {"packs.$.inS3":true}}
}, (err, result) ->
return callback(err) if err?
return callback new Error("archive is not marked as progress") if not result?
logger.log {project_id, doc_id, pack_id}, "marked as archived"
callback()
setTTLOnArchivedPack: (project_id, doc_id, pack_id, callback) ->
db.docHistory.findAndModify {
query: {_id: pack_id}
update: {$set: {expiresAt: new Date(Date.now() + 1*DAYS)}}
}, (err) ->
logger.log {project_id, doc_id, pack_id}, "set expiry on pack"
callback()

View File

@@ -7,11 +7,13 @@ logger.initialize("track-changes-packworker")
if Settings.sentry?.dsn?
logger.initializeErrorReporting(Settings.sentry.dsn)
DAYS = 24 * 3600 * 1000
LockManager = require "./LockManager"
PackManager = require "./PackManager"
# this worker script is forked by the main process to look for
# document histories which can be packed
# document histories which can be archived
LIMIT = Number(process.argv[2]) || 1000
DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000
@@ -24,7 +26,7 @@ shutDownTimer = setTimeout () ->
shutDownRequested = true
# do a hard shutdown after a further 5 minutes
setTimeout () ->
logger.error "HARD TIMEOUT in pack worker"
logger.error "HARD TIMEOUT in pack archive worker"
process.exit()
, 5*60*1000
, TIMEOUT
@@ -47,54 +49,59 @@ finish = () ->
db.close () ->
logger.log 'closing LockManager Redis Connection'
LockManager.close () ->
logger.log 'ready to exit from pack worker'
logger.log 'ready to exit from pack archive worker'
hardTimeout = setTimeout () ->
logger.error 'hard exit from pack worker'
logger.error 'hard exit from pack archive worker'
process.exit(1)
, 5*1000
hardTimeout.unref()
process.on 'exit', (code) ->
logger.log {code}, 'pack worker exited'
logger.log {code}, 'pack archive worker exited'
processUpdates = (pending) ->
async.eachSeries pending, (doc_id, callback) ->
PackManager.packDocHistory doc_id, (err, result) ->
async.eachSeries pending, (result, callback) ->
{_id, project_id, doc_id} = result
if not project_id? or not doc_id?
logger.log {project_id, doc_id}, "skipping pack, missing project/doc id"
return callback()
PackManager.processOldPack project_id, doc_id, _id, (err, result) ->
if err?
logger.error {err, result}, "error in pack worker"
logger.error {err, result}, "error in pack archive worker"
return callback(err)
if shutDownRequested
logger.error "shutting down pack worker"
logger.error "shutting down pack archive worker"
return callback(new Error("shutdown"))
setTimeout () ->
callback(err, result)
, DOCUMENT_PACK_DELAY
, (err, results) ->
if err? and err.message != "shutdown"
logger.error {err}, 'error in pack worker processUpdates'
logger.error {err}, 'error in pack archive worker processUpdates'
finish()
# find the documents which can be packed, by checking the number of
# unpacked updates in the docHistoryStats collection
# find the packs which can be archived
db.docHistoryStats.find({
update_count: {$gt : PackManager.MIN_COUNT}
}).sort({
update_count:-1
ObjectIdFromDate = (date) ->
id = Math.floor(date.getTime() / 1000).toString(16) + "0000000000000000";
return ObjectId(id)
# new approach, two passes
# find packs to be marked as finalised:true, those which have a newer pack present
# then only consider finalised:true packs for archiving
db.docHistory.find({
expiresAt: {$exists: false}
project_id: {$exists: true}
v_end: {$exists: true}
_id: {$lt: ObjectIdFromDate(new Date(Date.now() - 7 * DAYS))}
}, {_id:1, doc_id:1, project_id:1}).sort({
last_checked:1
}).limit LIMIT, (err, results) ->
if err?
logger.log {err}, 'error checking for updates'
finish()
return
results = _.filter results, (doc) ->
if doc.last_checked? and doc.last_checked > doc.last_update
# skip documents which don't have any updates since last check
return false
else if doc.last_packed? and doc.last_packed > doc.last_update
# skip documents which don't have any updates since last pack
return false
else
return true
pending = _.pluck results, 'doc_id'
logger.log "found #{pending.length} documents to pack"
pending = _.uniq results, false, (result) -> result.doc_id.toString()
logger.log "found #{pending.length} documents to archive"
processUpdates pending

View File

@@ -0,0 +1,66 @@
async = require "async"
_ = require "underscore"
{db, ObjectId, BSON} = require "./mongojs"
logger = require "logger-sharelatex"
Heap = require "heap"
module.exports = ProjectIterator =
class ProjectIterator
constructor: (packs, @before, @getPackByIdFn) ->
byEndTs = (a,b) -> (b.meta.end_ts - a.meta.end_ts) || (a.fromIndex - b.fromIndex)
@packs = packs.slice().sort byEndTs
@queue = new Heap(byEndTs)
next: (callback) ->
# what's up next
#console.log ">>> top item", iterator.packs[0]
iterator = this
before = @before
queue = iterator.queue
opsToReturn = []
nextPack = iterator.packs[0]
lowWaterMark = nextPack?.meta.end_ts || 0
nextItem = queue.peek()
#console.log "queue empty?", queue.empty()
#console.log "nextItem", nextItem
#console.log "nextItem.meta.end_ts", nextItem?.meta.end_ts
#console.log "lowWaterMark", lowWaterMark
while before? and nextPack?.meta.start_ts > before
# discard pack that is outside range
iterator.packs.shift()
nextPack = iterator.packs[0]
lowWaterMark = nextPack?.meta.end_ts || 0
if (queue.empty() or nextItem?.meta.end_ts <= lowWaterMark) and nextPack?
# retrieve the next pack and populate the queue
return @getPackByIdFn nextPack.project_id, nextPack.doc_id, nextPack._id, (err, pack) ->
return callback(err) if err?
iterator.packs.shift() # have now retrieved this pack, remove it
#console.log "got pack", pack
for op in pack.pack when (not before? or op.meta.end_ts < before)
#console.log "adding op", op
op.doc_id = nextPack.doc_id
op.project_id = nextPack.project_id
queue.push op
# now try again
return iterator.next(callback)
#console.log "nextItem", nextItem, "lowWaterMark", lowWaterMark
while nextItem? and (nextItem?.meta.end_ts > lowWaterMark)
opsToReturn.push nextItem
queue.pop()
nextItem = queue.peek()
#console.log "queue empty?", queue.empty()
#console.log "nextPack", nextPack?
if queue.empty() and not nextPack? # got everything
iterator._done = true
callback(null, opsToReturn)
done: () ->
return @_done

View File

@@ -7,7 +7,6 @@ WebApiManager = require "./WebApiManager"
UpdateTrimmer = require "./UpdateTrimmer"
logger = require "logger-sharelatex"
async = require "async"
DocArchiveManager = require "./DocArchiveManager"
_ = require "underscore"
Settings = require "settings-sharelatex"
@@ -17,6 +16,8 @@ module.exports = UpdatesManager =
if length == 0
return callback()
# FIXME: we no longer need the lastCompressedUpdate, so change functions not to need it
# CORRECTION: we do use it to log the time in case of error
MongoManager.peekLastCompressedUpdate doc_id, (error, lastCompressedUpdate, lastVersion) ->
# lastCompressedUpdate is the most recent update in Mongo, and
# lastVersion is its sharejs version number.
@@ -58,46 +59,10 @@ module.exports = UpdatesManager =
logger.error err: error, doc_id: doc_id, project_id: project_id, size: size, rawUpdate: rawUpdate, "dropped op - too big"
rawUpdate.op = []
if (not lastCompressedUpdate?) or lastCompressedUpdate.pack? # handle pack append as a special case
UpdatesManager._updatePack project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback
else #use the existing op code
UpdatesManager._updateOp project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback
_updatePack: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) ->
compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates
PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result?
callback()
_updateOp: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) ->
compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates
if not lastCompressedUpdate?
# no existing update, insert everything
updateToModify = null
updatesToInsert = compressedUpdates
else
# there are existing updates, see what happens when we
# compress them together with the new ones
[firstUpdate, additionalUpdates...] = compressedUpdates
if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate)
# first update version hasn't changed, skip it and insert remaining updates
# this is an optimisation, we could update the existing op with itself
updateToModify = null
updatesToInsert = additionalUpdates
else
# first update version did changed, modify it and insert remaining updates
updateToModify = firstUpdate
updatesToInsert = additionalUpdates
MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) ->
return callback(error) if error?
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result?
MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) ->
compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates
PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) ->
return callback(error) if error?
logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: rawUpdates.length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates"
logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result?
callback()
REDIS_READ_BATCH_SIZE: 100
@@ -151,7 +116,12 @@ module.exports = UpdatesManager =
getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) ->
return callback(error) if error?
MongoManager.getDocUpdates doc_id, options, callback
#console.log "options", options
PackManager.getOpsByVersionRange project_id, doc_id, options.from, options.to, (error, updates) ->
return callback(error) if error?
UpdatesManager.fillUserInfo updates, (err, results) ->
return callback(err) if err?
callback null, results
getDocUpdatesWithUserInfo: (project_id, doc_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.getDocUpdates project_id, doc_id, options, (error, updates) ->
@@ -160,98 +130,83 @@ module.exports = UpdatesManager =
return callback(error) if error?
callback null, updates
getProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.processUncompressedUpdatesForProject project_id, (error) ->
return callback(error) if error?
MongoManager.getProjectUpdates project_id, options, (error, updates) ->
jobs = []
for update in updates
if update.inS3
do (update) ->
jobs.push (callback) -> DocArchiveManager.unArchiveDocChanges update.project_id, update.doc_id, callback
if jobs.length?
async.series jobs, (err) ->
MongoManager.getProjectUpdates project_id, options, callback
else
callback(error, updates)
getProjectUpdatesWithUserInfo: (project_id, options = {}, callback = (error, updates) ->) ->
UpdatesManager.getProjectUpdates project_id, options, (error, updates) ->
return callback(error) if error?
UpdatesManager.fillUserInfo updates, (error, updates) ->
return callback(error) if error?
callback null, updates
getSummarizedProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) ->
options.min_count ||= 25
summarizedUpdates = []
before = options.before
do fetchNextBatch = () ->
UpdatesManager._extendBatchOfSummarizedUpdates project_id, summarizedUpdates, before, options.min_count, (error, updates, nextBeforeUpdate) ->
return callback(error) if error?
if !nextBeforeUpdate? or updates.length >= options.min_count
callback null, updates, nextBeforeUpdate
else
before = nextBeforeUpdate
summarizedUpdates = updates
fetchNextBatch()
_extendBatchOfSummarizedUpdates: (
project_id,
existingSummarizedUpdates,
before, desiredLength,
callback = (error, summarizedUpdates, endOfDatabase) ->
) ->
UpdatesManager.getProjectUpdatesWithUserInfo project_id, { before: before, limit: 3 * desiredLength }, (error, updates) ->
nextBeforeTimestamp = null
UpdatesManager.processUncompressedUpdatesForProject project_id, (error) ->
return callback(error) if error?
PackManager.makeProjectIterator project_id, before, (err, iterator) ->
return callback(err) if err?
# repeatedly get updates and pass them through the summariser to get an final output with user info
async.whilst () ->
#console.log "checking iterator.done", iterator.done()
return summarizedUpdates.length < options.min_count and not iterator.done()
, (cb) ->
iterator.next (err, partialUpdates) ->
return callback(err) if err?
#logger.log {partialUpdates}, 'got partialUpdates'
return cb() if partialUpdates.length is 0 ## FIXME should try to avoid this happening
nextBeforeTimestamp = partialUpdates[partialUpdates.length - 1].meta.end_ts
# add the updates to the summary list
summarizedUpdates = UpdatesManager._summarizeUpdates partialUpdates, summarizedUpdates
cb()
, () ->
# finally done all updates
#console.log 'summarized Updates', summarizedUpdates
UpdatesManager.fillSummarizedUserInfo summarizedUpdates, (err, results) ->
return callback(err) if err?
callback null, results, if not iterator.done() then nextBeforeTimestamp else undefined
# Suppose in this request we have fetch the solid updates. In the next request we need
# to fetch the dotted updates. These are defined by having an end timestamp less than
# the last update's end timestamp (updates are ordered by descending end_ts). I.e.
# start_ts--v v--end_ts
# doc1: |......| |...| |-------|
# doc2: |------------------|
# ^----- Next time, fetch all updates with an
# end_ts less than this
#
if updates? and updates.length > 0
nextBeforeTimestamp = updates[updates.length - 1].meta.end_ts
if nextBeforeTimestamp >= before
error = new Error("history order is broken")
logger.error err: error, project_id:project_id, nextBeforeTimestamp: nextBeforeTimestamp, before:before, "error in project history"
return callback(error)
else
nextBeforeTimestamp = null
summarizedUpdates = UpdatesManager._summarizeUpdates(
updates, existingSummarizedUpdates
)
callback null,
summarizedUpdates,
nextBeforeTimestamp
fillUserInfo: (updates, callback = (error, updates) ->) ->
users = {}
for update in updates
if UpdatesManager._validUserId(update.meta.user_id)
users[update.meta.user_id] = true
fetchUserInfo: (users, callback = (error, fetchedUserInfo) ->) ->
jobs = []
fetchedUserInfo = {}
for user_id of users
do (user_id) ->
jobs.push (callback) ->
WebApiManager.getUserInfo user_id, (error, userInfo) ->
return callback(error) if error?
users[user_id] = userInfo
fetchedUserInfo[user_id] = userInfo
callback()
async.series jobs, (error) ->
async.series jobs, (err) ->
return callback(err) if err?
callback(null, fetchedUserInfo)
fillUserInfo: (updates, callback = (error, updates) ->) ->
users = {}
for update in updates
user_id = update.meta.user_id
if UpdatesManager._validUserId(user_id)
users[user_id] = true
UpdatesManager.fetchUserInfo users, (error, fetchedUserInfo) ->
return callback(error) if error?
for update in updates
user_id = update.meta.user_id
delete update.meta.user_id
if UpdatesManager._validUserId(user_id)
update.meta.user = users[user_id]
update.meta.user = fetchedUserInfo[user_id]
callback null, updates
fillSummarizedUserInfo: (updates, callback = (error, updates) ->) ->
users = {}
for update in updates
user_ids = update.meta.user_ids or []
for user_id in user_ids
if UpdatesManager._validUserId(user_id)
users[user_id] = true
UpdatesManager.fetchUserInfo users, (error, fetchedUserInfo) ->
return callback(error) if error?
for update in updates
user_ids = update.meta.user_ids or []
update.meta.users = []
delete update.meta.user_ids
for user_id in user_ids
if UpdatesManager._validUserId(user_id)
update.meta.users.push fetchedUserInfo[user_id]
callback null, updates
_validUserId: (user_id) ->
@@ -260,7 +215,6 @@ module.exports = UpdatesManager =
else
return !!user_id.match(/^[a-f0-9]{24}$/)
TIME_BETWEEN_DISTINCT_UPDATES: fiveMinutes = 5 * 60 * 1000
_summarizeUpdates: (updates, existingSummarizedUpdates = []) ->
summarizedUpdates = existingSummarizedUpdates.slice()
@@ -269,13 +223,7 @@ module.exports = UpdatesManager =
if earliestUpdate and earliestUpdate.meta.start_ts - update.meta.end_ts < @TIME_BETWEEN_DISTINCT_UPDATES
# check if the user in this update is already present in the earliest update,
# if not, add them to the users list of the earliest update
userExists = false
for user in earliestUpdate.meta.users
if (!user and !update.meta.user) or (user?.id == update.meta.user?.id)
userExists = true
break
if !userExists
earliestUpdate.meta.users.push update.meta.user
earliestUpdate.meta.user_ids = _.union earliestUpdate.meta.user_ids, update.meta.user_id
doc_id = update.doc_id.toString()
doc = earliestUpdate.docs[doc_id]
@@ -292,7 +240,7 @@ module.exports = UpdatesManager =
else
newUpdate =
meta:
users: []
user_ids: []
start_ts: update.meta.start_ts
end_ts: update.meta.end_ts
docs: {}
@@ -300,7 +248,7 @@ module.exports = UpdatesManager =
newUpdate.docs[update.doc_id.toString()] =
fromV: update.v
toV: update.v
newUpdate.meta.users.push update.meta.user
newUpdate.meta.user_ids.push update.meta.user_id
summarizedUpdates.push newUpdate
return summarizedUpdates

View File

@@ -1,7 +1,7 @@
Settings = require "settings-sharelatex"
mongojs = require "mongojs"
bson = require "bson"
db = mongojs(Settings.mongo.url, ["docHistory", "projectHistoryMetaData", "docHistoryStats"])
db = mongojs(Settings.mongo.url, ["docHistory", "projectHistoryMetaData", "docHistoryIndex"])
module.exports =
db: db
ObjectId: mongojs.ObjectId

View File

@@ -23,7 +23,8 @@
"underscore": "~1.7.0",
"mongo-uri": "^0.1.2",
"s3-streams": "^0.3.0",
"JSONStream": "^1.0.4"
"JSONStream": "^1.0.4",
"heap": "^0.2.6"
},
"devDependencies": {
"chai": "~1.9.0",

View File

@@ -9,12 +9,12 @@ describe "MongoAWS", ->
beforeEach ->
@MongoAWS = SandboxedModule.require modulePath, requires:
"settings-sharelatex": @settings =
filestore:
trackchanges:
s3:
secret: "s3-secret"
key: "s3-key"
stores:
user_files: "s3-bucket"
doc_history: "s3-bucket"
"child_process": @child_process = {}
"mongo-uri": @mongouri = {}
"logger-sharelatex": @logger = {log: sinon.stub(), error: sinon.stub(), err:->}

View File

@@ -101,66 +101,6 @@ describe "MongoManager", ->
@callback.calledWith(null, null, @update.v).should.equal true
describe "insertCompressedUpdate", ->
beforeEach ->
@update = { op: "op", meta: "meta", v: "v"}
@db.docHistory =
insert: sinon.stub().callsArg(1)
describe "temporarly", ->
beforeEach ->
@MongoManager.insertCompressedUpdate @project_id, @doc_id, @update, true, @callback
it "should insert the update with a expiresAt field one week away", ->
@db.docHistory.insert
.calledWith({
project_id: ObjectId(@project_id),
doc_id: ObjectId(@doc_id),
op: @update.op,
meta: @update.meta,
v: @update.v
expiresAt: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000)
})
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "permanenty", ->
beforeEach ->
@MongoManager.insertCompressedUpdate @project_id, @doc_id, @update, false, @callback
it "should insert the update with no expiresAt field", ->
@db.docHistory.insert
.calledWith({
project_id: ObjectId(@project_id),
doc_id: ObjectId(@doc_id),
op: @update.op,
meta: @update.meta,
v: @update.v
})
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "insertCompressedUpdates", ->
beforeEach (done) ->
@updates = [ "mock-update-1", "mock-update-2" ]
@MongoManager.insertCompressedUpdate = sinon.stub().callsArg(4)
@MongoManager.insertCompressedUpdates @project_id, @doc_id, @updates, @temporary = true, (args...) =>
@callback(args...)
done()
it "should insert each update", ->
for update in @updates
@MongoManager.insertCompressedUpdate
.calledWith(@project_id, @doc_id, update, @temporary)
.should.equal true
it "should call the callback", ->
@callback.called.should.equal true
describe "getDocUpdates", ->
beforeEach ->
@results = [

View File

@@ -233,184 +233,3 @@ describe "PackManager", ->
it "should call the callback", ->
@callback.called.should.equal true
describe "convertDocsToPacks", ->
describe "with several small ops", ->
beforeEach ->
@ops = [
{ _id: "3", op: "op-3", meta: {start_ts: "ts3_s", end_ts: "ts3_e", user_id: "u3"}, v: 3},
{ _id: "4", op: "op-4", meta: {start_ts: "ts4_s", end_ts: "ts4_e", user_id: "u4"}, v: 4},
]
@PackManager.convertDocsToPacks @ops, @callback
it "should create a single pack", (done) ->
@PackManager.convertDocsToPacks @ops, (err, packs) =>
assert.deepEqual packs, [ {
pack: [
{ _id: "3", op: "op-3", meta: {start_ts: "ts3_s", end_ts: "ts3_e", user_id: "u3"}, v: 3},
{ _id: "4", op: "op-4", meta: {start_ts: "ts4_s", end_ts: "ts4_e", user_id: "u4"}, v: 4},
]
v: 3
v_end: 4
meta: {start_ts: "ts3_s", end_ts: "ts4_e"}
n: 2
sz: 202
}]
done()
it "should call the callback", ->
@callback.called.should.equal true
describe "with many small ops", ->
beforeEach ->
@ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: "ts#{n}_s", end_ts: "ts#{n}_e", user_id: "u#{n}"}, v: n} for n in [0...1024])
@PackManager.convertDocsToPacks @ops, @callback
it "should create two packs", (done) ->
@PackManager.convertDocsToPacks @ops, (err, packs) =>
assert.deepEqual packs, [ {
pack: @ops[0...512]
v: 0
v_end: 511
meta: {start_ts: "ts0_s", end_ts: "ts511_e"}
n: 512
sz: 56282
}, {
pack: @ops[512...1024]
v: 512
v_end: 1023
meta: {start_ts: "ts512_s", end_ts: "ts1023_e"}
n: 512
sz: 56952
}]
done()
it "should call the callback", ->
@callback.called.should.equal true
describe "with many temporary ops", ->
beforeEach ->
@ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: n, end_ts: n+1, user_id: "u#{n}"}, v: n, expiresAt: n+24*3600*1000 } for n in [0...1024])
@PackManager.convertDocsToPacks @ops, @callback
it "should create two packs", (done) ->
@PackManager.convertDocsToPacks @ops, (err, packs) =>
assert.deepEqual packs, [ {
pack: (_.omit(op, "expiresAt") for op in @ops[0...512])
v: 0
v_end: 511
meta: {start_ts: 0 , end_ts: 512}
n: 512
sz: 55990
expiresAt: @ops[511].expiresAt
}, {
pack: (_.omit(op, "expiresAt") for op in @ops[512...1024])
v: 512
v_end: 1023
meta: {start_ts: 512, end_ts: 1024}
n: 512
sz: 56392
expiresAt: @ops[1023].expiresAt
}]
done()
it "should call the callback", ->
@callback.called.should.equal true
describe "with temporary ops spanning more than 1 day", ->
beforeEach ->
@ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: n*3600*1000, end_ts: n*3600*1000+1, user_id: "u#{n}"}, v: n, expiresAt: n+24*3600*1000 } for n in [0...48])
@PackManager.convertDocsToPacks @ops, @callback
it "should create two packs", (done) ->
@PackManager.convertDocsToPacks @ops, (err, packs) =>
assert.deepEqual packs, [ {
pack: (_.omit(op, "expiresAt") for op in @ops[0...24])
v: 0
v_end: 23
meta: {start_ts: 0 , end_ts: 23*3600*1000+1}
n: 24
sz: 2538
expiresAt: @ops[23].expiresAt
}, {
pack: (_.omit(op, "expiresAt") for op in @ops[24...48])
v: 24
v_end: 47
meta: {start_ts: 24*3600*1000, end_ts: 47*3600*1000+1}
n: 24
sz: 2568
expiresAt: @ops[47].expiresAt
}]
done()
it "should call the callback", ->
@callback.called.should.equal true
describe "with mixture of temporary and permanent ops", ->
beforeEach ->
@ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: n*3600*1000, end_ts: n*3600*1000+1, user_id: "u#{n}"}, v: n, expiresAt: n+24*3600*1000 } for n in [0...48])
for n in [10...48]
delete @ops[n].expiresAt
@PackManager.convertDocsToPacks @ops, @callback
it "should create two packs", (done) ->
@PackManager.convertDocsToPacks @ops, (err, packs) =>
assert.deepEqual packs, [ {
pack: (_.omit(op, "expiresAt") for op in @ops[0...10])
v: 0
v_end: 9
meta: {start_ts: 0 , end_ts: 9*3600*1000+1}
n: 10
sz: 1040
expiresAt: @ops[9].expiresAt
}, {
pack: (_.omit(op, "expiresAt") for op in @ops[10...48])
v: 10
v_end: 47
meta: {start_ts: 10*3600*1000, end_ts: 47*3600*1000+1}
n: 38
sz: 3496
}]
done()
it "should call the callback", ->
@callback.called.should.equal true
describe "with mixture of temporary and permanent ops and an existing pack", ->
beforeEach ->
@ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: n*3600*1000, end_ts: n*3600*1000+1, user_id: "u#{n}"}, v: n, expiresAt: n+24*3600*1000 } for n in [0...48])
for n in [10...48]
delete @ops[n].expiresAt
# convert op 16 into a pack
@ops[16].pack = [ @ops[16].op ]
delete @ops[16].op
@PackManager.convertDocsToPacks @ops, @callback
it "should create three packs", (done) ->
@PackManager.convertDocsToPacks @ops, (err, packs) =>
assert.deepEqual packs, [ {
pack: (_.omit(op, "expiresAt") for op in @ops[0...10])
v: 0
v_end: 9
meta: {start_ts: 0 , end_ts: 9*3600*1000+1}
n: 10
sz: 1040
expiresAt: @ops[9].expiresAt
}, {
pack: @ops[10...16]
v: 10
v_end: 15
meta: {start_ts: 10*3600*1000, end_ts: 15*3600*1000+1}
n: 6
sz: 552
}, {
pack: @ops[17...48]
v: 17
v_end: 47
meta: {start_ts: 17*3600*1000, end_ts: 47*3600*1000+1}
n: 31
sz: 2852
}]
done()
it "should call the callback", ->
@callback.called.should.equal true

View File

@@ -26,12 +26,10 @@ describe "UpdatesManager", ->
describe "when there are no raw ops", ->
beforeEach ->
@MongoManager.peekLastCompressedUpdate = sinon.stub()
@MongoManager.insertCompressedUpdates = sinon.stub()
@UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, [], @temporary, @callback
it "should not need to access the database", ->
@MongoManager.peekLastCompressedUpdate.called.should.equal false
@MongoManager.insertCompressedUpdates.called.should.equal false
it "should call the callback", ->
@callback.called.should.equal true
@@ -65,8 +63,6 @@ describe "UpdatesManager", ->
@compressedUpdates = [ { v: 12, op: "compressed-op-11+12" }, { v: 13, op: "compressed-op-12" } ]
@MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate, @lastCompressedUpdate.v)
@MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2)
@MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4)
@PackManager.insertCompressedUpdates = sinon.stub().callsArg(5)
@UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates)
@@ -80,19 +76,14 @@ describe "UpdatesManager", ->
.calledWith(@doc_id)
.should.equal true
it "should compress the last compressed op and the raw ops", ->
it "should compress the raw ops", ->
@UpdateCompressor.compressRawUpdates
.calledWith(@lastCompressedUpdate, @rawUpdates)
.calledWith(null, @rawUpdates)
.should.equal true
it "should update the existing op", ->
@MongoManager.modifyCompressedUpdate
.calledWith(@lastCompressedUpdate, @compressedUpdates[0])
.should.equal true
it "should save the new compressed ops", ->
@MongoManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, @compressedUpdates[1..], @temporary)
it "should save the new compressed ops into a pack", ->
@PackManager.insertCompressedUpdates
.calledWith(@project_id, @doc_id, @lastCompressedUpdate, @compressedUpdates, @temporary)
.should.equal true
it "should call the callback", ->
@@ -130,7 +121,7 @@ describe "UpdatesManager", ->
it "should only compress the more recent raw ops", ->
@UpdateCompressor.compressRawUpdates
.calledWith(@lastCompressedUpdate, @rawUpdates.slice(-2))
.calledWith(null, @rawUpdates.slice(-2))
.should.equal true
describe "when the raw ops do not follow from the last compressed op version", ->
@@ -143,11 +134,8 @@ describe "UpdatesManager", ->
.calledWith(new Error("Tried to apply raw op at version 13 to last compressed update with version 11"))
.should.equal true
it "should not modify any update in mongo", ->
@MongoManager.modifyCompressedUpdate.called.should.equal false
it "should not insert any update into mongo", ->
@MongoManager.insertCompressedUpdates.called.should.equal false
@PackManager.insertCompressedUpdates.called.should.equal false
describe "when the raw ops need appending to existing history which is in S3", ->
beforeEach ->