Cockroach: perf: asynchronously apply Raft entries, decouple from append/commit stage

Created on 8 Aug 2017 · 28 comments · Source: cockroachdb/cockroach

EDIT: The original optimization proposed here was implemented in https://github.com/cockroachdb/cockroach/pull/38954. See https://github.com/cockroachdb/cockroach/issues/17500#issuecomment-425689048 for the remainder of this issue.

Raft, along with most consensus protocols in the Paxos family, distinguishes committed entries from executed entries. An entry is committed when the Raft protocol has completed and the entry has been sufficiently replicated and persisted in the Raft log. The entry is then executed when it is applied to a given replica's state machine. This distinction is important because in cases where the execution of an entry onto the state machine does not produce a result, it is not necessary to wait for execution before sending a commit notification to clients.

Currently in Cockroach, the proposing replica waits until command execution before responding to clients. However, the changes made for proposer-evaluated KV ensured that all MVCC-related logic is performed upstream of Raft, so by the time we reach entry execution we're simply applying a WriteBatch to RocksDB. While some of these execution steps can produce ReplicaCorruptionErrors, I don't think it's necessary, or possibly even correct, to attach these errors to the proposal response itself. The entry has already been replicated through Raft, so a local ReplicaCorruptionError doesn't mean that all replicas are corrupted or that the command failed. In fact, after looking at the code I don't think that proposalResult needs anything populated at execution time for correctness. Because of this, I suspect that all of the time spent in applyRaftCommand, including writing to RocksDB and performing stats computations, is unnecessary latency.
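
For illustration, a minimal sketch of this kind of reordering, with hypothetical names (proposal, onCommitted, onApplied) standing in for the real proposal plumbing:

```go
package main

import "fmt"

// proposal models a client write waiting on Raft. These names are
// hypothetical; they only illustrate the ordering change.
type proposal struct {
	id   int
	done chan error // the waiting client receives its result here
}

// onCommitted runs once Raft has replicated the entry to a quorum. Under the
// proposed change, this is where the client is acknowledged.
func onCommitted(p *proposal) {
	p.done <- nil
}

// onApplied runs later, when the entry's WriteBatch is applied to the local
// state machine. A local failure here is surfaced as replica corruption
// rather than as a proposal error, since the command has already committed.
func onApplied(p *proposal, applyErr error) {
	if applyErr != nil {
		fmt.Printf("proposal %d: local apply error: %v\n", p.id, applyErr)
	}
}

func main() {
	p := &proposal{id: 1, done: make(chan error, 1)}
	onCommitted(p)        // ack as soon as the entry commits
	fmt.Println(<-p.done) // client observes success (nil error)
	onApplied(p, nil)     // application happens off the ack path
}
```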

Preliminary results from a (very) rough draft of the change show a 2-3% improvement on average latency for the KV benchmark with a --batch size of 4:

_Without change (batch=4):_

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)
  600.0s        0         597164          995.3     32.2     32.5     41.9     65.0    939.5

BenchmarkBlocks   597164       1004753.2 ns/op

_With change (batch=4):_

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)
  600.0s        0         612652         1021.1     31.4     30.4     41.9     58.7    604.0

BenchmarkBlocks   612652        979351.4 ns/op

With a --batch size of 40, the results were even more pronounced:

_Without change (batch=40):_

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)
  300.0s        0         616040         2053.4    134.0     54.5    151.0   2415.9  10200.5

BenchmarkBlocks   616040        486990.5 ns/op

_With change (batch=40):_

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)
  300.0s        0         708040         2360.1    114.7     54.5    130.0   1140.9  10200.5

BenchmarkBlocks   708040        423713.8 ns/op

Note that while I think we can respond to clients sooner, I don't think we can pop the command out of the CommandQueue until after it has actually been executed, because of how proposer-evaluated KV works upstream of Raft.

@tschottdorf @bdarnell

Labels: A-kv-replication, C-performance

All 28 comments

But there's a lease check in processRaftCommand (which is "downstream" of Raft). The result depends on that check...

The lease check happens before applyRaftCommand is called though, which is what I'm trying to avoid waiting on. We can still check the lease before replying like we do now while avoiding the extra latency created by the majority of entry execution. Somewhat accidentally, this is exactly what my PoC branch does.

I can't think of anything off-hand that would break with this change, but @tschottdorf and @bdarnell know this area of code the best.

I think it would be fine to respond to the client before applying the write batch. I'm not sure it's OK to do so before we update the local HLC with the command's timestamp, so I'd move it down a little further.

How does this interact with @irfansharif's #16624? Applying the write batch should be faster when we've decoupled it from the raft log and its synchronous writes, so the benefit of this change may be smaller.

Mentioned this to @nvanbenschoten in person: a further optimization here would be to respond to all of the committed commands before applying any of the associated write batches. I have an abandoned PR that combined the Raft batches. Probably worth taking another look at it: https://github.com/cockroachdb/cockroach/pull/15648

@petermattis the only thing to be careful with there is that none of the batches' corresponding commands can be removed from the CommandQueue until we apply the merged WriteBatch. I like the idea though and will investigate because there's some serious overlap here.

I'm not sure how this interacts with #16624, but I would also expect it to reduce the benefit we can gain from this change. Still, right now we're just leaving extra latency on the table, which could be especially detrimental to large Raft batches.

Note that #16624 isn't making it into 1.1 due to the modest improvement combined with the stability concerns. We should keep #16624 in mind while making changes, but certainly not use it to block nearer term wins.

OK, if we've decided to punt #16624 into 1.2, this seems like a good idea.

Another area for investigation is handling the application of Raft commands differently on followers vs. the leader. Followers need to keep track of the committed Raft log index, but they don't actually need to apply the commands until they become the leader. At the very least this suggests there are opportunities for batching of command application.

they don't actually need to apply the commands until they become the leader

They need to be caught up (at least with respect to all ChangeReplicas commits) to become a candidate, not just the leader. And I think even as a follower, falling too far behind on ChangeReplicas can be a bad thing. But yes, queueing up changes and batching them on followers can be helpful.

Another possibility: Currently, in Replica.handleRaftReady, we write all new log entries and the HardState to disk before sending any messages. This is conservative; some messages can be sent concurrently with the write to disk and this would improve latency by allowing the followers to start their disk syncs before the leader has completed its.

Specifically, a MsgApp can be sent before the Entries it contains have been synced to disk. However, MsgApp also contains the Commit index (stored in the HardState), and it cannot be sent until the corresponding HardState has been persisted (I think). In theory, most MsgApps either contain new Entries or an updated Commit index, but not both, so many MsgApp messages should be able to take advantage of this optimization.
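
To make that splitting decision concrete, here's a rough sketch with hypothetical types standing in for the raftpb messages and the persisted HardState; the real check would live in the Ready-handling code:

```go
package main

// Message is a hypothetical stand-in for raftpb.Message.
type Message struct {
	Type    string // "MsgApp", "MsgHeartbeat", ...
	Commit  uint64 // commit index advertised to the follower
	Entries int    // number of new entries carried
}

// splitForEarlySend partitions msgs into those that may be sent concurrently
// with the local log/HardState write and those that must wait until that
// write (and its sync) has completed. persistedCommit is the highest commit
// index already durable in the local HardState.
func splitForEarlySend(msgs []Message, persistedCommit uint64) (early, afterSync []Message) {
	for _, m := range msgs {
		if m.Type == "MsgApp" && m.Commit <= persistedCommit {
			// Safe to send early: the entries it carries need not be locally
			// durable yet, and it doesn't advertise an unpersisted commit index.
			early = append(early, m)
		} else {
			// Conservative: everything else waits for the local sync.
			afterSync = append(afterSync, m)
		}
	}
	return early, afterSync
}

func main() {
	msgs := []Message{
		{Type: "MsgApp", Commit: 10, Entries: 2}, // new entries, old commit index
		{Type: "MsgApp", Commit: 12, Entries: 0}, // advertises a newer commit
	}
	early, afterSync := splitForEarlySend(msgs, 10)
	_ = early     // can go out before the local sync
	_ = afterSync // held until the HardState is durable
}
```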

Batching the application of Raft commands would also be simpler on followers because followers don't need to deal with the proposal of future commands. Proposer-evaluated KV requires that future proposals look at the MVCC/engine layer to compute their WriteBatches after all prerequisite commands have been applied. This constrains us in the way I said before:

Note that while I think we can respond to clients sooner, I don't think we can pop the command out of the CommandQueue until after it has actually been executed, because of how proposer-evaluated KV works upstream of Raft.

I foresee this constraint making it more difficult for any batching mechanism on the leaseholder, because batching might delay the proposal of future commands.

@bdarnell you're referencing the optimized Raft pipeline from section 10.2.1 in the Raft thesis, right?

[Screenshot: optimized Raft pipeline diagram from section 10.2.1 of the Raft thesis]

It certainly seems like a clear win, although I'm not sure the interface exposed by etcd/raft would be suitable for the full extent of the optimization:

The leader may even commit an entry before it has been written to its own disk, if a majority of followers have written it to their disks; this is still safe

I doubt supporting that case is very important in practice anyway, though.

Yes, more or less, although that diagram doesn't quite work for the architecture of etcd/raft. And I agree that the edge case of a leader committing an entry without having it in its own log is not worth supporting.

Most of this was addressed in https://github.com/cockroachdb/cockroach/pull/19229. The original idea was tested in https://github.com/cockroachdb/cockroach/pull/18710, where it did not show a significant speedup. I may revisit that branch in the future.

There are a few other ideas here related to batching of disk writes/syncs beneath Raft. None of these will be addressed in the 2.0 timeframe.

This isn't very interesting anymore thanks to txn pipelining. The optimization here allows us to acknowledge writes earlier but it doesn't allow us to take them out of the command queue any sooner. Since txn pipelining uses QueryIntent requests to wait in the command queue while waiting for writes to finish, it won't see any difference with this change. Closing for now.

I'm reviving this issue with a few adjustments. Originally, the issue focused on when to respond to clients waiting on Raft proposals. I think we should revisit this question but look to address it by reworking the Raft pipeline itself. Specifically, Raft's append/commit stage can be decoupled from its apply stage. From Raft's perspective, entry application can be fully asynchronous. In practice, this would mean that Raft's main processing loop would enqueue entries in rd.CommittedEntries to be applied on a separate goroutine instead of applying them itself. This has been discussed before, such as in https://github.com/cockroachdb/cockroach/issues/19156.

Initially, the goal of the change would be to increase single-Range throughput. Applying entries synchronously in Raft's single-threaded processing pipeline places an upper bound on its throughput. This is because it delays each Raft processing iteration (proportionally to the size of the commands being processed), which results in the raftMu being held for extended periods of time, delaying communication with peer replicas. If entries were applied asynchronously, then iterations of the append/commit stage could be performed in faster succession, allowing new commands to be proposed faster and commands with a quorum of acks to be committed faster.
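
A minimal sketch of that decoupling, assuming hypothetical Entry/asyncApplier types rather than the real handleRaftReady structure, where the Raft loop only enqueues rd.CommittedEntries and a dedicated goroutine applies them:

```go
package main

import "sync"

// Entry is a hypothetical stand-in for a committed Raft log entry.
type Entry struct {
	Index uint64
	Data  []byte
}

type asyncApplier struct {
	ch chan []Entry
	wg sync.WaitGroup
}

func newAsyncApplier(apply func([]Entry)) *asyncApplier {
	a := &asyncApplier{ch: make(chan []Entry, 128)}
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		for ents := range a.ch {
			apply(ents) // state machine application runs off the Raft loop
		}
	}()
	return a
}

// enqueue is called from the Raft processing loop with rd.CommittedEntries.
// It only blocks if the apply goroutine has fallen far behind (see the
// backpressure discussion below).
func (a *asyncApplier) enqueue(ents []Entry) { a.ch <- ents }

func (a *asyncApplier) close() {
	close(a.ch)
	a.wg.Wait()
}

func main() {
	applier := newAsyncApplier(func(ents []Entry) {
		// Apply the entries' write batches to the local engine here.
		_ = ents
	})
	applier.enqueue([]Entry{{Index: 1}, {Index: 2}})
	applier.close()
}
```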

A separate apply stage would also make it easier to merge together the WriteBatches of all entries in a single pass. It's unclear how much this would help speed up the RocksDB writes because our RocksDB commit pipeline already attempts to do essentially this.
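
Roughly something like this, assuming hypothetical Batch/Engine interfaces in place of the real RocksDB bindings:

```go
package raftapply

// Batch and Engine are hypothetical interfaces standing in for the real
// storage engine bindings.
type Batch interface {
	ApplyTo(dst Batch) // append this batch's operations onto dst
}

type Engine interface {
	NewBatch() Batch
	Commit(b Batch, sync bool) error
}

// applyMerged folds the per-entry write batches into one and commits it once,
// unsynced: durability was already provided by the synced Raft log write.
func applyMerged(eng Engine, batches []Batch) error {
	merged := eng.NewBatch()
	for _, b := range batches {
		b.ApplyTo(merged)
	}
	return eng.Commit(merged, false /* sync */)
}
```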

A future extension to this change, which doesn't need to be addressed immediately, is to reconsider when we acknowledge clients in the new Raft processing pipeline. To start, we could keep acknowledging clients after command application like we currently do. I believe this is necessary for commands which rely on the acknowledgement to establish an ordering between commands and to flush all previous commands to RocksDB (like lease acquisitions?). Not all commands rely on this though. For instance, we could acknowledge a Put immediately after it has been committed as long as we don't remove it from the commandQueue until after it has been applied. This is already similar to what we do with AsyncConsensus and signalProposalResult.

If we apply Raft entries asynchronously is there a danger of applies falling behind appends?

That depends on when we pull commands out of the quota pool. If we pull them out after they commit, then we'd need an extra form of backpressure. If we pull them out after they apply, I don't think we would.
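
A toy sketch of the second option, with a hypothetical (and much simplified, count-based rather than byte-based) pool:

```go
package quota

// pool is a hypothetical, simplified quota pool: quota acquired at proposal
// time is only released once the entry applies, so a lagging apply stage
// naturally backpressures new proposals.
type pool struct {
	sem chan struct{}
}

func newPool(n int) *pool { return &pool{sem: make(chan struct{}, n)} }

// acquire is called at proposal time and blocks once too many entries are in
// flight (proposed or committed but not yet applied).
func (p *pool) acquire() { p.sem <- struct{}{} }

// release is called from the apply stage rather than at commit time.
func (p *pool) release() { <-p.sem }
```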

With this proposal in mind, I'd like us to start looking at our single-Range, multi-node write throughput. A kv0 --splits=0 roachtest would be sufficient to track this, although we'd have to be careful to disable load-based splitting when it arrives.

A separate apply stage would also make it easier to merge together the WriteBatches of all entries in a single pass. It's unclear how much this would help speed up the RocksDB writes because our RocksDB commit pipeline already attempts to do essentially this.

This should be an improvement for single-range throughput, since the rocksdb commit batching only happens when two ranges are applying commands simultaneously.

Is it that much easier to merge the WriteBatches with a separate apply stage? I haven't thought about this recently but it seems to me it should be just as easy/difficult to merge them in the current structure, in which case we could also merge the HardState.Commit update into the same batch. Separating commit and apply forces the HardState update to be a separate write (maybe batched with other log appends, but not necessarily).

but it seems to me it should be just as easy/difficult to merge them in the current structure

Yes, it's possible that we could merge them in the current structure, although it would require a pretty big refactoring (i.e. build up giant batch across all commands, commit, ack all commands, apply local side effects for all commands). I figured this change would already force us to restructure some of this, so it would be a good time to make the change. But you're correct that they're for the most part orthogonal.

in which case we could also merge the HardState.Commit update into the same batch

We don't want to do this because we need to wait for a sync when writing the HardState and new entries, but don't need to when applying the entries.

Yes, it's possible that we could merge them in the current structure, although it would require a pretty big refactoring (i.e. build up giant batch across all commands, commit, ack all commands, apply local side effects for all commands).

I had a change to do this at one point. See https://github.com/cockroachdb/cockroach/pull/15648.

in which case we could also merge the HardState.Commit update into the same batch

We don't want to do this because we need to wait for a sync when writing the HardState and new entries, but don't need to when applying the entries.

True, but if the only update is HardState.Commit, the MustSync flag is not set, so we could still batch it into the unsynced apply batch.
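
For concreteness, a sketch of that batching decision with hypothetical Ready/Batch types; the MustSync flag referenced above comes from etcd/raft's Ready:

```go
package raftstate

// HardState and Ready are hypothetical stand-ins for the raft types.
type HardState struct {
	Term, Vote, Commit uint64
}

type Ready struct {
	HardState HardState
	MustSync  bool
	// Entries, CommittedEntries, and Messages omitted.
}

// Batch is a hypothetical write batch that can record a HardState update.
type Batch struct {
	hardState *HardState
}

func (b *Batch) PutHardState(hs HardState) { b.hardState = &hs }

// writeHardState routes the HardState update: when MustSync is false (the
// only change is an advanced Commit index), it can ride in the unsynced apply
// batch; otherwise it belongs in the synced log-append batch.
func writeHardState(rd Ready, logBatch, applyBatch *Batch) {
	if rd.MustSync {
		logBatch.PutHardState(rd.HardState) // synced together with the log append
	} else {
		applyBatch.PutHardState(rd.HardState) // piggybacks on the unsynced apply
	}
}
```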

True, but if the only update is HardState.Commit, the MustSync flag is not set, so we could still batch it into the unsynced apply batch.

I think that's still moving in the wrong direction. We should be making the main Raft processing loop run as fast and as frequently as possible, otherwise we're delaying inter-node coordination and slowing down consensus. Trying to apply entries in the same loop works against this goal. Instead, we should be pulling as much as possible out of the processing loop and relying on lower levels (e.g. RocksDB's commit batching) to ensure disk writes maintain high throughput.

@ajwerner and I were talking through this more and in addition to pulling Raft entry application out of the single-threaded Raft proposal/commit loop, we think there's value in revisiting the original focus of this issue -- responding to Raft proposals after entries commit, not apply.

What I missed in https://github.com/cockroachdb/cockroach/pull/18710 is that we don't even need to wait for the Raft hard state to be synced before acking clients. In fact, we should be able to pull committed-entry acknowledgement up above the writing of new proposed entries. This could pull an entire fsync out of the path for any Raft proposal on a Range with sufficiently high concurrency!

I threw together a prototype that does exactly this. It acks all clients for committed entries at the beginning of handleRaftReadyRaftMuLocked instead of at the end. This gives about an 8% throughput win and an 8% p50 latency win on kv0 running on a 3-node cluster of n1-standard-16 machines. Looks like a worthwhile change.
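
A condensed sketch of that reordering, with hypothetical types and callbacks standing in for the real handleRaftReadyRaftMuLocked plumbing:

```go
package main

type entry struct{ index uint64 }

// ready is a hypothetical stand-in for the relevant parts of raft.Ready.
type ready struct {
	entries          []entry // new entries to append (needs an fsync)
	committedEntries []entry // entries ready to apply
}

// handleReady acknowledges clients waiting on committed entries before the
// durable log append, and applies the entries afterwards.
func handleReady(rd ready, ack, appendAndSync, apply func([]entry)) {
	ack(rd.committedEntries)   // moved to the front: unblock waiting clients first
	appendAndSync(rd.entries)  // durable log write (the fsync)
	apply(rd.committedEntries) // non-durable state machine application last
}

func main() {
	rd := ready{entries: []entry{{3}}, committedEntries: []entry{{1}, {2}}}
	handleReady(rd,
		func(es []entry) { /* signal proposal results */ },
		func(es []entry) { /* write and sync the Raft log */ },
		func(es []entry) { /* apply write batches */ },
	)
}
```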

Another improvement that we might consider along these lines is extending etcd/raft to support the leader appending to its own log asynchronously. Right now, etcd/raft assumes that the leader will always durably append any entries in a raft.Ready to its log before advancing. This means that any newly proposed entry needs to wait for the raft ready loop to come back around before being proposed to followers. The same delay occurs in a few other places as well. Under heavy load, this means that the average entry waits for 2.5 fsyncs, not 1 (1/2 before proposing, 1/2 before appending to a follower's log, 1 during log append, 1/2 before acknowledging the client on the leader). This doesn't even include the latency of non-durable entry application, which can still take a large amount of time. To solve this issue, we would need to decouple the latency of each handleRaftReady iteration from the durable log write.

One way to address this is to remove the assumption that log writes are synchronous on the leader in etcd/raft. This seems like a pretty fundamental assumption, but it may not actually be all that hard to break. etcd/raft already has a notion of "async log writes" – they're called MsgProps to followers. So I wonder how hard it would be to expose an option that allows leaders to opt in to similar handling. Instead of assuming that a log entry is immediately durable on a leader, the leader could track its own durable progress and only count itself as having voted once its log write has become durable. This would allow us to lift Raft log writes out of handleRaftReady and into some other goroutine.
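
A toy sketch of what counting the leader's vote only after its durable write could look like, with hypothetical names; the point is just that the leader's in-memory append no longer advances the commit index:

```go
package main

import "sort"

// progress tracks, per replica, the highest log index known to be durably
// synced on that replica. The leader contributes to the commit calculation
// like any other replica, advancing only once its async log write completes.
type progress struct {
	durableIndex uint64
}

// commitIndex returns the highest index that is durable on a quorum.
func commitIndex(replicas []progress) uint64 {
	idxs := make([]uint64, len(replicas))
	for i, p := range replicas {
		idxs[i] = p.durableIndex
	}
	sort.Slice(idxs, func(i, j int) bool { return idxs[i] > idxs[j] })
	return idxs[len(replicas)/2] // index held durably by a quorum
}

func main() {
	// The leader (replica 0) has appended up to index 11 in memory but has
	// only synced through 10; one follower is durable at 11, the other at 9.
	replicas := []progress{{10}, {11}, {9}}
	_ = commitIndex(replicas) // 10: the leader's in-memory append doesn't count yet
	// Once the leader's async log write syncs, its durableIndex advances to 11
	// and the commit index can move to 11.
}
```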

There would undoubtedly be some kinks to figure out here, but I think the various ideas in this issue are driving towards a really important point. In order for the Raft pipeline to be as efficient as possible under heavy load, we need to be able to drive its state machine at a constant rate, regardless of load or the latency of various stages in the pipeline. This is clearly not true today, but it is a goal we should keep working towards.
