On Fri, 30 Jan 2015, singh.janmejay wrote:

David, can you clarify the JSON thing?

what I do on my relay boxes is

set $!msg = $msg;
set $!trusted!sourceip = $fromhost-ip;
set $!trusted!relay = $$myhostname;
set $!trusted!environ = 'production';
etc..

this adds metadata to the log messages, but leaves me with the ability to output the traditional message format easily.

This approach would let a rsyslog server receive a standard message from an application and add a sequence-id etc to it without disrupting the message

David Lang


--
Regards,
Janmejay

PS: Please blame the typos in this mail on my phone's uncivilized soft
keyboard sporting it's not-so-smart-assist technology.

On Jan 25, 2015 11:42 PM, "singh.janmejay" <[email protected]> wrote:



On Sun, Jan 25, 2015 at 5:09 AM, David Lang <[email protected]> wrote:

On Sun, 25 Jan 2015, singh.janmejay wrote:

 if you want true guarantees, then you need to replicate your data to
storage. If you are willing to settle for statistically reliable, then
replicating to RAM on multiple machines may be 'good enough', but it's
not
the same.


Im working with a simple definition of reliability. If something goes
wrong, be it rsyslog crash, hardware failure or anything in between, as
far
as the cluster is concerned, its a node loss. The cluster should be able
to
tolerate loss of upto K nodes without losing data.


I'm looking a little wider than that. I'm looking at what happens if you
loose a chunk of network or all the power in an area of the datacenter.
Things that can cause you to 'loose' lots of nodes at one time.


Yes, any more than K-nodes loss can cause data-loss.

You point out later that its easier said than done at a cloud provisioning
level. I had a simple scheme in mind, obviously a bigger outage radius will
still mess it up, but I think its worth putting it out.

I was thinking of each tier of the cluster being built using a different
availability-zone. So power problems, minor disaster scenarios(limited to
availability zone) etc will not cause any data loss, even though it'll
pause data-movement completely if entire availability-zone is lost (until
the tier is partly rebuilt using replacements).

Basically, whatever is deleted by upstream is already replicated across
tiers, so single tier loss can't kill it.




  - Serviceability concerns:

 * One of the constraints large-scale puts is you can't handle problems
manually anymore. One of the economical things in a cloud for
instance, is
to terminate and start a new instance as first response to any trouble,
and
using online diagnostics to find machines with problems and fix them
out
of
band.


this causes data loss if there is any data unique to that machine, be it
in ram or on disk


No, it doesn't. It causes node-loss, but once the upper-tier, or
upper-system detects the loss, it re-floods un-delivered messages to the
replacement. The last 2 sequence-diagrams in the first mail walk through
2
ways of recovering from node-failures without any data-loss.


imprecise wording on my part, I meant that there is no attempt to
preserve that data so that the node will send it later. That _copy_ of the
data is lost.


