Merge pull request #36 from sharelatex/bg-atomic-history-update

atomic history update
This commit is contained in:
Brian Gough
2017-05-09 13:15:54 +01:00
committed by GitHub
11 changed files with 75 additions and 72 deletions
@@ -21,16 +21,17 @@ module.exports = HistoryManager =
error = new Error("track changes api returned a failure status code: #{res.statusCode}")
return callback(error)
FLUSH_EVERY_N_OPS: 50
pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) ->
FLUSH_EVERY_N_OPS: 100
recordAndFlushHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) ->
if ops.length == 0
return callback()
HistoryRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) ->
HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) ->
return callback(error) if error?
# We want to flush every 50 ops, i.e. 50, 100, 150, etc
# Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these
# ops. If we've changed, then we've gone over a multiple of 50 and should flush.
# (Most of the time, we will only hit 50 and then flushing will put us back to 0)
return callback() if not length? # don't flush unless we know the length
# We want to flush every 100 ops, i.e. 100, 200, 300, etc
# Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these
# ops. If we've changed, then we've gone over a multiple of 100 and should flush.
# (Most of the time, we will only hit 100 and then flushing will put us back to 0)
previousLength = length - ops.length
prevBlock = Math.floor(previousLength / HistoryManager.FLUSH_EVERY_N_OPS)
newBlock = Math.floor(length / HistoryManager.FLUSH_EVERY_N_OPS)
@@ -41,4 +42,4 @@ module.exports = HistoryManager =
HistoryManager.flushDocChanges project_id, doc_id, (error) ->
if error?
logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api"
callback()
callback()
@@ -5,16 +5,10 @@ async = require "async"
logger = require('logger-sharelatex')
module.exports = HistoryRedisManager =
pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) ->
recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) ->
if ops.length == 0
return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush
opVersions = ops.map (op) -> op?.v
logger.log project_id: project_id, doc_id: doc_id, op_versions: opVersions, "pushing uncompressed history ops"
jsonOps = ops.map (op) -> JSON.stringify op
async.parallel [
(cb) -> rclient.rpush Keys.uncompressedHistoryOps({doc_id}), jsonOps..., cb
(cb) -> rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, cb
], (error, results) ->
logger.log project_id: project_id, doc_id: doc_id, "marking doc in project for history ops"
rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, (error) ->
return callback(error) if error?
[length, _] = results
callback(error, length)
callback()
@@ -25,6 +25,7 @@ MEGABYTES = 1024 * 1024
MAX_RANGES_SIZE = 3 * MEGABYTES
keys = Settings.redis.documentupdater.key_schema
historyKeys = Settings.redis.history.key_schema
module.exports = RedisManager =
rclient: rclient
@@ -167,32 +168,37 @@ module.exports = RedisManager =
logger.error err: error, doc_id: doc_id, newDocLines: newDocLines, error.message
return callback(error)
newHash = RedisManager._computeHash(newDocLines)
logger.log doc_id: doc_id, version: newVersion, hash: newHash, "updating doc in redis"
opVersions = appliedOps.map (op) -> op?.v
logger.log doc_id: doc_id, version: newVersion, hash: newHash, op_versions: opVersions, "updating doc in redis"
RedisManager._serializeRanges ranges, (error, ranges) ->
if error?
logger.error {err: error, doc_id}, error.message
return callback(error)
multi = rclient.multi()
multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines
multi.set keys.docVersion(doc_id:doc_id), newVersion
multi.set keys.docHash(doc_id:doc_id), newHash
if jsonOps.length > 0
multi.rpush keys.docOps(doc_id: doc_id), jsonOps...
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL
multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1
multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines # index 0
multi.set keys.docVersion(doc_id:doc_id), newVersion # index 1
multi.set keys.docHash(doc_id:doc_id), newHash # index 2
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 3
multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 # index 4
if ranges?
multi.set keys.ranges(doc_id:doc_id), ranges
multi.set keys.ranges(doc_id:doc_id), ranges # index 5
else
multi.del keys.ranges(doc_id:doc_id)
multi.del keys.ranges(doc_id:doc_id) # also index 5
# push the ops last so we can get the lengths at fixed index positions 6 and 7
if jsonOps.length > 0
multi.rpush keys.docOps(doc_id: doc_id), jsonOps... # index 6
multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7
multi.exec (error, result) ->
return callback(error) if error?
# check the hash computed on the redis server
writeHash = result?[0]
if logHashWriteErrors and writeHash? and writeHash isnt newHash
logger.error doc_id: doc_id, writeHash: writeHash, origHash: newHash, docLines:newDocLines, "hash mismatch on updateDocument"
return callback()
# return length of uncompressedHistoryOps queue (index 7)
uncompressedHistoryOpsLength = result?[7]
return callback(null, uncompressedHistoryOpsLength)
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
rclient.smembers keys.docsInProject(project_id: project_id), callback
@@ -61,9 +61,9 @@ module.exports = UpdateManager =
return callback(error) if error?
RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) ->
return callback(error) if error?
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error) ->
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) ->
return callback(error) if error?
HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback
HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback
lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) ->
LockManager.getLock doc_id, (error, lockValue) ->
@@ -207,7 +207,7 @@ describe "Applying updates to a doc", ->
before (done) ->
[@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()]
updates = []
for v in [0..99] # Should flush after 50 ops
for v in [0..199] # Should flush after 100 ops
updates.push
doc_id: @doc_id,
op: [i: v.toString(), p: 0]
@@ -219,7 +219,7 @@ describe "Applying updates to a doc", ->
# Send updates in chunks to causes multiple flushes
actions = []
for i in [0..9]
for i in [0..19]
do (i) =>
actions.push (cb) =>
DocUpdaterClient.sendUpdates @project_id, @doc_id, updates.slice(i*10, (i+1)*10), cb
@@ -40,18 +40,18 @@ describe "HistoryManager", ->
it "should return the callback with an error", ->
@callback.calledWith(new Error("track changes api return non-success code: 500")).should.equal true
describe "pushUncompressedHistoryOps", ->
describe "recordAndFlushHistoryOps", ->
beforeEach ->
@ops = ["mock-ops"]
@HistoryManager.flushDocChanges = sinon.stub().callsArg(2)
describe "pushing the op", ->
beforeEach ->
@HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null)
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 1, @callback
it "should push the ops into redis", ->
@HistoryRedisManager.pushUncompressedHistoryOps
@HistoryRedisManager.recordDocHasHistoryOps
.calledWith(@project_id, @doc_id, @ops)
.should.equal true
@@ -63,9 +63,9 @@ describe "HistoryManager", ->
describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@HistoryRedisManager.pushUncompressedHistoryOps =
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null)
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback
it "should tell the track changes api to flush", ->
@HistoryManager.flushDocChanges
@@ -75,9 +75,9 @@ describe "HistoryManager", ->
describe "when we go over a multiple of FLUSH_EVERY_N_OPS ops", ->
beforeEach ->
@ops = ["op1", "op2", "op3"]
@HistoryRedisManager.pushUncompressedHistoryOps =
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null)
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback
it "should tell the track changes api to flush", ->
@HistoryManager.flushDocChanges
@@ -86,10 +86,10 @@ describe "HistoryManager", ->
describe "when HistoryManager errors", ->
beforeEach ->
@HistoryRedisManager.pushUncompressedHistoryOps =
sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS)
@HistoryRedisManager.recordDocHasHistoryOps =
sinon.stub().callsArgWith(3, null)
@HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops"))
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback
it "should log out the error", ->
@logger.error
@@ -103,10 +103,10 @@ describe "HistoryManager", ->
describe "with no ops", ->
beforeEach ->
@HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1)
@HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], @callback
@HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null)
@HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, [], 1, @callback
it "should not call HistoryRedisManager.pushUncompressedHistoryOps", ->
@HistoryRedisManager.pushUncompressedHistoryOps.called.should.equal false
it "should not call HistoryRedisManager.recordDocHasHistoryOps", ->
@HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false
@@ -24,42 +24,28 @@ describe "HistoryRedisManager", ->
@project_id = "project-id-123"
@callback = sinon.stub()
describe "pushUncompressedHistoryOps", ->
describe "recordDocHasHistoryOps", ->
beforeEach ->
@ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }]
@rclient.rpush = sinon.stub().yields(null, @length = 42)
@rclient.sadd = sinon.stub().yields()
describe "with ops", ->
beforeEach (done) ->
@HistoryRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, (args...) =>
@HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, @ops, (args...) =>
@callback(args...)
done()
it "should push the doc op into the doc ops list as JSON", ->
@rclient.rpush
.calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1]))
.should.equal true
it "should add the doc_id to the set of which records the project docs", ->
@rclient.sadd
.calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id)
.should.equal true
it "should call the callback with the length", ->
@callback.calledWith(null, @length).should.equal true
describe "with no ops", ->
beforeEach (done) ->
@HistoryRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, [], (args...) =>
@HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, [], (args...) =>
@callback(args...)
done()
it "should not push the doc op into the doc ops list as JSON", ->
@rclient.rpush
.called
.should.equal false
it "should not add the doc_id to the set of which records the project docs", ->
@rclient.sadd
.called
@@ -19,6 +19,12 @@ describe 'LockManager - releasing the lock', ()->
error:->
"redis-sharelatex":
createClient : () => @client
"settings-sharelatex": {
redis:
lock:
key_schema:
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
}
"./Metrics": {inc: () ->}
@LockManager = SandboxedModule.require(modulePath, requires: mocks)
@lockValue = "lock-value-stub"
@@ -13,6 +13,12 @@ describe 'LockManager - trying the lock', ->
auth:->
set: @set = sinon.stub()
"./Metrics": {inc: () ->}
"settings-sharelatex": {
redis:
lock:
key_schema:
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
}
@callback = sinon.stub()
@doc_id = "doc-id-123"
@@ -19,7 +19,7 @@ describe "RedisManager", ->
documentupdater: {logHashErrors: {write:true, read:true}}
redis:
documentupdater:
key_schema:
key_schema:
blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
docLines: ({doc_id}) -> "doclines:#{doc_id}"
docOps: ({doc_id}) -> "DocOps:#{doc_id}"
@@ -29,6 +29,10 @@ describe "RedisManager", ->
pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}"
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
ranges: ({doc_id}) -> "Ranges:#{doc_id}"
history:
key_schema:
uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}"
docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}"
}
"redis-sharelatex":
createClient: () => @rclient
@@ -166,7 +166,7 @@ describe "UpdateManager", ->
@ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps)
@RedisManager.updateDocument = sinon.stub().yields()
@RealTimeRedisManager.sendData = sinon.stub()
@HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3)
@HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(4)
describe "normally", ->
beforeEach ->
@@ -188,7 +188,7 @@ describe "UpdateManager", ->
.should.equal true
it "should push the applied ops into the history queue", ->
@HistoryManager.pushUncompressedHistoryOps
@HistoryManager.recordAndFlushHistoryOps
.calledWith(@project_id, @doc_id, @appliedOps)
.should.equal true