On Sun, 25 Jan 2015, singh.janmejay wrote:

if you want true guarantees, then you need to replicate your data to
storage. If you are willing to settle for statistically reliable, then
replicating to RAM on multiple machines may be 'good enough', but it's not
the same.


I'm working with a simple definition of reliability. If something goes
wrong, be it an rsyslog crash, hardware failure or anything in between, as
far as the cluster is concerned, it's a node loss. The cluster should be
able to tolerate loss of up to K nodes without losing data.

I'm looking a little wider than that. I'm looking at what happens if you lose a chunk of network or all the power in an area of the datacenter. Things that can cause you to 'lose' lots of nodes at one time.

 - Serviceability concerns:
 * One of the constraints large scale imposes is that you can't handle
problems manually anymore. One of the economical practices in a cloud, for
instance, is to terminate and start a new instance as the first response to
any trouble, and to use online diagnostics to find machines with problems
and fix them out of band.


this causes data loss if there is any data unique to that machine, be it
in RAM or on disk.


No, it doesn't. It causes node loss, but once the upper tier, or
upper system, detects the loss, it re-floods undelivered messages to the
replacement. The last two sequence diagrams in the first mail walk through
two ways of recovering from node failures without any data loss.

Imprecise wording on my part; I meant that there is no attempt to preserve that data so that the node can send it later. That _copy_ of the data is lost.

 - Impact of message modification
 * The message routing and ingestion into the datastore is transparent to
the user. So by definition, no elements of the routing path must leak into
the functional representation of the message. So the user will not know, or
care, about what nodes the messages traversed.


as an admin of the system I may want to know what nodes it traversed, even
if the person generating the message doesn't. If you have some messages
being delayed significantly or corrupted, it's really helpful to have the
messages tagged with the nodes they traverse so you can see what node is
likely to be having problems.


Yes, but then you would rather depend on failure logs from cluster nodes,
and stats around lag across tiers like lag percentiles etc. Wouldn't you?

Where do you get those stats? You have to have info to extract those stats from.

Also, even with routing info being added to the message, the
idempotent ingestion layer can choose to store all duplicate messages'
metadata in a way which allows such reporting.

I'm saying that you have to have the metadata being passed around in the first place. This means that you have to add info to what's being passed around.

 - Kind of complexity that exists in writing/reading from the storage tier
 * With a data volume of a few million messages per second, a distributed
datastore is necessary anyway (no single-instance datastore built for
commodity hardware can ingest data at that rate).


So now you are talking about a unique data storage that rsyslog doesn't
currently support.

Development of that should be its own project.


Yes, that is true. But assuming the output module required for that is out
of the scope of this discussion, it doesn't matter that much.

But you are right, its usefulness in the absence of integration with such a
data-store is limited.

However, HDFS can be considered such a data-store. All output modules need
to log to the same directory, but in different files. Idempotency will be
taken care of in the reduce phase of data processing.



  * We provide an at-least-once delivery guarantee, which means the
datastore should have a way to identify duplicates either during ingestion
or while serving reads. While multiple solutions are possible here, a
rather simple scheme comes to mind: hash the message by some key (say
session_id) and send it to the same machine in the data-store cluster, and
maintain a session-level idempotent-ingestion mechanism on that node (with
a fixed sliding window of just sequence-number ranges, using a range
representation to make it efficient).
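As a rough sketch (not code from the thread, all names are mine), the per-session dedup window described above can store disjoint sequence-number ranges instead of one entry per message, which keeps memory bounded for mostly in-order streams:

```python
# Sketch: per-session duplicate detection using a compact set of
# seen sequence-number ranges instead of one entry per message.
from bisect import insort

class SessionDedupWindow:
    """Tracks which sequence numbers have been ingested for one session."""
    def __init__(self):
        self.ranges = []  # sorted, disjoint (start, end) inclusive ranges

    def ingest(self, seq):
        """Return True if seq is new (and record it), False if duplicate."""
        for i, (start, end) in enumerate(self.ranges):
            if start <= seq <= end:
                return False                      # duplicate
            if seq == end + 1:                    # extend range upward
                self.ranges[i] = (start, seq)
                self._merge(i)
                return True
            if seq == start - 1:                  # extend range downward
                self.ranges[i] = (seq, end)
                return True
        insort(self.ranges, (seq, seq))           # new isolated range
        return True

    def _merge(self, i):
        # merge with the next range if the two became adjacent
        if i + 1 < len(self.ranges) and self.ranges[i][1] + 1 == self.ranges[i + 1][0]:
            self.ranges[i] = (self.ranges[i][0], self.ranges[i + 1][1])
            del self.ranges[i + 1]

w = SessionDedupWindow()
assert w.ingest(1) and w.ingest(2) and w.ingest(3)
assert not w.ingest(2)          # duplicate detected
assert w.ranges == [(1, 3)]     # three messages stored as one range
```

A real implementation would also expire or cap ranges per session; this only illustrates why the range representation stays small.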


you can't have your farm of tier-1 nodes picking the unique id for the
message; coordinating that would be too much overhead. Any unique id for
the message should be per-sender, and it would need to have a sender-id,
generation-id and sequence-id. (You can reliably store a generation-id and
not re-use it if you restart, but trying to reliably store a sequence-id
gets you back to the fsync problem. So you accept that you will re-use
sequence-ids on the same machine, but make sure you will never use the same
sequence-id with the same generation-id on the same machine.)
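The generation-id idea above can be sketched like this (the file name and helper are hypothetical): the generation counter is durably bumped once per restart, so sequence-ids only need to be unique within one generation and can live in RAM:

```python
# Sketch: durable generation-id, in-RAM sequence-id.
import os

def next_generation(path="generation.id"):
    """Read the last generation-id, increment it, and fsync before using it."""
    gen = 0
    if os.path.exists(path):
        with open(path) as f:
            gen = int(f.read().strip() or 0)
    gen += 1
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        f.write(str(gen))
        f.flush()
        os.fsync(f.fileno())      # one fsync per restart, not per message
    os.replace(tmp, path)         # atomic rename so a crash can't corrupt it
    return gen

generation = next_generation()
sequence = 0                      # in-RAM counter; may repeat after a crash,
                                  # but never with the same generation-id
```

The write-to-temp-then-rename pattern is what makes the stored generation safe against a crash mid-update.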


No, the cluster doesn't generate session_ids. Session_ids are random UUIDs
generated by the message producer.

Ok.

 - Replication topology differences
 * Replication topology can be of 2 types, as you correctly pointed out:
K-way fan-out vs hop-by-hop with K hops.
 * The K-way fan-out design (using omrelp or imrelp) that the RELP-based
design adopts requires another complex component, which in case of node
failure needs to decide what the surviving replicas must do.
Such a component must be aware of the placement of replicas, must be able
to elect a master and allow the master to propagate changes further, and
must re-run master election in case one of the replicas is lost. This
requires distributed coordination, which is a hard problem, as you pointed
out earlier. Not just this, it must also elect a new replica from a pool to
replace the lost replica, as the replication factor would have fallen by 1.


there is a huge advantage in having the cluster maintain its state and
control membership rather than having the things sending to the cluster
maintain that state.

First, you will have a lot fewer machines in the cluster than machines
sending to the cluster.

Second, determining if a machine is healthy is a lot more than 'can you
connect to it', and that sort of knowledge can only be determined from
inside the machines of the cluster. So you either have the cluster running
health checks and reporting them to the clients for them to decide if the
node is healthy enough to use, or you have the cluster running health
checks and reporting them to the cluster management software on the
cluster.

Third, having multiple views of what machines are in the cluster (one per
client) is a recipe for confusion, as the different clients can end up with
different opinions on what nodes are healthy.


Membership, yes. The zookeeper (ephemeral znodes) based discovery solution
that I mentioned was one of the ways I was thinking of solving that. A
Corosync-based solution was another way.

Membership info is essential, so all producers can easily discover Tier-1
nodes, Tier-1 nodes can discover Tier-2 nodes etc. My contention is with
having to manage failed nodes, re-replicate data on failure discovery,
identify replacement nodes centrally etc. In the event of network
partitions, the way such systems fail becomes much worse with complex
cluster management techniques. Self-organizing techniques in my observation
work better (fewer nodes talking, fewer failure modes).

I agree that fewer nodes talking means fewer failure modes. That's why I'm saying that having the clients figure this out is wrong. It needs to be done by the much smaller number of nodes in the cluster, not by every machine that could send data to them.

  * Impact on solution: Burstability needs to be built before the
datastore. Between replicated queues vs. backpressure and backlog buildup
on the producer, replicated queues are preferable for redundancy reasons.


assuming that your distributed datastore allows you to feed the same
message to it multiple times and have it weed out duplicates. I think that
this is a much harder problem than you are thinking.


Yes, it is certainly hard. But is there a way to avoid it? Regardless of
what we do, that problem has to be solved.

only if multiple deliveries are common.

Where are the bottlenecks in RELP? It does allow a window of X messages to
be waiting for acks. Does that window need to be larger? Do we need to
allow for multiple acks to be sent as one message?

your approach assumes that if message 45 is acked, then messages 1-44 are
also acked, not necessarily a valid assumption.


No, it doesn't assume that. It expects acks to work in terms of ranges:
1-30 may not be acked while 31-45 is. The state machine will force a retry
on ack-wait timeout. Eventually it will prevent the sliding window from
moving forward, so it will have to be handled.
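On the sender side, the range-acking just discussed can be sketched as follows (class and method names are my own, for illustration): acks may arrive for any range, and whatever is still unacked when the ack-wait timer fires becomes the retry set:

```python
# Sketch: sender-side window where acks arrive as ranges, not a high-water mark.
class SendWindow:
    def __init__(self):
        self.unacked = {}        # seq -> message, awaiting ack

    def sent(self, seq, msg):
        self.unacked[seq] = msg

    def ack_range(self, lo, hi):
        """Acks need not be contiguous from 1: e.g. 31-45 can arrive before 1-30."""
        for seq in range(lo, hi + 1):
            self.unacked.pop(seq, None)

    def retry_candidates(self):
        """On ack-wait timeout, everything still unacked gets re-sent."""
        return sorted(self.unacked)

w = SendWindow()
for s in range(1, 46):
    w.sent(s, "msg-%d" % s)
w.ack_range(31, 45)              # 31-45 acked, 1-30 not
assert w.retry_candidates() == list(range(1, 31))
```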

so your windows are a micro-optimization over a list of acked messages. Not a significant architectural issue.

  * In a hop-by-hop replication topology, we care about ACKs flowing
end-to-end, so additional reliability between each hop doesn't add a lot
of value.


you are again arguing based on your solution rather than the problem. You
are also contradicting yourself

If you are doing end-to-end acks, then the sender cannot forget the
message until the final destination acks.

If you are doing hop-by-hop acks, then the sender can forget the message
as soon as it is reliably at the next tier.

which is it?


Sorry, I think I should have done a better job of explaining the
inter-system ack mechanism upfront.

It is end-to-end acking, but end-to-end is not first-system to last-system,
it is end to end within the cluster.

Assume multiple such clusters, A, B, C and D.

Let us say message flows from A -> B -> C -> D.

We expect each of these to have the capability to keep messages replicated
and safe as long as the next system doesn't confirm that it has replicated
them to the desired level.

Once C has replicated it sufficiently, B can discard it. C has multiple
tiers, and acks carry the two watermark values from the last tier back to
the first tier.

C's internal ack flow is what I mean by end-to-end ack flow.

If we assume A, B, C and D to be single-node systems, then you can think
of this as next-hop acking (not end-to-end acking).

The sequence-diagrams I shared mark these systems, A, B and C in pink, grey
and blue respectively (there is no D in that).

I think if you look at how acks are used to purge old messages (shown in
black on ack arrow) across systems in the diagrams, that may explain it
better than the text here.
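The purge-on-watermark behavior described above can be sketched like this (names and the single-watermark simplification are my assumptions; the diagrams use two watermark values): each system holds a message until the downstream system's replication watermark passes it, so a discard is always safe:

```python
# Sketch: a tier keeps messages until the downstream ack watermark covers them.
class TierBuffer:
    def __init__(self):
        self.held = {}               # seq -> message, kept until safe
        self.downstream_watermark = 0

    def accept(self, seq, msg):
        self.held[seq] = msg

    def on_ack(self, replicated_up_to):
        """Downstream confirmed replication up to this sequence number."""
        self.downstream_watermark = max(self.downstream_watermark, replicated_up_to)
        for seq in [s for s in self.held if s <= self.downstream_watermark]:
            del self.held[seq]       # purge: the next system holds it safely now

b = TierBuffer()
for s in (1, 2, 3, 4):
    b.accept(s, "m%d" % s)
b.on_ack(3)
assert sorted(b.held) == [4]     # 1-3 purged once downstream replicated them
```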

so that is what I consider hop-by-hop acks, not end-to-end acks, and other than the fact that it's going to multiple machines, this is exactly what RELP does today.

If you were to modify imrelp so that it didn't send its ack until it had sent the message out to all the other machines in the cluster, you would have exactly what you are talking about implementing, right?

  * Given wide enough sliding windows and pipelining (one of my original
sequence diagrams talks about pipelining; I don't show it in every diagram
for brevity reasons), anything built over ptcp will run as close to
un-acked speed as possible.
 * Given it's TCP, and we have last-hop-to-source L7 acks, we can't really
lose an intermediate message. So per-message ACKs are not valuable in this
case.


only if all messages follow the same path. If some messages go to machines
(tier-node) 1-1, 2-1, 3 and other messages go 1-1, 2-2, 3, and yet others
go 1-2, 2-1, 3, you no longer have a single TCP flow, and so messages will
not necessarily arrive in the same order.


They need not arrive in the same order. It doesn't matter, because we
aren't making any ordering guarantees. As long as all messages come through
between hosts from Tier-n to Tier-n+1, we are OK.

It's sounding more and more like you are talking about a modified version of RELP, with the added requirement to replicate across machines before sending the ack.

 I agree that implementing it in rsyslog will be a significant amount of
work (but it'll be a significant amount of work even if it's implemented in
a dedicated ingestion relay that only does message movement, nothing else).
In my head the counter-weight is:
- Same interface for users (they talk rulesets, actions, plugins etc.
across acked and un-acked pipelines)
- Capability gain: being able to work at very large scale with efficiency
using syslog.


How much complexity will it add to rsyslog? and how many people would
actually use this architecture? How close can we come to achieving the
results without having to change the fundamental way that rsyslog works?

Specifically, the fact that rsyslog queues are one-directional. An input
is finished processing a message once it's added to the queue. Implementing
end-to-end acks requires that the input still keep track of the sender
until after the output has processed the message so that the ack can flow
back upstream to the sender.

I want us to figure out what we can do without breaking this fundamental
design factor of rsyslog.



thinking about it a bit more, RELP between nodes may not be needed,
because you are already saying that if a machine hiccups, you are willing
to lose all messages that machine is processing. So if you just use TCP as
your transport, as long as you don't have any SPOF between your sender and
next-hop recipients that could cut all your connections at once, you can
count on that redundancy to protect you. If you don't guarantee that there
is no SPOF, then you do need an application-level ack and we are back to
RELP.


I agree. The provisioning system will have to be smart to provision nodes
from different availability zones, different chassis etc.

It's messier than that makes it sound.

The sender can already craft and add a sequence-id, and its hostname can
be the sender-id. Generation-id is a bit harder, but could be done in a
wrapper around rsyslog or by (mis)using the table lookup mechanism to read
a value from disk after rsyslog starts. Generating a unique sequence-id
will slow the throughput of rsyslog because it requires coordination
between threads, but a modification to rsyslog to have the output module
create a sequence-id where the bottom bits of the id aren't incremented,
but instead are the thread-id, would address this performance bottleneck.
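The bottom-bits scheme just described can be sketched like this (the bit width is an arbitrary assumption): each thread increments only its own counter, and uniqueness across threads comes from the thread-id occupying the low bits:

```python
# Sketch: lock-free unique sequence-ids via thread-id in the bottom bits.
THREAD_BITS = 8                          # supports up to 256 worker threads

def make_seq_id(per_thread_counter, thread_id):
    assert thread_id < (1 << THREAD_BITS)
    return (per_thread_counter << THREAD_BITS) | thread_id

# Two threads generate ids concurrently without sharing a counter:
a = [make_seq_id(n, thread_id=0) for n in range(3)]
b = [make_seq_id(n, thread_id=1) for n in range(3)]
assert len(set(a + b)) == 6              # all ids distinct across threads
```

The trade-off is that ids are no longer globally dense or ordered, which is fine when they are only used for dedup.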


I was thinking of both session_id and sequence_id being handled by
producer.

Say the producer is a multi-threaded system. Each logging thread can
generate a random UUID and keep it as a thread-local. All logging uses it
as the session_id, and each thread has a thread-local counter which serves
as the seq_id and gets incremented for each log.

That way all that complexity will stay out of rsyslog.
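The producer-side scheme above might look like this in a multi-threaded application (a minimal sketch, all names mine): a per-thread random session_id plus a per-thread counter, so no locking is needed and rsyslog simply relays the ids already present in the message:

```python
# Sketch: thread-local session_id (random UUID) + thread-local seq_id counter.
import threading
import uuid

_local = threading.local()

def next_ids():
    if not hasattr(_local, "session_id"):
        _local.session_id = uuid.uuid4().hex   # one random session per thread
        _local.seq = 0
    _local.seq += 1
    return _local.session_id, _local.seq

sid1, n1 = next_ids()
sid2, n2 = next_ids()
assert sid1 == sid2 and (n1, n2) == (1, 2)     # same thread: same session, counting up
```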

Well, if it's in the message delivered to rsyslog in the first place, then rsyslog doesn't care about it.

But as I say, I think it's very reasonable to have the leaf rsyslog node generate the data.

I think that the biggest bottleneck that you would end up running into
would be the formatting of the output, especially if using JSON formatting
so that you can send the sequence-id independently of the original message
contents.


Yes, in some cases it may be easier to change the functional definition to
have a wrapper which carries this non-functional stuff too.

that doesn't solve the problem, it just shifts it slightly. Something still needs to take the provided data and the metadata and create a sequence of bytes that will be sent over the wire.
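Concretely, the wrapper being discussed might be a JSON envelope along these lines (field names are hypothetical): the original message stays byte-for-byte intact inside an envelope that carries the non-functional metadata, and something still has to serialize it for the wire:

```python
# Sketch: envelope carrying dedup/routing metadata alongside the raw message.
import json

def wrap(raw_message, session_id, seq_id, hops):
    return json.dumps({
        "meta": {"session_id": session_id, "seq_id": seq_id, "hops": hops},
        "msg": raw_message,          # functional payload, untouched
    })

wire = wrap("app started", session_id="ab12", seq_id=7, hops=["t1-3", "t2-1"])
assert json.loads(wire)["msg"] == "app started"
```

The `json.dumps` call here is exactly the formatting cost the paragraph above is worried about.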

Rsyslog has the ability to have string modules (sm*) that do this in C code. It used to be that all of the standard message formats were created using the template engine, but after I raised the issue, we found that optimizing the common formats down to C was enough of a win to result in a ~10% improvement in rsyslog's overall performance.

I think we are going to find that we really should do something along the same lines for JSON output. And if you opt to send the data in some other structure, you will need to do the same for your format.

David Lang
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE 
THAT.
