Reliable Broker Cluster

A Reliable Broker Cluster is a group of brokers collaborating to present the illusion of a single broker. If any broker fails, clients can fail over to any other broker in the cluster and resume their sessions.

In active-active mode, clients can connect to any broker; in the event of a failure, clients can fail over to any other broker in the cluster. In active-passive mode, there is exactly one active broker. All clients connect to the active broker; the other brokers are passive back-ups. If the active broker fails, one of the passive brokers becomes active and all clients fail over to it. If a client tries to connect to a passive broker, it is redirected to the active broker.

From the discussion below it appears that active-active and active-passive modes have identical synchronization requirements, so active-passive becomes just a special case of active-active. The brokers can run identical synchronization code in both cases.

Virtual Synchrony

The cluster will use the CPG (Closed Process Group) protocol, part of the OpenAIS suite. CPG is a virtual synchrony protocol. CPG members multicast to the group; this document refers to CPG multicast messages as "events" to avoid confusion with AMQP messages. CPG guarantees that each group member gets an identically ordered stream of events. To get consistent behavior, each member acts on events in the order they are received from the cluster. Members send their own events to the cluster but do not act on them until they are self-delivered back from the cluster.

Broker State

Broker state can be divided into:
- shared state: the exchanges, queues and messages visible to all clients, identical on every broker.
- conversational state: the per-client state of each connection, session and consumer.
Each broker maintains state for the entire "virtual broker" represented by the cluster. Each broker has identical shared state. The broker directly connected to a client has "live" conversational state for that client; other brokers have "shadow" state. Only the directly connected broker sends data to the client, but all brokers update their shadow state as if they were sending the same data to the client. Thus all brokers are ready to take over any failed-over session at all times.

Replicate inputs, infer outputs

We replicate input data to all members of the cluster as CPG events. Each member independently computes the changes to shared state and the output to clients by following a deterministic algorithm. There are two kinds of input:
- data received from clients, replicated to the cluster by the directly connected broker.
- events generated by a broker itself, for decisions that are not determined by client input alone (e.g. the dequeue events described below).
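Both kinds of input travel through CPG, which provides the ordering and self-delivery this design relies on. The following is a minimal sketch of joining the group, multicasting an event and acting on it only when it is delivered back in cluster order. It assumes the openais-era CPG C API; the header location, error types and callback signatures vary between openais and corosync releases, and the group name "qpid-cluster" is an arbitrary example.

    #include <openais/cpg.h>
    #include <sys/uio.h>
    #include <cstring>
    #include <string>

    // Called for every event in the same total order on every member,
    // including events this broker multicast itself.
    static void deliver(cpg_handle_t, struct cpg_name*, uint32_t nodeid,
                        uint32_t pid, void* msg, int msg_len) {
        // Decode the event and apply it to broker state here. Because every
        // member sees the identical sequence, state stays consistent.
    }

    // Membership changes (brokers joining, leaving or failing) are also
    // delivered in cluster order.
    static void configChange(cpg_handle_t, struct cpg_name*,
                             struct cpg_address*, int,
                             struct cpg_address*, int,
                             struct cpg_address*, int) {}

    int main() {
        cpg_handle_t handle;
        cpg_callbacks_t callbacks;
        callbacks.cpg_deliver_fn = deliver;
        callbacks.cpg_confchg_fn = configChange;
        cpg_initialize(&handle, &callbacks);

        struct cpg_name group;
        std::strcpy(group.value, "qpid-cluster");
        group.length = std::strlen(group.value);
        cpg_join(handle, &group);

        // Multicast an input event. We do NOT act on it here; it is processed
        // only when deliver() receives it back at its place in the total order.
        std::string event = "example input event";
        struct iovec iov = { const_cast<char*>(event.data()), event.size() };
        cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);

        // Block, dispatching events (our own and other members') in order.
        cpg_dispatch(handle, CPG_DISPATCH_BLOCKING);
    }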
All brokers act on input events, even locally generated ones, only when they are received back from CPG, so all brokers will compute consistent results. There are 3 processing phases:
1. decode: parse the input event and update conversational (live or shadow) state.
2. update: apply the resulting changes to shared state, in cluster event order.
3. output: the directly connected broker sends data to its clients; other brokers update their shadow state as if they had sent it.
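The per-event pipeline could look roughly like the sketch below. The class and member names are illustrative placeholders, not the actual Qpid classes.

    #include <cstdint>
    #include <cstddef>
    #include <string>

    struct ClusterEvent {
        uint64_t connectionIndex;   // cluster-wide index of the originating connection
        std::string data;           // raw AMQP frames or a broker-generated event
    };

    struct Command { std::string body; };   // a decoded AMQP command (placeholder)

    class Broker {
    public:
        // Called from the CPG deliver callback, in identical order on every member.
        void processEvent(const ClusterEvent& e) {
            bool local = isLocal(e.connectionIndex);

            // 1. decode: parse the event and update conversational state
            //    ("live" state for local connections, "shadow" otherwise).
            Command cmd = decode(e, local);

            // 2. update: apply the command to shared state (queues, exchanges).
            //    Every broker performs the identical update in the same order.
            applySharedState(cmd);

            // 3. output: only the directly connected broker writes to the client;
            //    the others record what would have been sent, so any session can
            //    fail over at any time.
            if (local) sendToClient(cmd);
            else       updateShadowOutput(cmd);
        }

    private:
        bool isLocal(uint64_t) const { return false; }                       // placeholder
        Command decode(const ClusterEvent& e, bool) { return Command{e.data}; }
        void applySharedState(const Command&) {}                             // placeholder
        void sendToClient(const Command&) {}                                 // placeholder
        void updateShadowOutput(const Command&) {}                           // placeholder
    };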
Handling dequeues

Dequeue events

Currently the broker sometimes dequeues messages based on the writability of client connections. This introduces a non-deterministic element into the cluster, as other brokers cannot know the state of the connection. When a broker wants to send a message to a local client, it issues a "dequeue event" via CPG indicating the queue and the consumer to receive the message. No message is actually dequeued until the dequeue event returns via CPG, so it is ordered with respect to other changes to the queue. If the queue is not empty when the event returns, the broker dequeues a message and sends it to the client. If the queue is empty, the broker may send another dequeue event or do nothing, depending on the state of consumers and queues. Brokers receiving a foreign dequeue event from CPG dequeue the message (if the queue is not empty) and update the shadow session for the outgoing message.transfer; they do not actually send the message.

Implicit dequeues

Some dequeues are predictable from client commands alone. For example, if a message is added to an empty queue with only one consumer, and that consumer has sufficient credit for the message, then the dequeue is implied and can be done immediately by all brokers as part of processing the message.transfer. Explicit dequeue events are required only when a broker makes a decision that is not determined entirely by the cluster input events to date, for example because a client connection becomes writable.

Error Handling

An error is recoverable if an isolated broker could kill the problematic sessions or connections but continue to provide service. A core dump is an example of an unrecoverable error. Handling unrecoverable errors is straightforward: the affected broker leaves the cluster, probably because its process was killed. There are two types of recoverable error:
- predictable errors: detected identically by every broker processing the same input, for example an invalid client command. All brokers reach the same conclusion and handle the error consistently.
- unpredictable errors: local to one broker and not deducible from the input stream by the others, for example local resource exhaustion.
Unpredictable errors must be handled in such a way that the cluster does not become inconsistent. If one broker experiences an unpredictable error and the others do not, we want the broker in error to shut down and leave the cluster so its clients can fail over to healthy brokers. However, some unpredictable errors may happen identically on all brokers; for example, brokers with identical journal configuration may all run out of disk space at the same point. We don't want to kill the entire cluster in this case. To deal with this, a broker that experiences an unpredictable error while updating shared state stalls. That is:
- it stops processing cluster events at the point of the error, and
- it multicasts a "stall notice" to the cluster identifying that point in the event stream.
A broker that receives a stall notice and successfully passes, or has already passed, the point of the stall sends a "shoot yourself" notice referencing the stall notice and continues processing. The outcomes for stalled broker(s) are:
- a stalled broker that receives a "shoot yourself" notice for its stall shuts down and leaves the cluster; its clients fail over to the remaining brokers.
- if every broker stalls at the same point, the error is effectively deterministic: no "shoot yourself" notice is sent, the brokers handle the error consistently (e.g. by killing the affected sessions or connections) and resume processing.
In the event of multiple non-identical stalls around the same time, we need a deterministic way to pick the "winner".
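A rough sketch of the stall / "shoot yourself" exchange described above is shown below. The event and member types are illustrative placeholders, and the deterministic "winner" rule for multiple concurrent, non-identical stalls is deliberately left out.

    #include <cstdint>

    struct StallNotice {
        uint32_t nodeId;        // broker that stalled
        uint64_t eventIndex;    // point in the cluster event stream where it stalled
    };

    class ErrorHandler {
    public:
        explicit ErrorHandler(uint32_t self) : selfId(self) {}

        // Called when this broker hits an unpredictable error updating shared state.
        void onLocalError(uint64_t eventIndex) {
            stalled = true;  // stop processing further cluster events until resolved
            multicastStallNotice(StallNotice{selfId, eventIndex});
        }

        // Called when a stall notice arrives from the cluster.
        void onStallNotice(const StallNotice& s) {
            if (s.nodeId != selfId && processedUpTo >= s.eventIndex)
                // We passed the point where the other broker failed, so the error
                // was local to it: tell it to shut down and keep processing.
                multicastShootYourself(s);
            // Picking a winner among several concurrent stalls is not shown here.
        }

        // Called when a "shoot yourself" notice arrives.
        void onShootYourself(const StallNotice& s) {
            if (s.nodeId == selfId)
                shutdownAndLeaveCluster();   // clients fail over to healthy brokers
        }

    private:
        uint32_t selfId;
        uint64_t processedUpTo = 0;   // last event processed successfully
        bool stalled = false;

        void multicastStallNotice(const StallNotice&) {}   // placeholder
        void multicastShootYourself(const StallNotice&) {} // placeholder
        void shutdownAndLeaveCluster() {}                  // placeholder
    };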
This ensures that a single recoverable failure, or even multiple failures, will never bring down the entire cluster, and we continue with the largest cluster possible without causing inconsistency.

Transactions

Transactions are conversational state, allowing a session to collect changes to the shared state and then apply them all at once or not at all. For maximum safety and simplicity, all cluster members participate in the transaction.

For TX transactions each broker creates an identical transaction; they all succeed or fail identically since they are all being fed identical input (see Error Handling above for what happens if a broker does not reach the same conclusion.) For DTX transactions, the originating broker creates a new transaction or joins an existing one; all other brokers join the same transaction as the originating broker. (For the case of a new transaction, either transaction ids must be predictable or an extra cluster event is needed to propagate the txid.) The DTX system ensures that all brokers succeed or fail together.

For TX transactions it is possible to fail over mid-transaction since all brokers have the same transaction state. For DTX transactions a broker failure will roll back the transaction. An alternative approach, previously discussed, is that only the originating broker has a transaction.

Persistence and Asynchronous Journaling

Asynchronous updates to the message store create non-determinism around the timing of completions sent to the client. TODO:
Implementation notes

In previous designs the input events were individual AMQP frames. An alternative approach is to use raw connection read buffers as input events. Read buffers typically contain multiple frames, so this may give better throughput in the cluster. By replicating raw buffers, all cluster members can decode concurrently; to replicate frames, the origin broker must decode the frames and then re-encode them to send to the cluster, and other brokers cannot begin decoding until this is complete.

Concurrency

The current stand-alone broker has two roles for threads:
- read threads: read data from client connections, decode it and update broker state.
- write threads: write output data to client connections.
Only one thread (read or write) is allowed to work on behalf of a connection at a time, so conversational state is not locked. Shared state is locked for thread safety on a per-queue and per-exchange basis. The first-cut broker implementation will add a third thread role, cluster, and modify the work done in the read and write threads as follows:
- read threads no longer decode or update broker state; they simply multicast the data read from a connection to the cluster as CPG events.
- the cluster thread dispatches CPG events in delivery order and does all decoding, conversational state updates and shared state updates.
- write threads send output to directly connected clients, as before.
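A rough sketch of this first-cut threading model, assuming raw connection read buffers are replicated as described under Implementation notes, follows. The function names are illustrative placeholders, not the actual Qpid interfaces.

    #include <cstdint>
    #include <cstddef>
    #include <string>

    // Placeholders for the real cluster, codec and output machinery.
    void multicastToCluster(uint64_t, const std::string&) {}
    void decodeAndUpdate(uint64_t, const std::string&) {}
    void queueOutputForWriteThreads(uint64_t) {}

    // Runs in a read thread: no decoding or state updates, just replication.
    void onConnectionReadable(uint64_t connectionIndex, const char* buf, size_t len) {
        // Multicast the raw buffer to the cluster, tagged with the connection's
        // cluster-wide index. It is not processed until delivered back from CPG,
        // so every broker sees it at the same point in the event stream.
        multicastToCluster(connectionIndex, std::string(buf, len));
    }

    // Runs in the single cluster thread, for every event in CPG delivery order.
    void onClusterEvent(uint64_t connectionIndex, const std::string& buffer) {
        // Decode frames, update conversational (live or shadow) state, and apply
        // shared state changes. Serializing all of this in one thread keeps the
        // brokers consistent, at the cost of concurrency (see the objection below).
        decodeAndUpdate(connectionIndex, buffer);

        // Queue any resulting output; only the broker that owns the connection
        // actually writes to the client.
        queueOutputForWriteThreads(connectionIndex);
    }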
The objection to this model is that it serializes all the broker's activity and will not scale on multiple CPUs. It serves as a first cut to prove the concepts; once it is working we will work to improve concurrency. Remember that shared state updates have to be serialized across the cluster for consistency. Decoding and conversational state changes do not, so they can be moved into separate threads.

One possible approach is to modify CPG handling to distinguish "per-connection" and "global" events. Per-connection events are input from a connection and are dispatched concurrently for each connection, as we do now in a single broker. Per-connection-event threads execute just the input stage (parsing and conversational updates) and then issue global events for the shared state changes. Extra events are required because dispatching per-connection events concurrently means shared state changes from different connections arrive in a non-deterministic order. The additional events can be very concise, however, as they only need to refer to previous input events via some efficient indexing system.

Global indexes

Cluster events will need to refer to various entities: connections, sessions, consumers, input events etc. All are uniquely identifiable, but some have awkward, variable-length identifiers; e.g. a session is uniquely identified by session-name plus authentication-principal. The cluster will assign a globally unique 64-bit integer index to all such entities with a single monotonically increasing, wrap-around counter. Every broker maintains its own counter; indexes are associated with entities in the order they are first mentioned in the CPG event stream. This order is the same for all cluster members.

This has the slightly unusual property that you don't know an object's cluster index until after you receive back the input event where it is introduced. However, since these ids are for use in cluster events, they can only ever refer to objects introduced in an earlier event, so there is no need to know an index until after the event that introduces it is delivered. Cluster events themselves are identified by a separate monotonically increasing 64-bit wrap-around counter.
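A minimal sketch of this indexing scheme is shown below. Each broker runs it independently, but because entities are indexed in the order they first appear in the CPG event stream, all brokers assign identical indexes. The class names are illustrative, not actual Qpid classes.

    #include <cstdint>
    #include <map>
    #include <string>

    class GlobalIndexer {
    public:
        // Return the cluster-wide index for an entity key (e.g. the variable-length
        // session-name + authentication-principal pair), assigning the next counter
        // value the first time the entity is mentioned in the event stream.
        // Must only be called while processing CPG events, in delivery order.
        uint64_t index(const std::string& entityKey) {
            auto found = indexes.find(entityKey);
            if (found != indexes.end()) return found->second;
            uint64_t i = next++;           // 64-bit, monotonically increasing, wraps
            indexes[entityKey] = i;
            return i;
        }

    private:
        uint64_t next = 0;
        std::map<std::string, uint64_t> indexes;
    };

    // Cluster events themselves get a separate 64-bit counter, incremented once
    // per delivered event on every broker.
    class EventCounter {
    public:
        uint64_t nextEventId() { return counter++; }
    private:
        uint64_t counter = 0;
    };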