True, writing to disk I think is a valid option, and a valuable one when
trade-off is acceptable. But in my opinion it shouldn't be the only option
(I don't think you are proposing it as the only option either).



  - Impact of message modification

 * The message routing and ingestion into the datastore is transparent
to
user. So by definition, no elements of routing path must leak into
functional representation of the message. So user will not know, or
care,
about what nodes the messages traversed.


as an admin of the system I may want to know what nodes it traversed,
even
if the person generating the message doesn't. If you have some messages
being delayed significantly or corrupted, it's really helpful to have
the
messages tagged with the nodes they traverse so you can see what node is
likely to be having problems.


Yes, but then you would rather depend on failure-logs from cluster nodes,
and stats around lag across tiers like lag-percentiles etc. Isn't it?


where do you get those stats? you have to have info to extract those
stats from.


Oh, I thought you were talking about output that goes to the data-store.

For messages that get exchanged between tiers and even systems, we'll have
to figure out some way of carrying session_id, seq_no etc.



 Also, even with routing info being added to the message,
idempotent-ingestion-layer can choose to store all duplicate message's
metadata in a way which allows such reporting.


I'm saying that you have to have the metadata being passed around in the
first place. This means that you have to add info to what's being passed
around.


Agree.




  - Kind of complexity that exists in writing/reading from the storage
tier

 * With data volume of a few million messages per second, a distributed
datastore is anyway necessary (no single-instance datastore built for
commodity hardware can ingest data at that rate).


So now you are talking about a unique data storage that rsyslog doesn't
currently support.

Development of that should be it's own project.


Yes, that is true.  But assuming the output module required for that out
of
the scope of this discussion, it doesn't matter that much.

But you are right, its usefulness in absence of integration with such a
data-store is limited.

However, HDFS can be considered such a data-store. All output-modules
need
to log to the same directory, but in different files. Idempotency will be
taken care of in reduce-phase of data processing.



  * We provide at-least-once delivery guarantee, which means datastore

should have a way to identify duplicates either during ingestion, or
during
serving reads. While multiple solutions are possible here, a rather
simple
scheme that comes to my mind: hash the message by some key (say
session_id)
and send it to the same machine in data-store cluster, and maintain
session-level idempotent-ingestion mechanism (with fixed sliding
window of
just sequence number ranges, use range-representation to make it
efficient)
on that node.


you can't have your farm of tier-1 nodes picking the unique id for the
message, coordinating that would be too much overhead. Any unique id for
the message should be per-sender, and it would need to have sender-id,
generation-id and sequence-id (you can reliably store a generation-id
and
not re-use it if you restart, but trying to reliably store a sequence-id
gets you back to the fsync problem, so you accept that you will re-use
sequence-ids on the same machine, but make sure you will never use the
same
sequence-id with the same generation-id on the same machine)


No, cluster doesn't generate session_ids. Session_ids are random-uuids
generated by message-producer.


Ok.


  - Replication topology differences

 * Replication topology can be of 2 types, as you correctly pointed
out:
K-way fan-out vs hop-by-hop with K hops.
 * K-way fanout design(using omrelp, or imrelp) that RELP based design
adopts, requires another complex component, which in case of
node-failure,
needs to decide what the surviving replica must do.
Such a component must be aware of placement of replicas, must be able
to
elect master and allow master to propagate changes further, and must
re-run
master-election in case one of the replicas is lost. This requires
distributed coordination which is a hard problem, as you pointed out
earlier. Not just this, it must elect new replica as well from a pool
to
replace the lost replica, as replication factor would have fallen by 1.


there is a huge advantage in having the cluster maintain it's state and
control membership rather than having the things sending the cluster
maintain that state.

First, you will have a lot fewer machines in the cluster than sending to
the cluster.

Second, determining if a machine is healthy is a lot more than can you
connect to it, and that sort of knowledge can only be determined from
inside the machines of the cluster, so you either have the cluster
running
health checks and reporting them to the clients for them to decide if
the
node is healthy enough to use, or you have the cluster running health
checks and reporting it to the cluster management software on the
cluster.

Third, having multiple views of what machines are in the cluster (one
per
client) is a recipe for confusion as the different clients can end up
with
different opinions on what nodes are healty.


Membership, yes. The zookeeper (ephemeral znodes) based discovery
solution
that I mentioned was one of the ways I was thinking of solving that.
Corosync based solution was another way.

Membership info is essential, so all producers can easily discover Tier-1
nodes, Tier-1 nodes can discover Tier-2 nodes etc. My contention is with
having to manage failed nodes, re-replicate data on failure discovery,
identify replacement node centrally etc. In event of network partitions,
the way such systems fail becomes much worse with complex cluster
management techniques. Self-organizing techniques in my observation work
better (fewer nodes talking, fewer failure modes).


I agree that fewere nodes talking, fewer failure modes. That's why I'm
saying that having the clients figure this out is wrong. It needs to be
done by the much smaller number of nodes in the cluster, not by every
machine that could send data to them.


From systems that I have some experience with, having dedicated controller
nodes for making distributed systems decisions allows arbitrary complexity
in decision making. It eventually leads to undesirable complexity and more
types of failure as a distributed-system.

I have been trying to work towards a design where there is no requirement
for a separate decision-maker. This obviously works when each node in
isolation can make decisions, which forces us to keep the decision-making
simple.

In this case, all it needs to know (from a discovery mechanism or
otherwise), is how to reach some machine in the next tier (or 1st tier of
the next system, for inter-system case). It doesn't need to know or work
with complexity of is all data K-way replicated, what data is not, which
node should it be replicated to, which of these nodes must send data
downstream, how to agree upon which node will make such decisions, how to
failover to some other node in event of network-partition of various types
etc.

Using tiers for replication is a way to achieve coordinator-free design.

Clients (as in message producers) are nothing more than a general upstream
system (as in the sequence diagrams). They are just a single-replica
system(with disk based message ingestion, but single replica at node level
anyway), so the quicker we move data out of them, the better.



   * Impact on solution: Burstability needs to be built before datastore.

Between replicated queues vs. backpressure and backlog-buildup on
producer,
replicated queues are preferable for redundancy reasons.


assuming that your distributed datastore allows you to feed the same
message to it multiple times and have it weed out duplicates. I think
that
this is a much harder problem than you are thinking.


Yes, it is certainly hard. But is there a way to avoid it? Regardless of
what we do, that problem has to be solved.


only if multiple deliveries are common.


Well, that depends on guarantees that users of this data desire. For audit
kind of usecase for some important transactions for instance, even one
duplicate will be undesirable.

But did you have a scheme in mind which reduces the duplicate problem
enough for us to be able to ignore it (like 1 in a million, which can be
handed by visual inspection if the data is ever refereed to)?



 Where are the bottlenecks in RELP? it does allow a window of X messages
to
be waiting for acks. Does that window need to be larger? Do we need to
allow for multiple acks to be sent as one message?

your approach assumes that if message 45 is acked, then messages 1-44
are
also acked, not necessarily a valid assumption.


No, it doesn't assume that. It expects acks to work in terms of ranges.
1 -
30 may not be acked, 31 - 45 may be. The state-machine will force retry
on
ack-wait timeout. Eventually, it'll prevent sliding window from moving
forward, so it will have to be handled.


so your windows are a micro-optimization over a list of acked messages.
Not a significant architectural issue.


Agree.




   * In hop-by-hop replication topology, we care about ACKs flowing

end-to-end, so additional reliability between each hop doesn't add a
lot
of
value.


you are again arguing based on your solution rather than the problem.
You
are also contradicting yourself

If you are doing end-to-end acks, then the sender cannot forget the
message until the final destination acks.

If you are doing hop-by-hop acks, then the sender can forget the message
as soon as the it is reliably at the next tier.

which is it?


Sorry, I think I should have done a better job of explaining inter-system
ack mechanism upfront.

It is end-to-end acking, but end-to-end is not first-system to
last-system,
it is end to end within the cluster.

Assume multiple such clusters, A, B, C and D.

Let us say message flows from A -> B -> C -> D.

We expect each of these has capability to keep messages replicated and
safe
as long as next system doesn't confirm that it has replicated it to
desired
level.

Once C has replicated it sufficiently, B can discard it. C has multiple
tiers, and acks flow the two watermark values from last tier to first
tier.

C's internal ack flow is what I mean by end-to-end ack flow.

If we assume A, B, C and D to be single node systems, then you can think
if
this as next-hop acking (not end-to-end acking).

The sequence-diagrams I shared mark these systems, A, B and C in pink,
grey
and blue respectively (there is no D in that).

I think if you look at how acks are used to purge old messages (shown in
black on ack arrow) across systems in the diagrams, that may explain it
better than the text here.


so that is what I consider hop-by-hop acks, not end-to-end acks, and
other than the fact that it's going to multiple machines, this is exactly
what RELP does today.

If you were to modify imrelp so that it didn't send it's ack until it had
sent the message out to all the other machines in the cluster, you would
have exactly what you are talking about implementing, right?


Well, RELP or something else here is not a critical decision the way I am
seeing it.

Consider this, we are talking about building ack-flow across queues,
backwards. If this is built, it'll allow any protocol for input/output to
utilize it. RELP can utilize it out of the box (the contract won't be the
same as it stand today though, as it'll be acking on completion of
replication and completion of delivery rather than on reaching the input
bound queue). But other protocols like ptcp will be able to utilize it
well. Its upto the implementer of input/output module to decide if they
want to support this or not.

If we address throughput limitations of RELP, we should be able to use it
just fine. Or we can enhance ptcp to support this. RELP is still a notch
ahead in functionallity, because it is capable of acking each message, and
is capable of acking on delivery to input-bound-queue, but the two are not
quite the same thing.



   * Given wide-enough sliding-windows and pipelining (one of my original

sequence diagrams talk about pipelining, I don't show it in every
diagram
for brevity reasons), anything built over ptcp will run as close to
un-acked speed as possible.
 * Given its TCP, and we have last-hop-to-source L7 acks, we can't
really
lose an intermediate message. So per message ACKs are not valuable in
this
case.


only if all messages follow the same path. If some messages go to
machine
(tier-node) 1-1, 2-1, 3 and other messages go 1-1, 2-2, 3, and yet
others
go 1-2, 2-1, 3 you no longer have a single TCP flow and so message will
not
necessarily arrive in the same order


The need not arrive in the same order. It doesn't matter because we
aren't
making any ordering guarantees. As long as all messages come through
between host from Tier-n to Tier-n+1, we are ok.


It's sounding more and more like you are talking about a modified version
of RELP, with the added requirement to replicate across machines before
sending the ack.


A modified version of RELP will do.




  I agree that implementing it in Rsyslog will be significant amount of
work

(but it'll be significant amount of work even if its implemented in a
dedicated ingestion-relay that only does message movement, nothing
else).
In my head the counter-weight is:
- Same interface for users (they talk rulesets, actions, plugins etc
across
ack'ed and un-ack'ed pipelines
- Capability gain. Being able to work at very large scale with
efficiency
using syslog.


How much complexity will it add to rsyslog? and how many people would
actually use this architecture? How close can we come to achieving the
results without having to change the fundamental way that rsyslog works?

Specifically, the fact that rsyslog queues are one-directional. An input
is finished processing a message once it's added to the queue.
Implementing
end-to-end acks requires that the input still keep track of the sender
until after the output has processed the message so that the ack can
flow
back upstream to the sender.

I want us to figure out what we can do without breaking this fundamental
design factor of rsyslog.



thinking about it a bit more, RELP between nodes may not be needed,
because you are already saying that if a machine hiccups, you are
willing
to loose all messages that machine is processing. So if you just use
TCP as
your transport, as long as you don't have any SPOF between your sender
and
next hop recipients that could cut all your connections at once, you can
count on that redundancy to protect you. If you don't guarentee that
there
is no SPOF, then you do need an application level ack and we are back to
RELP.


I agree. The provisioning system will have to be smart to provision nodes
from different availability zones, different chassis etc.


it's messier than that makes it sound


Like I mentioned earlier, its as simple as allocating each tier of a
cluster in a separate availability zone.

The allocation-stack needs to be smart enough to have concept of
availability zones, but not much more.



 The sender can already craft and add a sequence-id and it's hostname can
be sender-id. Generation-id is a bit harder, but could be done in a
wrapper
around rsyslog or by (mis)using the table lookup mechanism to read a
value
from disk after rsyslog starts. Generating a unique sequence-id will
slow
the throughput of rsyslog because it requires coordination between
threads,
but a modification to rsyslog to have the output module create a
sequence-id where the bottom bits of the id aren't incramented, but
instead
are the thread-id would address this performance bottleneck.


I was thinking of both session_id and sequence_id being handled by
producer.

Say producer is a multi-threaded system. Each logging thread can
generate a
random-uuid and keep it as a thread-local. All logging uses it as
session_id, and each thread has a thread-local counter, which serves as
seq_id, gets incremented for each log etc.

That way all that complexity will stay out of rsyslog.


Well, if it's in the message delivered to rsyslog in the first place,
then rsyslog doesn't care about it.

But as I say, I think it's very reasonable to have the leaf rsyslog node
generate the data.


The reason I think it should be with source-system: it allows them to
control order. We may delivery it out of order(so a newer event may become
visible before an older one) to the data-store, but if they have an option
to see it in order, its valuable to let them decide the order too.



 I think that the biggest bottleneck that you would end up running into
would be the formatting of the output, especially if using JSON
formatting
so that you can send the sequence-id independently of the original
message
contents.



Yes, in some cases it may be easier to change functional definition to
have
a wrapper which carries this non-functional stuff too.


that doesn't solve the problem, it just shifts it slightly. Something
still needs to take the provided data and the metadata and create a
sequence of bytes that will be sent over the wire.


Yes, this was due to a confusion, my bad. I assumed you meant data that is
sent from last tier to data-store.



Rsyslog has the ability to have string modules (sm*) that do this in C
code. It used to be that all of the standard message formats were created
using the template engine, but after I raised the issue, we found that
optimizing the common formats down to C was enough of a win to result in a
~10% improvement in rsyslog's overall performance.

I think we are going to find that we really should do something along the
same lines for JSON output. And if you opt to send the data in some other
structure, you will need to do the same for your format.


Sorry, I lost you on this one. Can you elaborate a little about why JSON
is relevant?




David Lang
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad
of sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you
DON'T LIKE THAT.




--
Regards,
Janmejay
http://codehunk.wordpress.com

_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE 
THAT.

_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE 
THAT.

Reply via email to