Versions

  • 1.0 – Alan Conway, Jun 27 2008

Reliable Broker Cluster

A Reliable Broker Cluster is a group of brokers collaborating to present the illusion of a single broker. If any broker fails, clients can fail over to any other broker in the cluster and resume their sessions.

In active-active mode, clients can connect to any broker. In the event of a failure, clients can fail over to any other broker in the cluster.

In active-passive mode, there is exactly one active broker. All clients connect to the active broker; the other brokers are passive back-ups. If the active broker fails, one of the passive brokers becomes active and all clients fail over to it. If a client tries to connect to a passive broker it is redirected to the active broker.

From the discussion below it appears that active-active and active-passive modes have identical synchronization requirements, so active-passive becomes just a special case of active-active. The brokers can run identical synchronization code in both cases.

Virtual Synchrony

The cluster will use the CPG (Closed Process Group) protocol, part of the OpenAIS suite. CPG is a virtual synchrony protocol. CPG members multicast messages to the group. This document refers to CPG multicast messages as “events” to avoid confusion with AMQP messages.

CPG guarantees that each group member gets an identically ordered stream of events. To get consistent behavior each member acts on events in the order they are received from the cluster. Members send their own events to the cluster but do not act on them until they are self-delivered back from the cluster.
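
The sketch below is a toy, in-process model of this rule, not the real CPG API: members multicast events through a group object that stands in for CPG, and they update state only when an event, including their own, is delivered back in the single agreed order.

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <string>
    #include <vector>

    struct Event { uint32_t senderId; std::string body; };

    // Stands in for the CPG total-order multicast: every member, sender
    // included, sees the same events in the same order.
    class ToyGroup {
      public:
        using Handler = std::function<void(const Event&)>;
        uint32_t join(Handler h) { members.push_back(h); return uint32_t(members.size() - 1); }
        void mcast(const Event& e) { for (auto& m : members) m(e); }
      private:
        std::vector<Handler> members;
    };

    class Member {
      public:
        explicit Member(ToyGroup& g) : group(g) {
            id = group.join([this](const Event& e) { deliver(e); });
        }
        // Send an event to the cluster; do NOT update state here. The change is
        // applied only when the event is self-delivered in cluster order.
        void send(const std::string& body) { group.mcast(Event{id, body}); }
      private:
        void deliver(const Event& e) {
            // State is updated only here, in delivery order, so all members
            // compute identical results.
            std::cout << "member " << id << " applies event from member "
                      << e.senderId << ": " << e.body << "\n";
        }
        ToyGroup& group;
        uint32_t id = 0;
    };

    int main() {
        ToyGroup group;
        Member a(group), b(group);
        a.send("declare queue q1");          // applied by both a and b on delivery
        b.send("bind q1 to amq.direct");
        return 0;
    }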

Broker State

Broker state can be divided into:

  • Conversational state: related to the conversation with a particular client.

    • Per-session command IDs, replay buffers, transaction buffers, message builders etc.

    • Consumers: subscriptions, flow control.

    • Open connection identities.

  • Shared state: affects the output sent to many clients.

    • Wiring: exchanges, queue definitions, bindings.

    • Queue contents.

Each broker maintains state for the entire “virtual broker” represented by the cluster. Each broker has identical shared state. The broker directly connected to a client has “live” conversational state for that client; other brokers have “shadow” state. Only the directly connected broker sends data to the client, but all brokers update their shadow state “as if” they were sending the same data to the client. Thus all brokers are ready to take over any failed-over session at all times.
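
As a rough illustration, the structures below sketch how a cluster member might hold the two kinds of state; the names and fields are illustrative assumptions, not the actual broker classes.

    #include <cstdint>
    #include <deque>
    #include <map>
    #include <string>

    struct Message { std::string content; };

    // Shared state: identical on every broker in the cluster.
    struct SharedState {
        std::map<std::string, std::string> exchanges;         // exchange name -> type
        std::map<std::string, std::deque<Message>> queues;    // queue name -> contents
        std::multimap<std::string, std::string> bindings;     // exchange -> bound queue
    };

    // Conversational state for one session: "live" on the directly connected
    // broker, "shadow" everywhere else.
    struct SessionState {
        bool live = false;                 // true only on the directly connected broker
        uint64_t lastCommandId = 0;        // per-session command numbering
        std::deque<Message> replayBuffer;  // data not yet confirmed by the client
        uint64_t consumerCredit = 0;       // flow control for this session's consumers
    };

    struct ClusterMember {
        SharedState shared;                          // same on all members
        std::map<uint64_t, SessionState> sessions;   // one entry per session, live or shadow
        // Only live sessions actually write to a socket; shadow sessions are
        // updated "as if" they were writing, so any member can take over a
        // session after a failure.
    };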

Replicate inputs, infer outputs

We replicate input data to all members of the cluster as CPG events. Each member independently computes the changes to shared state and the output to clients by following a deterministic algorithm. There are two kinds of input:

  • Input from clients

  • Non-deterministic message allocation decisions (see section on the dequeue problem below)

All brokers act on input events, even locally generated ones, only when received from CPG, so all brokers will compute consistent results.

There are 3 processing phases:

  • Input: read and decode data from clients, send input events to cluster, update conversational state.

  • Process: Update the shared model based on events from cluster, generate output for clients.

  • Output: Encode and send client output.
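
A minimal sketch of the three phases follows, assuming an input event simply carries raw bytes plus a connection id; the function and type names are illustrative, not the actual broker interfaces.

    #include <cstdint>
    #include <vector>

    struct InputEvent { uint64_t connectionId; std::vector<uint8_t> data; };

    struct Broker {
        // Input phase: read client data, update conversational state, and
        // multicast the input to the cluster. Shared state is not touched here.
        void onClientRead(uint64_t connectionId, const std::vector<uint8_t>& data) {
            updateConversationalState(connectionId, data);
            multicast(InputEvent{connectionId, data});
        }

        // Process phase: runs for every event delivered by CPG, in cluster
        // order, on every member. Updates the shared model deterministically
        // and queues output for locally connected clients only.
        void onClusterDeliver(const InputEvent& event) {
            applyToSharedState(event);
            if (isLocalConnection(event.connectionId))
                queueOutput(event.connectionId);
        }

        // Output phase: encode and write queued data to local clients.
        void onClientWritable(uint64_t connectionId) { flushOutput(connectionId); }

        // Placeholders standing in for the real broker machinery.
        void updateConversationalState(uint64_t, const std::vector<uint8_t>&) {}
        void multicast(const InputEvent&) {}
        void applyToSharedState(const InputEvent&) {}
        bool isLocalConnection(uint64_t) const { return true; }
        void queueOutput(uint64_t) {}
        void flushOutput(uint64_t) {}
    };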



Handling dequeues

Dequeue events

Currently the broker sometimes dequeues messages based on the writability of client connections. This introduces a non-deterministic element into the cluster, as other brokers cannot know the state of that connection.

When a broker wants to send a message to a local client, it issues a “dequeue event” via CPG indicating the queue and consumer to receive the message. No message is actually dequeued until the dequeue event returns via CPG, so the dequeue is ordered with respect to other changes to the queue. If the queue is not empty when the event returns, the broker dequeues the message and sends it to the client. If the queue is empty, the broker may issue another dequeue event or do nothing, depending on the state of consumers and queues.

Brokers receiving a foreign dequeue event from CPG will dequeue the message (if the queue is not empty) and update the shadow session for the outgoing message.transfer. They do not actually send the message.
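
The sketch below shows how the dequeue-event round trip might look on each member, under the assumption that events carry a queue name, a cluster-wide consumer index, and the id of the originating broker; the helper functions are placeholders for the real plumbing.

    #include <cstdint>
    #include <deque>
    #include <map>
    #include <string>

    struct Message { std::string content; };

    struct DequeueEvent {
        std::string queue;        // queue to dequeue from
        uint64_t consumerIndex;   // cluster-wide index of the target consumer
        uint32_t originBroker;    // broker that wants to send the message
    };

    struct Broker {
        uint32_t selfId = 0;
        std::map<std::string, std::deque<Message>> queues;    // part of shared state

        // Placeholders for the real cluster and client plumbing.
        void multicast(const DequeueEvent&) {}                   // send via CPG
        void sendToLocalClient(uint64_t, const Message&) {}      // origin broker only
        void recordShadowTransfer(uint64_t, const Message&) {}   // every other broker
        bool consumerStillWantsMessages(uint64_t) const { return false; }

        // A broker wanting to send a message to a local consumer does not touch
        // the queue; it only announces its intent to the cluster.
        void wantToSend(const std::string& queue, uint64_t consumerIndex) {
            multicast(DequeueEvent{queue, consumerIndex, selfId});
        }

        // Every broker, including the origin, acts only when the event comes back
        // from CPG, so the dequeue is ordered against all other queue changes.
        void onDequeueEvent(const DequeueEvent& e) {
            std::deque<Message>& q = queues[e.queue];
            if (q.empty()) {
                // The queue was drained by events ordered ahead of this one; the
                // origin may try again or do nothing, depending on consumer state.
                if (e.originBroker == selfId && consumerStillWantsMessages(e.consumerIndex))
                    wantToSend(e.queue, e.consumerIndex);
                return;
            }
            Message m = q.front();
            q.pop_front();
            if (e.originBroker == selfId)
                sendToLocalClient(e.consumerIndex, m);     // only the origin writes to the client
            else
                recordShadowTransfer(e.consumerIndex, m);  // others update the shadow session
        }
    };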

Implicit dequeues

Some dequeues are predictable based on client commands alone. For example, if a message is added to an empty queue with only one consumer, and that consumer has sufficient credit for the message, then the dequeue is implied and can be done immediately by all brokers as part of processing the message.transfer.

Explicit dequeue events are required only when a broker makes a decision that is not determined entirely by cluster input events to date, for example because a client connection becomes writable.
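
As a sketch of the implicit-dequeue test, assuming simplified queue and consumer shapes that stand in for the broker's real classes, every broker could apply the following check while processing a message.transfer:

    #include <cstddef>
    #include <cstdint>

    struct Consumer {
        uint64_t messageCredit;   // messages the consumer may still be sent
        uint64_t byteCredit;      // bytes the consumer may still be sent
    };

    struct Queue {
        std::size_t depth;          // messages currently on the queue
        std::size_t consumerCount;  // consumers subscribed to the queue
    };

    // True when the outcome of a message.transfer onto 'q' is fully determined by
    // the transfer itself: the queue is empty, there is exactly one consumer, and
    // that consumer has enough credit for this message. Every broker can then
    // perform the dequeue immediately, with no explicit dequeue event.
    bool impliedDequeue(const Queue& q, const Consumer& only, uint64_t messageSize) {
        return q.depth == 0
            && q.consumerCount == 1
            && only.messageCredit >= 1
            && only.byteCredit >= messageSize;
    }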

Error Handling

An error is recoverable if an isolated broker could kill the problematic sessions or connections but continue to provide service. A core dump is an example of an unrecoverable error.

Handling unrecoverable errors is straightforward: the affected broker leaves the cluster – probably because its process was killed.

There are two types of recoverable error:

  • Predictable errors occur in the same way on all brokers as a predictable consequence of cluster events. For example binding a queue to a non-existent exchange.

  • Unpredictable errors may not occur on all brokers. For example running out of journal space to store a message, or an IO error from the journal.

Unpredictable errors must be handled in such a way that the cluster does not become inconsistent. In a situation where one broker experiences an unpredictable error and the others do not, we want the broker in error to shut down and leave the cluster so its clients can fail over to healthy brokers.

However some unpredictable errors may happen identically on all brokers. For example brokers with identical journal configuration may all run out of disk space at the same point. We don't want to kill the entire cluster in this case.

To deal with this case, a broker that experiences an unpredictable error while updating shared state stalls. That is, it:

  • Issues a stall notice to the cluster indicating where in the event stream it stalled and why.

  • Stops processing cluster events and therefore stops making shared state updates.

  • Stores events received after the stall for possible future processing.

A broker that receives a stall notice and successfully passes or has already passed the point of the stall sends a “shoot yourself” notice referencing the stall notice and continues processing.

The outcomes for stalled broker(s) are:

  1. Receive a “shoot yourself” - exit the cluster so clients can fail over to a healthy broker.

  2. Receive an identical stall notice from every cluster member - resume processing: first the events stored while stalled, then new events as they arrive.

  3. Receive non-identical stall notices from every cluster member - if the broker's stall notice “wins” then resume, otherwise leave the cluster.

In the event of multiple non-identical stalls around the same time, we need a deterministic way to pick the “winner”.

  1. Organize notices into groups of identical notices.

  2. If there is a single largest group, it wins.

  3. If there are multiple largest groups the group with the oldest (in cluster order) notice wins.

This ensures that a single recoverable failure, or even multiple failures, will never bring down the entire cluster, and that we continue with the largest possible cluster without causing inconsistency.
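
A sketch of this selection follows. It assumes each stall notice records the event-stream position at which the broker stalled, a description of the failure, and the cluster-order position at which the notice itself was delivered, with notices counted as identical when the stall point and description match.

    #include <cstddef>
    #include <cstdint>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    struct StallNotice {
        uint64_t stallPoint;    // position in the event stream where the broker stalled
        std::string reason;     // why it stalled, e.g. "journal: out of disk space"
        uint64_t clusterOrder;  // position of this notice in the CPG event stream
    };

    // Two notices are "identical" if they stalled at the same point for the same reason.
    static std::pair<uint64_t, std::string> key(const StallNotice& n) {
        return {n.stallPoint, n.reason};
    }

    // Pick the winning notice from a non-empty set: the single largest group of
    // identical notices wins; ties are broken by the group containing the oldest
    // notice in cluster order.
    StallNotice pickWinner(const std::vector<StallNotice>& notices) {
        struct Group { std::size_t count = 0; uint64_t oldest = UINT64_MAX; StallNotice sample{}; };
        std::map<std::pair<uint64_t, std::string>, Group> groups;
        for (const StallNotice& n : notices) {
            Group& g = groups[key(n)];
            ++g.count;
            if (n.clusterOrder < g.oldest) { g.oldest = n.clusterOrder; g.sample = n; }
        }
        const Group* winner = nullptr;
        for (const auto& entry : groups) {
            const Group& g = entry.second;
            if (!winner
                || g.count > winner->count                                   // largest group
                || (g.count == winner->count && g.oldest < winner->oldest))  // tie: oldest
                winner = &g;
        }
        return winner->sample;
    }

A stalled broker would resume only if its own notice matches the result of pickWinner; otherwise it leaves the cluster.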

Transactions

Transactions are conversational state, allowing a session to collect changes for the shared state and then apply them all at once or not at all.

For maximum safety and simplicity all cluster members participate in the transaction.

For TX transactions each broker creates an identical transaction; they all succeed or fail identically since they are all fed identical input (see Error Handling above for what happens if a broker does not reach the same conclusion).

For DTX transactions, the originating broker creates a new transaction or joins an existing one. All other brokers join the same transaction as the originating broker. (For the case of a new transaction either transaction ids must be predictable or an extra cluster event is needed to propagate the txid.) The DTX system ensures that all brokers succeed or fail together.

For TX transactions it is possible to fail over mid-transaction since all brokers have the same transaction state. For DTX transactions a broker failure will roll back the transaction.

An alternative approach, previously discussed, is that only the origin broker holds the transaction.
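
A minimal sketch of the per-session transaction buffer described above, assuming a simple operation-list model: operations are collected as conversational state and touch the shared model only at commit, so brokers fed identical input commit or roll back identically. The types are illustrative, not the actual broker classes.

    #include <deque>
    #include <functional>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for the shared model (wiring and queue contents).
    struct SharedState { std::map<std::string, std::deque<std::string>> queues; };

    class TxBuffer {
      public:
        // Record an operation (e.g. an enqueue or dequeue) without touching the
        // shared state yet; the buffer itself is conversational state.
        void add(std::function<void(SharedState&)> op) { ops.push_back(std::move(op)); }

        // On tx.commit, apply all collected operations in order.
        void commit(SharedState& state) {
            for (auto& op : ops) op(state);
            ops.clear();
        }

        // On tx.rollback, discard the collected operations.
        void rollback() { ops.clear(); }

      private:
        std::vector<std::function<void(SharedState&)>> ops;
    };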

Persistence and Asynchronous Journaling

Asynchronous updates to the message store create non-determinism around the timing of completions sent to the client.

TODO:

  • Do we need async-completion events for the cluster?

  • When do we send completion: cluster done, store done or store+cluster done?

  • Do we need to wait till all cluster members have stored data?

Implementation notes

In previous designs the input events were individual AMQP frames.

An alternative is to use raw connection read buffers as input events. Read buffers typically contain multiple frames, so this may give better throughput in the cluster. By replicating raw buffers all cluster members can decode concurrently, whereas to replicate frames the origin broker must decode the frames and then re-encode them to send to the cluster; other brokers cannot begin decoding until this is complete.

Concurrency

The current stand-alone broker has two roles for threads:

  • Read: triggered by read wake-ups from poller. Read from clients, decode, update conversational and shared state. Dequeue messages based on input, generate output for clients in write queues.

  • Write: triggered by write wake-ups from poller. Encode and write data in write queues. Also dequeue messages when connection becomes writeable, encode and write to clients.

Only one thread (read or write) is allowed to work on behalf of a connection at a time so conversational state is not locked. Shared state is locked for thread safety on a per-queue and per-exchange basis.

The first cut broker implementation will add a third thread role – cluster – and modify the work done in read and write threads as follows:

  • Read: Reads raw byte data from connection and sends a cluster input event with data and connection-id.

  • Cluster: Triggered by CPG events. Same steps as the current read thread, but decoding from the event rather than directly from the connection.

  • Write: Same as currently except for initiating dequeues. Instead of initiating a dequeue, send a dequeue event with queue name and consumer to be executed in the cluster thread.

The objection to this model is that it serializes all of the broker's activity and will not scale across multiple CPUs. It serves as a first cut to prove the concepts; once it is working we will improve concurrency. Remember that shared state updates have to be serialized across the cluster for consistency. Decoding and conversational state changes do not, so they can be moved into separate threads.

One possible approach is to modify CPG to distinguish “per-connection” and “global” events. Per-connection events carry input from a single connection and are dispatched concurrently, one stream per connection, as we do now in a single broker.

Per-connection-event threads execute just the input stage (parsing and conversational updates) then issue global events for shared state changes. Extra events are required because dispatching per-connection events concurrently means shared state changes from different connections are now in non-deterministic order. The additional events can be very concise however, as they only need refer to previous input events via some efficient indexing system.
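
The record layouts below are one way to picture that split; the field names and the indexing scheme are assumptions for illustration, with indexes as described under Global indexes below.

    #include <cstdint>
    #include <vector>

    struct PerConnectionEvent {      // dispatched concurrently, one stream per connection
        uint64_t connectionIndex;    // cluster-wide index of the connection
        uint64_t inputEventIndex;    // index of this event in the cluster event stream
        std::vector<uint8_t> data;   // raw client bytes to decode
    };

    struct GlobalEvent {             // dispatched serially, in cluster order
        uint64_t inputEventIndex;    // which earlier input event produced this change
        uint64_t commandOffset;      // which decoded command within that event
    };

    struct ClusterMember {
        // Per-connection phase: decode and update conversational state only. This
        // can run in parallel for different connections because it never touches
        // shared state; shared-state changes are announced as global events.
        void onPerConnectionEvent(const PerConnectionEvent& e) {
            decodeAndUpdateConversationalState(e);
            multicastGlobal(GlobalEvent{e.inputEventIndex, 0});
        }

        // Global phase: applied in a single cluster-wide order on every member, so
        // shared-state updates stay deterministic even though the per-connection
        // work above ran concurrently.
        void onGlobalEvent(const GlobalEvent& e) { applySharedStateChange(e); }

        // Placeholders for the real machinery.
        void decodeAndUpdateConversationalState(const PerConnectionEvent&) {}
        void multicastGlobal(const GlobalEvent&) {}
        void applySharedStateChange(const GlobalEvent&) {}
    };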

Global indexes

Cluster events will need to refer to various entities: connections, sessions, consumers, input events, etc. All are uniquely identifiable, but some have awkward, variable-length identifiers. For example, a session is uniquely identified by session-name + authentication-principal.

The cluster will assign a globally unique 64-bit integer index to all such entities using a single monotonically increasing wrap-around counter. Every broker maintains its own counter; indexes are associated with entities in the order they are first mentioned in the CPG event stream. This order is the same for all cluster members.

This has the slightly unusual property that you don't know an object's cluster index until after you re-receive the input event where it is introduced. However, since these IDs are for use in cluster events, they can only ever refer to objects introduced in an earlier event, so there is no need to know an index until after the event that introduces it is delivered.

Cluster events themselves are identified by a separate monotonically increasing 64-bit wrap-around counter.
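
A sketch of the index assignment follows; it assumes entities are keyed by an opaque string (for example session-name plus authentication-principal) and ignores wrap-around handling.

    #include <cstdint>
    #include <map>
    #include <string>

    class GlobalIndex {
      public:
        // Called while processing a delivered cluster event that mentions an
        // entity for the first time; later events can refer to it by this index.
        uint64_t assign(const std::string& entityKey) {
            auto it = indexes.find(entityKey);
            if (it != indexes.end()) return it->second;
            uint64_t index = next++;           // 64-bit counter; wrap-around ignored here
            indexes[entityKey] = index;
            return index;
        }

        // Called when a later cluster event refers to an already-introduced entity.
        uint64_t lookup(const std::string& entityKey) const { return indexes.at(entityKey); }

      private:
        uint64_t next = 0;
        std::map<std::string, uint64_t> indexes;
    };

Because assignment happens only while processing delivered events, every broker's counter advances identically and the same entity gets the same index everywhere.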