On Sat, Jan 24, 2015 at 3:12 AM, David Lang <[email protected]> wrote:
> On Sat, 24 Jan 2015, singh.janmejay wrote:
>> On Sat, Jan 24, 2015 at 2:19 AM, David Lang <[email protected]> wrote:
>>
>>> RELP is the network protocol you need for this sort of reliability. However, you would also need to not allow any message to be stored in memory (because it would be lost if rsyslog crashes or the system reboots unexpectedly). You would have to use disk queues (not disk-assisted queues) everywhere and do some other settings (checkpoint interval of 1, for example).
>>>
>>> This would absolutely cripple your performance due to the disk I/O limitations. I did some testing of this a few years ago. I was using a high-end PCI SSD (a 160G card cost >$5K at the time) and, depending on the filesystem I used, I could get rsyslog to receive between 2K and 8K messages/sec. The same hardware writing to a 7200rpm SATA drive with memory buffering allowed could handle 380K messages/sec (the limiting factor was the Gig-E network).
>>>
>>> Doing this sort of reliability on a 15Krpm SAS drive would limit you to ~50 logs/sec. Modern SSDs would be able to do better, I would guess a few hundred logs/sec from a good drive, but you would be chewing through the drive lifetime several thousand times faster than if you were allowing memory buffering.
>>>
>>> Very few people have logs that are critical enough to warrant this sort of performance degradation.
>>
>> I didn't particularly have disk-based queues in mind for reliability reasons. However, messages may need to overflow to disk to manage bursts (but only for burstability reasons). For a large architecture of this nature, it's generally useful to classify failures in a broad way (rather than the very granular failure modes that we identify for transactional databases etc.). The reason for this ties back to self-healing.
>> It's easier to build self-healing mechanisms assuming only one kind of failure: node loss. It could happen for multiple reasons, but if we treat it that way, all we have to do is build room for managing the cluster when 1 (or k) nodes are lost.
>>
>> So thinking of it that way, an rsyslog crash, a machine crash or a disk failure are all the same to me. They are just node loss (we may be able to bring the node back with some offline procedure, but it'll come back as a fresh machine with no state).
>>
>> Which is why I treat K-safety as a basic design parameter. If more than K nodes disappear, data will be lost.
>>
>> With this kind of coarse-grained failure mode, messages can easily be kept in memory.
>
> Whatever machine sends the ack saying that the message was delivered will have to do disk queues or the equivalent, otherwise the message can be lost after the ack is sent.
In the cluster, the ACK for 'replicated till seq_no' is sent only by the queue in the last tier (Tier-3, in the example). The last-tier node is not special (or different) from any other node in the cluster. It still queues things up in memory, and sends an ack saying it has received messages till a certain seq_no. Because it is the last tier (i.e. the k-th tier), this ack also means the message has been k-way replicated.

Let us say the process / machine crashes at this point. The tier-2 machine will correct this by re-transmitting all messages from what it thinks is the current 'delivered till seq_no'. This is different from 'replicated till seq_no' (and is always <= replicated till). This is the value that tells us that the downstream cluster has replicated messages till a certain point. That is, replicated_till_seq_no(system N+1) == delivered_till_seq_no(system N), and similarly replicated_till_seq_no(system N) == delivered_till_seq_no(system N-1), etc.

The last two sequence diagrams deal with failure scenarios (although I show it in tier-2, the same recovery mechanism applies to the first or last tier too).

>>> In addition, this sort of reliability is saying that you would rather have your applications freeze than have them do something and not have it logged. And that you are willing to have your application slow down to the speed of the logging. Very few people are willing to do this.
>>>
>>> You are proposing doing the application ack across multiple hops instead of doing it hop-by-hop. This would avoid the problem that can happen with hop-by-hop acks, where a machine that has acked a message then dies and needs to be recovered before the message can get delivered (assuming you have redundant storage and enough of the storage survives to be able to be read, the message would eventually get through).
>>>
>>> But you now have the problem that the sender needs to know how many destinations the logs are going to. If you have any filters to decide what to do with the logs, the sender needs to know if the log got lost, or if a filter decided not to write the log. If the rules would deliver the logs to multiple places, the sender will need to know how many places it's going to be delivered to, so that it can know how many different acks it's supposed to get back.
>>
>> So the design expects clusters to be broken into multiple tiers. Let us take a 3-tier example.
>>
>> Say we have 100 machines; we break them into 3 tiers of 34, 33 and 33 machines.
>>
>> Assuming every producer wants at most 2-safety, I can use 3 tiers to build this design.
>>
>> So first the producer discovers Tier-1 nodes, and hashes its session_id to pick one of the 34 nodes. If it is not able to connect, it discards that node from the collection (now we end up with 33 nodes in Tier-1) and hashes to a different node (again a Tier-1 node, of course).
>>
>> Once it finds a node that it can connect to, it sends its session_id and message-batch to it.
>>
>> The selected Tier-1 node now hashes the session_id and finds a Tier-2 node (it again discovers all Tier-2 nodes via an external discovery mechanism). If it fails to connect, it discards that node and hashes again to one of the remaining 32 nodes, and so on.
>>
>> Eventually it reaches Tier-3, which is where the ruleset has a clause which checks for replica_number == 1 and handles the message differently. It is handed over to an action which delivers it to the downstream system (which may in turn again be a syslog cluster, or a datastore etc.).
>>
>> So each node only has to worry about the next hop that it needs to deliver to.
>
> Delivering to one of N machines is trivial to do and requires no software changes. CLUSTERIP managed by corosync will do this using standard syslog software today.
> Specifically, this allows you to have X machines in a cluster, and a system sending a log to the cluster doesn't care which machine it's delivered to. It just connects to the cluster IP and sends a message. If the machine it's hitting fails or is dead, it just tries connecting again and will almost always hit a different box immediately (and if it's unlucky enough, just tries again). Corosync will remove the machine from the cluster (and add it back as appropriate) without any need for the clients to know about it.

Thanks for pointing this out. I was thinking of using something like ZooKeeper for discovery (ephemeral z-nodes that disappear when syslog nodes die, etc.), but this may be a simpler way of doing it. I have never used it before, so I need to learn the details, but from how you describe it, it seems like it does exactly what I want.

> The problem is at the destination. If the ruleset on tier 3 says that the message should be delivered to multiple destinations (say Elasticsearch, a couple of local files, and another remote machine), does the tier 3 machine send the ack? Or does it need to get acks from all the destinations? What if one local file can be written and the other can't? What if the ruleset says to discard the message? What if the ruleset says to discard the message based on the current value of a global variable?

This is what I addressed in the 'user configuration level implementation', where I said the user needs to choose one destination (one action) that is used as the reliable destination (once written, data wouldn't be lost). However, we can implement composite ack-aggregators that ack when each destination is written to, and pass the ack up to the queue only when all desired destinations have ack'ed.

> When does an ack get sent?
>
> What happens if a box in the middle is supposed to modify the message?

This is not a problem. Any of the boxes may modify messages in any way. This is fine because the downstream tier considers the modified message the source of truth; as far as a tier is concerned, it got the message and it needs to deliver it downstream. If it fails, the modified message will be re-transmitted to another node picked from the same tier (by hashing, or using Corosync, or some other way).

>>> These problems make it so that I don't see how you would reasonably manage this sort of environment.
>>>
>>> I would suggest that you think hard about what your requirements really are.
>>>
>>> It may be that you are only sending to one place, in which case you really want to just be inserting your messages into an ACID-compliant database.
>>>
>>> It may be that your requirements for absolute reliability are not quite as severe as you are initially thinking that they are, and that you can then use the existing hop-by-hop reliability. Or they are even less severe and you can accept some amount of memory buffering to get a few orders of magnitude better performance from your logging. Remember that we are talking about performance differences of 10,000x on normal hardware. A bit less, but still 100x or so, on esoteric, high-end hardware.
>>
>> Yep, I completely agree. In most cases extreme reliability such as this is not required, and is best avoided for cost reasons.
>>
>> But for select applications it is a lifesaver.
>
> True, but at that point, is rsyslog the right software to be used?

I think syslog is the right tool for log movement. The biggest reason is that the same tooling applies to both un-acked and fully-reliable pipelines. So any user configuration can be used interchangeably (be it message modification, log-rotation mechanisms, queues, actions, rulesets; it could be anything at all).
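As an aside, the hash-and-discard next-hop selection described earlier can be sketched like this (purely illustrative Python; `pick_node` and `can_connect` are names I made up here, and the hash function / probing mechanism would be whatever the implementation actually settles on):

```python
import hashlib

def pick_node(session_id, nodes, can_connect):
    # Hash the session_id over the currently known next-tier nodes.
    # On a connect failure, discard that node from the candidate set
    # and hash again over the remaining nodes.
    candidates = list(nodes)  # work on a copy; discovery list stays intact
    digest = int(hashlib.sha1(session_id.encode()).hexdigest(), 16)
    while candidates:
        node = candidates[digest % len(candidates)]
        if can_connect(node):
            return node
        candidates.remove(node)  # treat the node as lost; retry on the rest
    raise RuntimeError("no reachable node in next tier")
```

Sessions stay pinned to the same next-tier node as long as it is reachable, and fail over deterministically when it is not.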
>>> I will also say that there are messaging systems that claim to have the properties that you are looking for (Flume for example), but almost nobody operates them in their full reliability mode because of the performance issues. And they do not have the filtering and multiple-destination capabilities that *syslog provides.
>>
>> Yes, Flume is one of the best options. But it comes with some unique problems too (it's not lightweight enough for running producer-side, and managed-environment overhead (GC etc.) causes its own set of problems). There is also value in offering the same interface to producers for ingestion into un-acked and reliable pipelines (because a lot of other things, like integration with other systems, can be reused). It also keeps things simple because producers do all operations in one way, with one tool, regardless of the ingestion mechanism being acked/replicated etc.
>>
>> Reliability in this case is built end-to-end, so building stronger guarantees into the over-the-wire parts of the pipeline doesn't seem very valuable to me. Why do you feel RELP will be necessary?
>
> What you are doing is basically just extending RELP from being per-hop to being from the original source to the final destination. This requires that the original source hold on to the message the entire time, rather than being able to delegate it to whatever box is just before the breakage. As I say above, it also requires that the source know about all of the final destinations.

Not really. Take a look at the following diagram:
ingestion_relay_6_purge_shown.png
<https://docs.google.com/file/d/0B_XhUZLNFT4dSk9obFRLYXdOY0k/edit?usp=drive_web>

I have added what the sliding window in nodes from different tiers (and upstream) looks like for the first diagram. I haven't shown the sliding-window details for all message exchanges, to avoid unnecessary verbosity in the diagram, but it conveys the idea. Note that the last tier discards messages when the downstream system has replicated them. Tiers in the same cluster discard messages in response to the last tier delivering them to the downstream system. So the first tier (or upstream system) doesn't need to hold messages any longer than it takes for the next system to replicate them. This allows for bursts as large as the queue size that the next system can provide. It's a typical store-and-forward design.

> At that point you no longer have the syslog async multiple-relay architecture.
>
> What advantage do you get by having multiple special machines in the middle as opposed to just having the source machine talk directly to the destination machine? You are not allowing the machines in the middle to store the message, and you are waiting until the final machine responds before you can release the message.

The advantage is burstability. If the downstream is capable of keeping up with very high bursts of messages, the intermediate cluster may not be required. However, in most cases that kind of over-provisioning for a database is hard. The intermediate cluster allows smoothing out bursts for the final tier.

> With rsyslog today you could have a source box do RELP through multiple tiers of redundant routers or dumb TCP proxies to get to an rsyslog box receiving the message and delivering it (with the need for disk queues, fsync on every output message, etc. that cripple speed).
>
> No need for any sequence-id, message-id, or any message changes at all.

Yep, but this imposes 2 important restrictions. One is fsync on each message, which restricts the throughput of the relay. The second is that it has no protection from disk failures, which are going to be common in a very large cluster. The throughput limitation makes scaling this cluster very expensive. High throughput is necessary to keep it cheap.
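To make the purging rule concrete, here is roughly what the per-session window on an in-cluster node looks like (a Python sketch under my own naming, not an implementation; an in-cluster tier may only discard up to 'delivered till', while the upstream system can discard up to 'replicated till' of the cluster, since the cluster then holds k copies):

```python
class SessionWindow:
    """Per-session send window kept by one in-cluster relay node."""

    def __init__(self):
        self.pending = {}        # seq_no -> message, held until safe
        self.delivered_till = 0  # downstream system has these for good

    def send(self, seq_no, msg):
        self.pending[seq_no] = msg  # buffer until the watermarks say otherwise

    def on_ack(self, replicated_till, delivered_till):
        # downstream replies with both watermarks; delivered <= replicated
        assert delivered_till <= replicated_till
        self.delivered_till = max(self.delivered_till, delivered_till)
        # an in-cluster tier discards only what the last tier has handed
        # to the downstream system, so retransmission stays possible
        for seq in [s for s in self.pending if s <= self.delivered_till]:
            del self.pending[seq]

    def retransmit_batch(self):
        # on timeout (or broken connection): resend everything not yet
        # known to be delivered, to a freshly picked next-tier node
        return [(s, self.pending[s]) for s in sorted(self.pending)]
```

A crashed next-tier node then costs only the retransmission of the still-pending suffix, which is exactly the recovery shown in the last two sequence diagrams.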
> In fact, you can have two different copies of rsyslog running on the machines, one operating normally, one in audit-grade reliability mode, and the client just decides which copy to send a particular log message to (/dev/log can only be one, but the client can connect to localhost on multiple ports).

I wasn't thinking of 2 different instances. It was actually 2 different pipelines, that is, different inputs bound to different rulesets and actions (one that is acked, the other unacked).

> David Lang

>>> David Lang
>>>
>>> On Sat, 24 Jan 2015, singh.janmejay wrote:
>>>
>>>> Date: Sat, 24 Jan 2015 01:48:18 +0530
>>>> From: singh.janmejay <[email protected]>
>>>> Reply-To: rsyslog-users <[email protected]>
>>>> To: rsyslog-users <[email protected]>
>>>> Subject: [rsyslog] [RFC: Ingestion Relay] End-to-end reliable 'at-least-once' message delivery at large scale
>>>>
>>>> Greetings,
>>>>
>>>> This is a proposal for a new feature, and I'm inviting thoughts.
>>>>
>>>> The aim is to use a set of rsyslog nodes (let us call it a cluster) to be able to move messages reliably from source to destination.
>>>>
>>>> Let us make a few assumptions so we can define the expected properties clearly.
>>>>
>>>> Assumptions:
>>>>
>>>> - Data once successfully delivered to the destination (typically a datastore) is considered safe.
>>>> - Source crashing with incomplete message hand-off to the cluster is outside the scope of this. In such a case, the source must retry.
>>>> - The cluster must be designed to support a maximum of K node failures without any message loss.
>>>>
>>>> Here are the properties that may be desirable in such a service (the cluster is an implementation of this service):
>>>>
>>>> - No message should ever be lost once handed over to the delivery network, except in a disaster scenario.
>>>> - A disaster scenario is a condition where more than k nodes in the cluster fail.
>>>> - Each source may pick a desirable value of k, where k <= K.
>>>> - Any cluster node must re-transmit messages at a timeout T if the downstream fails to ACK them before the timeout.
>>>> - Such a cluster should ideally be composable, in the sense that a user should be able to chain multiple such clusters.
>>>>
>>>> This requires the cluster to support k-way replication of messages within the cluster.
>>>>
>>>> Implementation:
>>>>
>>>> High level:
>>>>
>>>> - The cluster is divided into multiple tiers (let us call them replication tiers, or rep-tiers).
>>>> - The cluster can handle multiple sessions at a time.
>>>> - Session_ids are unique and are generated by the producer system when it starts producing messages.
>>>> - Within a session, we have a notion of a sequence number (or seq_no), which is a monotonically increasing number (incremented by 1 per message). This requirement can possibly be relaxed for performance reasons, and gaps in seq_no may be acceptable.
>>>> - Replication is basically managed by lower tiers sending data over to higher tiers within the cluster, until replica-number (an attribute each message carries) falls to 1.
>>>> - When replica-number falls to 1, we transmit the message to the desired destination. (This can alternatively be done at the earliest opportunity, i.e. in Tier-1, under special circumstances, but let us discuss that later if we find enough interest in doing so.)
>>>> - There must be several nodes in each tier, allocated to minimize the possibility of all of them going down at once (across availability zones, different chassis etc.).
>>>> - There must be a mechanism which allows nodes from the upstream system to discover nodes of Tier-1 of the cluster, Tier-1 nodes to discover nodes in Tier-2 of the cluster, and so on. Similarly, nodes in Tier-K of the cluster should be able to discover downstream nodes.
>>>> - Each session (or multiple sessions bundled according to arbitrary logic, such as hashing) must pick one node from each tier as its downstream-tier node.
>>>> - Each node must maintain 2 watermarks:
>>>>   * Replicated till seq_no: till what sequence number messages have been k-way replicated in the cluster
>>>>   * Delivered till seq_no: till what sequence number messages have been delivered to the downstream system
>>>> - Each send operation (i.e. transmission of messages) from upstream to the cluster's Tier-1, or from a lower tier in the cluster to a higher tier, will pass messages such that the highest seq_no of any message (per session) in the transmitted batch is known.
>>>> - Each receive operation in the cluster's Tier-1, or in upper tiers within the cluster, must respond/reply to the transmitter with the two watermark values (i.e. replicated seq_no and delivered seq_no) per session.
>>>> - Lower tiers (within the cluster) are free to discard all messages with seq_no <= delivered till seq_no.
>>>> - The upstream system is free to discard all messages with seq_no <= replicated till seq_no of the cluster.
>>>> - Upstream and downstream systems can be chained as instances of such clusters if need be.
>>>> - The maximum replication factor 'K' is dictated by cluster design (number of tiers).
>>>> - The desired replication factor 'k' is a per-message controllable attribute (decided by the upstream).
>>>>
>>>> The sequence diagrams below explain this visually:
>>>>
>>>> Here is a case with an upstream sending messages with k = K:
>>>> ingestion_relay_1_max_replication.png
>>>> <https://docs.google.com/file/d/0B_XhUZLNFT4dN21TLTZBQjZMdUk/edit?usp=drive_web>
>>>>
>>>> This is a case with k < K:
>>>> ingestion_relay_2_low_replication.png
>>>> <https://docs.google.com/file/d/0B_XhUZLNFT4da1lKMnRKdU9JUkU/edit?usp=drive_web>
>>>>
>>>> The above 2 cases show only one transmission going from the upstream system to the downstream system serially; this shows it pipelined:
>>>> ingestion_relay_3_pipelining.png
>>>> <https://docs.google.com/file/d/0B_XhUZLNFT4dQUpTZGRDdVVXLVU/edit?usp=drive_web>
>>>>
>>>> This demonstrates failure of a node in the cluster, and how it recovers in the absence of continued transmission (it is recovered by timeout and retransmission):
>>>> ingestion_relay_4_timeout_based_recovery.png
>>>> <https://docs.google.com/file/d/0B_XhUZLNFT4dMm5kUWtaTlVfV1U/edit?usp=drive_web>
>>>>
>>>> This demonstrates failure of a node in the cluster, and how it recovers due to continued transmission:
>>>> ingestion_relay_5_broken_transmission_based_recovery.png
>>>> <https://docs.google.com/file/d/0B_XhUZLNFT4dd3M0SXpUYjFXdlk/edit?usp=drive_web>
>>>>
>>>> Rsyslog-level implementation sketch:
>>>>
>>>> - Let us assume there is a way to identify the set of inputs, queues, rulesets and actions that need to participate as reliable-pipeline components in a cluster node.
>>>> - Each participating queue will expect messages to contain a session-id.
>>>> - A consumer bound to a queue will be expected to provide values for both watermarks per-session to dequeue more messages.
>>>> - A producer bound to a queue will be provided values for both watermarks per-session as a return value when en-queueing more messages.
>>>> - The inputs will transmit (either broadcast or unicast) both watermark values to upstream actions (unicast is sent over relevant connections, broadcast is sent across all connections). (Please note this has nothing to do with network broadcast domains, as everything is over TCP.)
>>>> - Actions will receive the two watermarks and push them back to the queue the action is bound to, in order to dequeue more messages.
>>>> - Rulesets will need to pick the relevant actions' values across multiple action-queues according to user-provided configuration, and propagate them backwards.
>>>> - An action must have the ability to set an arbitrary value for replica-number when passing a message to the downstream system (so that chaining is possible).
>>>> - Inputs may produce the new value for replicated till seq_no when receiving a message with replica_number == 1.
>>>> - An action may produce the new value for delivered till seq_no after having successfully delivered a message with replica_number == 1.
>>>>
>>>> Rsyslog configuration required (from the user):
>>>>
>>>> - The user will need to identify machines that are a part of the cluster.
>>>> - These machines will have to be divided into multiple replication tiers (as replication will happen only across machines in different tiers).
>>>> - The user can pass a message to the next cluster by setting replica_number back to a desired number and passing it to an action which writes it to one of the nodes in a downstream cluster.
>>>> - The user needs to check replica_number in the ruleset and take a special action (to write the message to the downstream system) when replica_number == 1.
>>>>
>>>> Does this have any overlap with RELP?
>>>>
>>>> I haven't studied RELP in depth yet, but as far as I understand it, it tries to solve the problem of delivering messages reliably and losslessly between a single producer and a single consumer (it specifically targets different kinds of loss scenarios). In addition, its scope is limited to ensuring no messages are lost during transportation. In the event of a crash of the receiver node before it can handle a received message reliably, some messages may be lost. Someone with deeper knowledge of RELP should chime in.
>>>>
>>>> Thoughts?
>>>>
>>>> --
>>>> Regards,
>>>> Janmejay
>>>> http://codehunk.wordpress.com

--
Regards,
Janmejay
http://codehunk.wordpress.com
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE THAT.

