David, can you clarify the JSON thing?

--
Regards,
Janmejay
PS: Please blame the typos in this mail on my phone's uncivilized soft keyboard sporting its not-so-smart-assist technology.

On Jan 25, 2015 11:42 PM, "singh.janmejay" <[email protected]> wrote:
>
> On Sun, Jan 25, 2015 at 5:09 AM, David Lang <[email protected]> wrote:
>> On Sun, 25 Jan 2015, singh.janmejay wrote:
>>
>>>> if you want true guarantees, then you need to replicate your data to storage. If you are willing to settle for statistically reliable, then replicating to RAM on multiple machines may be 'good enough', but it's not the same.
>>>
>>> I'm working with a simple definition of reliability. If something goes wrong, be it rsyslog crash, hardware failure or anything in between, as far as the cluster is concerned, it's a node loss. The cluster should be able to tolerate loss of up to K nodes without losing data.
>>
>> I'm looking a little wider than that. I'm looking at what happens if you lose a chunk of network or all the power in an area of the datacenter. Things that can cause you to 'lose' lots of nodes at one time.
>
> Yes, any more than K-nodes loss can cause data-loss.
>
> You point out later that it's easier said than done at a cloud provisioning level. I had a simple scheme in mind; obviously a bigger outage radius will still mess it up, but I think it's worth putting it out.
>
> I was thinking of each tier of the cluster being built using a different availability-zone. So power problems, minor disaster scenarios (limited to an availability zone) etc. will not cause any data loss, even though it'll pause data-movement completely if an entire availability-zone is lost (until the tier is partly rebuilt using replacements).
>
> Basically, whatever is deleted by upstream is already replicated across tiers, so single tier loss can't kill it.
>
>>>>> - Serviceability concerns:
>>>>> * One of the constraints large-scale puts is you can't handle problems manually anymore.
>>>>> One of the economical things in a cloud, for instance, is to terminate and start a new instance as first response to any trouble, and to use online diagnostics to find machines with problems and fix them out of band.
>>>>
>>>> this causes data loss if there is any data unique to that machine, be it in ram or on disk
>>>
>>> No, it doesn't. It causes node-loss, but once the upper-tier, or upper-system, detects the loss, it re-floods un-delivered messages to the replacement. The last 2 sequence-diagrams in the first mail walk through 2 ways of recovering from node-failures without any data-loss.
>>
>> imprecise wording on my part, I meant that there is no attempt to preserve that data so that the node will send it later. That _copy_ of the data is lost.
>
> True, writing to disk I think is a valid option, and a valuable one when the trade-off is acceptable. But in my opinion it shouldn't be the only option (I don't think you are proposing it as the only option either).
>
>>>>> - Impact of message modification
>>>>> * The message routing and ingestion into the datastore is transparent to the user. So by definition, no elements of the routing path must leak into the functional representation of the message. So the user will not know, or care, about what nodes the messages traversed.
>>>>
>>>> as an admin of the system I may want to know what nodes it traversed, even if the person generating the message doesn't. If you have some messages being delayed significantly or corrupted, it's really helpful to have the messages tagged with the nodes they traverse so you can see what node is likely to be having problems.
>>>
>>> Yes, but then you would rather depend on failure-logs from cluster nodes, and stats around lag across tiers like lag-percentiles etc. Wouldn't you?
>>
>> where do you get those stats?
>> you have to have info to extract those stats from.
>
> Oh, I thought you were talking about output that goes to the data-store.
>
> For messages that get exchanged between tiers and even systems, we'll have to figure out some way of carrying session_id, seq_no etc.
>
>>> Also, even with routing info being added to the message, the idempotent-ingestion-layer can choose to store all duplicate messages' metadata in a way which allows such reporting.
>>
>> I'm saying that you have to have the metadata being passed around in the first place. This means that you have to add info to what's being passed around.
>
> Agree.
>
>>>>> - Kind of complexity that exists in writing/reading from the storage tier
>>>>> * With data volume of a few million messages per second, a distributed datastore is anyway necessary (no single-instance datastore built for commodity hardware can ingest data at that rate).
>>>>
>>>> So now you are talking about a unique data storage that rsyslog doesn't currently support.
>>>>
>>>> Development of that should be its own project.
>>>
>>> Yes, that is true. But assuming the output module required for that is out of the scope of this discussion, it doesn't matter that much.
>>>
>>> But you are right, its usefulness in absence of integration with such a data-store is limited.
>>>
>>> However, HDFS can be considered such a data-store. All output-modules need to log to the same directory, but in different files. Idempotency will be taken care of in the reduce-phase of data processing.
>>>
>>>>> * We provide at-least-once delivery guarantee, which means the datastore should have a way to identify duplicates either during ingestion, or during serving reads.
>>>>> While multiple solutions are possible here, a rather simple scheme comes to my mind: hash the message by some key (say session_id) and send it to the same machine in the data-store cluster, and maintain a session-level idempotent-ingestion mechanism (with a fixed sliding window of just sequence number ranges; use range-representation to make it efficient) on that node.
>>>>
>>>> you can't have your farm of tier-1 nodes picking the unique id for the message, coordinating that would be too much overhead. Any unique id for the message should be per-sender, and it would need to have sender-id, generation-id and sequence-id (you can reliably store a generation-id and not re-use it if you restart, but trying to reliably store a sequence-id gets you back to the fsync problem, so you accept that you will re-use sequence-ids on the same machine, but make sure you will never use the same sequence-id with the same generation-id on the same machine)
>>>
>>> No, the cluster doesn't generate session_ids. Session_ids are random-uuids generated by the message-producer.
>>
>> Ok.
>
>>>>> - Replication topology differences
>>>>> * Replication topology can be of 2 types, as you correctly pointed out: K-way fan-out vs hop-by-hop with K hops.
>>>>> * The K-way fanout design (using omrelp, or imrelp) that the RELP based design adopts requires another complex component, which in case of node-failure needs to decide what the surviving replica must do. Such a component must be aware of placement of replicas, must be able to elect a master and allow the master to propagate changes further, and must re-run master-election in case one of the replicas is lost. This requires distributed coordination, which is a hard problem, as you pointed out earlier.
>>>>> Not just this, it must elect a new replica as well from a pool to replace the lost replica, as the replication factor would have fallen by 1.
>>>>
>>>> there is a huge advantage in having the cluster maintain its state and control membership rather than having the things sending to the cluster maintain that state.
>>>>
>>>> First, you will have a lot fewer machines in the cluster than sending to the cluster.
>>>>
>>>> Second, determining if a machine is healthy is a lot more than can you connect to it, and that sort of knowledge can only be determined from inside the machines of the cluster. So you either have the cluster running health checks and reporting them to the clients for them to decide if the node is healthy enough to use, or you have the cluster running health checks and reporting it to the cluster management software on the cluster.
>>>>
>>>> Third, having multiple views of what machines are in the cluster (one per client) is a recipe for confusion as the different clients can end up with different opinions on what nodes are healthy.
>>>
>>> Membership, yes. The zookeeper (ephemeral znodes) based discovery solution that I mentioned was one of the ways I was thinking of solving that. A Corosync based solution was another way.
>>>
>>> Membership info is essential, so all producers can easily discover Tier-1 nodes, Tier-1 nodes can discover Tier-2 nodes etc. My contention is with having to manage failed nodes, re-replicate data on failure discovery, identify replacement nodes centrally etc. In event of network partitions, the way such systems fail becomes much worse with complex cluster management techniques. Self-organizing techniques in my observation work better (fewer nodes talking, fewer failure modes).
>>
>> I agree that fewer nodes talking means fewer failure modes.
>> That's why I'm saying that having the clients figure this out is wrong. It needs to be done by the much smaller number of nodes in the cluster, not by every machine that could send data to them.
>
> From systems that I have some experience with, having dedicated controller nodes for making distributed-systems decisions allows arbitrary complexity in decision making. It eventually leads to undesirable complexity and more types of failure as a distributed-system.
>
> I have been trying to work towards a design where there is no requirement for a separate decision-maker. This obviously works when each node in isolation can make decisions, which forces us to keep the decision-making simple.
>
> In this case, all a node needs to know (from a discovery mechanism or otherwise) is how to reach some machine in the next tier (or the 1st tier of the next system, for the inter-system case). It doesn't need to know or work with the complexity of: is all data K-way replicated, what data is not, which node should it be replicated to, which of these nodes must send data downstream, how to agree upon which node will make such decisions, how to failover to some other node in event of network-partitions of various types, etc.
>
> Using tiers for replication is a way to achieve coordinator-free design.
>
> Clients (as in message producers) are nothing more than a general upstream system (as in the sequence diagrams). They are just a single-replica system (with disk based message ingestion, but single replica at node level anyway), so the quicker we move data out of them, the better.
>
>>>>> * Impact on solution: Burstability needs to be built before the datastore. Between replicated queues vs. backpressure and backlog-buildup on the producer, replicated queues are preferable for redundancy reasons.
>>>> assuming that your distributed datastore allows you to feed the same message to it multiple times and have it weed out duplicates. I think that this is a much harder problem than you are thinking.
>>>
>>> Yes, it is certainly hard. But is there a way to avoid it? Regardless of what we do, that problem has to be solved.
>>
>> only if multiple deliveries are common.
>
> Well, that depends on the guarantees that users of this data desire. For an audit kind of usecase for some important transactions, for instance, even one duplicate will be undesirable.
>
> But did you have a scheme in mind which reduces the duplicate problem enough for us to be able to ignore it (like 1 in a million, which can be handled by visual inspection if the data is ever referred to)?
>
>>>> Where are the bottlenecks in RELP? It does allow a window of X messages to be waiting for acks. Does that window need to be larger? Do we need to allow for multiple acks to be sent as one message?
>>>>
>>>> your approach assumes that if message 45 is acked, then messages 1-44 are also acked, not necessarily a valid assumption.
>>>
>>> No, it doesn't assume that. It expects acks to work in terms of ranges. 1-30 may not be acked, 31-45 may be. The state-machine will force retry on ack-wait timeout. Eventually, it'll prevent the sliding window from moving forward, so it will have to be handled.
>>
>> so your windows are a micro-optimization over a list of acked messages. Not a significant architectural issue.
>
> Agree.
>
>>>>> * In hop-by-hop replication topology, we care about ACKs flowing end-to-end, so additional reliability between each hop doesn't add a lot of value.
>>>>
>>>> you are again arguing based on your solution rather than the problem.
>>>> You are also contradicting yourself.
>>>>
>>>> If you are doing end-to-end acks, then the sender cannot forget the message until the final destination acks.
>>>>
>>>> If you are doing hop-by-hop acks, then the sender can forget the message as soon as it is reliably at the next tier.
>>>>
>>>> Which is it?
>>>
>>> Sorry, I think I should have done a better job of explaining the inter-system ack mechanism upfront.
>>>
>>> It is end-to-end acking, but end-to-end is not first-system to last-system; it is end to end within the cluster.
>>>
>>> Assume multiple such clusters: A, B, C and D.
>>>
>>> Let us say a message flows from A -> B -> C -> D.
>>>
>>> We expect each of these has the capability to keep messages replicated and safe as long as the next system doesn't confirm that it has replicated it to the desired level.
>>>
>>> Once C has replicated it sufficiently, B can discard it. C has multiple tiers, and acks carry the two watermark values from the last tier to the first tier.
>>>
>>> C's internal ack flow is what I mean by end-to-end ack flow.
>>>
>>> If we assume A, B, C and D to be single node systems, then you can think of this as next-hop acking (not end-to-end acking).
>>>
>>> The sequence-diagrams I shared mark these systems, A, B and C, in pink, grey and blue respectively (there is no D in that).
>>>
>>> I think if you look at how acks are used to purge old messages (shown in black on the ack arrow) across systems in the diagrams, that may explain it better than the text here.
>>
>> so that is what I consider hop-by-hop acks, not end-to-end acks, and other than the fact that it's going to multiple machines, this is exactly what RELP does today.
>>
>> If you were to modify imrelp so that it didn't send its ack until it had sent the message out to all the other machines in the cluster, you would have exactly what you are talking about implementing, right?
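[Editor's note: the "replicate across machines before acking" behavior discussed here could be sketched roughly as below. This is a hypothetical plain-Python illustration, not rsyslog or RELP code; `ReplicatingRelay` and its replica callables are invented names.]

```python
# Sketch: a relay that withholds the upstream ACK until at least k
# downstream replicas have confirmed the message. On a negative return
# the upstream keeps the message and retries later, which is why the
# downstream idempotent-ingestion window discussed earlier is needed.

class ReplicatingRelay:
    def __init__(self, replicas, k):
        self.replicas = replicas  # callables simulating sends to replica peers
        self.k = k                # required replication factor

    def handle(self, msg):
        """Return True (ack upstream) only once k replicas confirmed msg."""
        confirmed = 0
        for send in self.replicas:
            try:
                if send(msg):
                    confirmed += 1
            except ConnectionError:
                continue  # a dead replica just reduces the confirm count
            if confirmed >= self.k:
                return True
        return False
```

With three replicas of which one is down and k=2, `handle()` still acks; with k greater than the number of live replicas it refuses, leaving the message with the upstream.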
> Well, RELP or something else here is not a critical decision the way I am seeing it.
>
> Consider this: we are talking about building ack-flow across queues, backwards. If this is built, it'll allow any protocol for input/output to utilize it. RELP can utilize it out of the box (the contract won't be the same as it stands today though, as it'll be acking on completion of replication and completion of delivery rather than on reaching the input-bound queue). But other protocols like ptcp will be able to utilize it well. It's up to the implementer of the input/output module to decide if they want to support this or not.
>
> If we address the throughput limitations of RELP, we should be able to use it just fine. Or we can enhance ptcp to support this. RELP is still a notch ahead in functionality, because it is capable of acking each message, and is capable of acking on delivery to the input-bound-queue, but the two are not quite the same thing.
>
>>>>> * Given wide-enough sliding-windows and pipelining (one of my original sequence diagrams talks about pipelining; I don't show it in every diagram for brevity reasons), anything built over ptcp will run as close to un-acked speed as possible.
>>>>> * Given it's TCP, and we have last-hop-to-source L7 acks, we can't really lose an intermediate message. So per-message ACKs are not valuable in this case.
>>>>
>>>> only if all messages follow the same path. If some messages go to machine (tier-node) 1-1, 2-1, 3 and other messages go 1-1, 2-2, 3, and yet others go 1-2, 2-1, 3, you no longer have a single TCP flow and so messages will not necessarily arrive in the same order
>>>
>>> They need not arrive in the same order. It doesn't matter because we aren't making any ordering guarantees. As long as all messages come through between hosts from Tier-n to Tier-n+1, we are ok.
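[Editor's note: the range-based ack bookkeeping described earlier in the thread ("1-30 may not be acked, 31-45 may be") could be sketched roughly as below; the same merged-range structure can also back the session-level idempotent-ingestion window. A hypothetical illustration, not code from rsyslog; all names are invented.]

```python
class RangeWindow:
    """Tracks acked sequence numbers for one session as merged (lo, hi) ranges."""

    def __init__(self):
        self.ranges = []  # sorted, disjoint, inclusive ranges

    def ack(self, lo, hi):
        # merge [lo, hi] into the existing set of acked ranges
        self.ranges.append((lo, hi))
        self.ranges.sort()
        merged = [self.ranges[0]]
        for a, b in self.ranges[1:]:
            la, lb = merged[-1]
            if a <= lb + 1:                    # overlapping or adjacent
                merged[-1] = (la, max(lb, b))
            else:
                merged.append((a, b))
        self.ranges = merged

    def covered(self, seq):
        # duplicate check for idempotent ingestion of an at-least-once stream
        return any(a <= seq <= b for a, b in self.ranges)

    def gaps(self, upto):
        # un-acked ranges in 1..upto: the retry candidates on ack-wait timeout
        out, nxt = [], 1
        for a, b in self.ranges:
            if a > nxt:
                out.append((nxt, min(a - 1, upto)))
            nxt = b + 1
            if nxt > upto:
                break
        if nxt <= upto:
            out.append((nxt, upto))
        return out
```

For the example from the thread: after `ack(31, 45)`, `gaps(45)` reports `[(1, 30)]` as the range to force-retry; once `ack(1, 30)` arrives the two ranges merge and the window can slide forward.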
>> It's sounding more and more like you are talking about a modified version of RELP, with the added requirement to replicate across machines before sending the ack.
>
> A modified version of RELP will do.
>
>>>>> I agree that implementing it in Rsyslog will be a significant amount of work (but it'll be a significant amount of work even if it's implemented in a dedicated ingestion-relay that only does message movement, nothing else). In my head the counter-weight is:
>>>>> - Same interface for users (they talk rulesets, actions, plugins etc. across ack'ed and un-ack'ed pipelines)
>>>>> - Capability gain. Being able to work at very large scale with efficiency using syslog.
>>>>
>>>> How much complexity will it add to rsyslog? And how many people would actually use this architecture? How close can we come to achieving the results without having to change the fundamental way that rsyslog works?
>>>>
>>>> Specifically, the fact that rsyslog queues are one-directional. An input is finished processing a message once it's added to the queue. Implementing end-to-end acks requires that the input still keep track of the sender until after the output has processed the message, so that the ack can flow back upstream to the sender.
>>>>
>>>> I want us to figure out what we can do without breaking this fundamental design factor of rsyslog.
>>>>
>>>> thinking about it a bit more, RELP between nodes may not be needed, because you are already saying that if a machine hiccups, you are willing to lose all messages that machine is processing. So if you just use TCP as your transport, as long as you don't have any SPOF between your sender and next-hop recipients that could cut all your connections at once, you can count on that redundancy to protect you.
>>>> If you don't guarantee that there is no SPOF, then you do need an application-level ack and we are back to RELP.
>>>
>>> I agree. The provisioning system will have to be smart to provision nodes from different availability zones, different chassis etc.
>>
>> it's messier than that makes it sound
>
> Like I mentioned earlier, it's as simple as allocating each tier of a cluster in a separate availability zone.
>
> The allocation-stack needs to be smart enough to have a concept of availability zones, but not much more.
>
>>>> The sender can already craft and add a sequence-id, and its hostname can be the sender-id. Generation-id is a bit harder, but could be done in a wrapper around rsyslog or by (mis)using the table lookup mechanism to read a value from disk after rsyslog starts. Generating a unique sequence-id will slow the throughput of rsyslog because it requires coordination between threads, but a modification to rsyslog to have the output module create a sequence-id where the bottom bits of the id aren't incremented, but instead are the thread-id, would address this performance bottleneck.
>>>
>>> I was thinking of both session_id and sequence_id being handled by the producer.
>>>
>>> Say the producer is a multi-threaded system. Each logging thread can generate a random-uuid and keep it as a thread-local. All logging uses it as session_id, and each thread has a thread-local counter which serves as seq_id, gets incremented for each log etc.
>>>
>>> That way all that complexity will stay out of rsyslog.
>>
>> Well, if it's in the message delivered to rsyslog in the first place, then rsyslog doesn't care about it.
>>
>> But as I say, I think it's very reasonable to have the leaf rsyslog node generate the data.
>
> The reason I think it should be with the source-system: it allows them to control order.
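[Editor's note: the producer-side scheme described just above (a random UUID per logging thread as session_id, plus a thread-local counter as seq_id, so no cross-thread coordination) could look roughly like this; a hypothetical sketch, not code from any real logging library.]

```python
import threading
import uuid

_local = threading.local()  # per-thread storage, no locks needed

def next_ids():
    """Return (session_id, seq_id) for the calling thread.

    session_id is a thread-local random UUID generated lazily;
    seq_id is a thread-local counter incremented on each log call,
    so it is monotonically increasing within its session.
    """
    if not hasattr(_local, "session_id"):
        _local.session_id = str(uuid.uuid4())
        _local.seq = 0
    _local.seq += 1
    return _local.session_id, _local.seq
```

Because seq_id is monotonic within each session, a downstream session-level window can detect duplicates and gaps without any coordination among producer threads.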
> We may deliver it out of order (so a newer event may become visible before an older one) to the data-store, but if they have an option to see it in order, it's valuable to let them decide the order too.
>
>>>> I think that the biggest bottleneck that you would end up running into would be the formatting of the output, especially if using JSON formatting so that you can send the sequence-id independently of the original message contents.
>>>
>>> Yes, in some cases it may be easier to change the functional definition to have a wrapper which carries this non-functional stuff too.
>>
>> that doesn't solve the problem, it just shifts it slightly. Something still needs to take the provided data and the metadata and create a sequence of bytes that will be sent over the wire.
>
> Yes, this was due to a confusion, my bad. I assumed you meant data that is sent from the last tier to the data-store.
>
>> Rsyslog has the ability to have string modules (sm*) that do this in C code. It used to be that all of the standard message formats were created using the template engine, but after I raised the issue, we found that optimizing the common formats down to C was enough of a win to result in a ~10% improvement in rsyslog's overall performance.
>>
>> I think we are going to find that we really should do something along the same lines for JSON output. And if you opt to send the data in some other structure, you will need to do the same for your format.
>
> Sorry, I lost you on this one. Can you elaborate a little about why JSON is relevant?
>
>> David Lang
>> _______________________________________________
>> rsyslog mailing list
>> http://lists.adiscon.net/mailman/listinfo/rsyslog
>> http://www.rsyslog.com/professional-services/
>> What's up with rsyslog? Follow https://twitter.com/rgerhards
>> NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of sites beyond our control.
>> PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE THAT.
>
> --
> Regards,
> Janmejay
> http://codehunk.wordpress.com

