Is this replication existing functionality, or new functionality that is planned to come?

On Tue, May 1, 2012 at 11:47 AM, Felix GV <fe...@mate1inc.com> wrote:
> Ah, gotcha, so my usage of the term "in-memory replication" can be misleading: Kafka still doesn't retain the data in-app (i.e., in Kafka's allocated memory), but the data is in memory nonetheless because of the OS' file system cache.
>
> Basically, at the individual node's level, this is no different from what we already have (without KAFKA-50), but the fact that KAFKA-50 will give us replication means that the data will reside in the OS' file system cache of many nodes, giving us much more reliable durability guarantees.
>
> Thanks for the nitty gritty details Jay :)
>
> --
> Felix
>
> On Tue, May 1, 2012 at 1:51 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > Yes, that is correct. Technically we always immediately write to the filesystem; it is just a question of when you fsync the file (that is the slow thing). So though it is in memory it is not in application memory, so it always survives kill -9 but not unplugging the machine. Currently, when a broker fails, messages that are flushed to disk come back if the broker comes back with an intact filesystem (if the broker fs is destroyed then they are lost). With replication we retain this same flexibility on the flush policy, so you can flush every message to disk immediately if you like; however, having the message on multiple machines is in some ways better durability than the fsync gives, as the message will survive destruction of the filesystem, so we think you can legitimately allow consumers to consume messages independent of the flush policy.
> >
> > Also, when a broker fails it will lose unflushed messages; however, when it comes back to life it will restore these messages from the other replicas before it will serve data to consumers. So the log will be byte-for-byte identical across all servers, including both the contents and the ordering of messages.
> >
> > -Jay
> >
> > On Tue, May 1, 2012 at 9:24 AM, Felix GV <fe...@mate1inc.com> wrote:
> > > Hmm... interesting!
> > >
> > > So, if I understand correctly, what you're saying regarding point 2 is that the messages are going to be kept in memory on several nodes, and start being served to consumers as soon as this is completed, rather than after the data is flushed to disk? This way, we still benefit from the throughput gain of flushing data to disk in batches, but we consider that the added durability of having in-memory replication is good enough to start serving that data to consumers sooner.
> > >
> > > Furthermore, this means that in the unlikely event that several nodes would fail simultaneously (a correlated failure), the data that is replicated to the failed nodes but not yet flushed on any of them would be lost. However, when a single node crashes and is then restarted, only the failed node will have lost its unflushed data, while the other nodes that had replicated that data will have had the opportunity to flush it to disk later on.
> > >
> > > Sorry if I'm repeating like a parrot. I just want to make sure I understand correctly :)
> > >
> > > Please correct me if I'm not interpreting this correctly!
> > >
> > > --
> > > Felix
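To make Jay's write-versus-fsync distinction concrete, here is a minimal Java sketch of the two steps at the filesystem level. This is illustrative only, not Kafka code, and the file path is a made-up placeholder:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class FlushVsFsync {
        public static void main(String[] args) throws IOException {
            // Hypothetical log segment path, for illustration only.
            FileChannel log = FileChannel.open(
                    Paths.get("/tmp/demo-segment.log"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);

            // write() hands the bytes to the OS page cache: the data now
            // survives a kill -9 of this process (the kernel holds it),
            // but not a power failure.
            log.write(ByteBuffer.wrap("message-1\n".getBytes("UTF-8")));

            // force(true) is the fsync: the page cache is pushed to the
            // physical device, so the data also survives unplugging the
            // machine. This is the slow step that the flush policy batches.
            log.force(true);

            log.close();
        }
    }

The flush policy discussed in this thread is essentially a rule for how many messages, or how much time, may pass between the cheap write() calls and the expensive force().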
> > > On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >> Yes, it is also worth noting that there are a couple of different ways to think about latency:
> > >> 1. latency of the request from the producer's point of view
> > >> 2. end-to-end latency to the consumer
> > >>
> > >> As Jun mentions, (1) may go up a little because the producer was sending data without checking for any answer from the server. Although this gives a nice buffering effect, it leads to a number of corner cases that are hard to deal with correctly. It should be the case that setting the producer to async has the same effect from the producer's point of view, without the corner cases of having no RPC response to convey errors and other broker misbehavior.
> > >>
> > >> (2) may actually get significantly better, especially for lower-volume topics. The reason is that currently we wait until data is flushed to disk before giving it to the consumer; this flush policy is controlled by setting a number of messages or a timeout at which the flush is forced. The reason to configure this is that on traditional disks each flush is likely to incur at least one seek. In the new model, replication can take the place of waiting on a disk flush to provide durability (even if the log of the local server loses unflushed data, as long as all servers don't crash at the same time no messages will be lost). Doing 2 parallel replication round-trips (perhaps surprisingly) looks like it may be a lot lower-latency than doing a local disk flush (< 1ms versus >= 10ms). In our own usage, desire for this kind of low-latency consumption is not common, but I understand that this is a common need for messaging.
> > >>
> > >> -Jay
> > >>
> > >> On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
> > >> > Thanks Jun :)
> > >> >
> > >> > --
> > >> > Felix
> > >> >
> > >> > On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <jun...@gmail.com> wrote:
> > >> >> Some comments inlined below.
> > >> >>
> > >> >> Thanks,
> > >> >>
> > >> >> Jun
> > >> >>
> > >> >> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com> wrote:
> > >> >> > Cool :) Thanks for those insights :) !
> > >> >> >
> > >> >> > I changed the subject of the thread, in order not to derail the original thread's subject...! I just want to recap to make sure I (and others) understand all of this correctly :)
> > >> >> >
> > >> >> > So, if I understand correctly, with acks == [0,1] Kafka should provide a latency that is similar to what we have now, but with the possibility of losing a small window of unreplicated events in the case of an unrecoverable hardware failure, and with acks > 1 (or acks == -1) there will probably be a latency penalty but we will be completely protected from (non-correlated) hardware failures, right?
> > >> >>
> > >> >> This is mostly true. The difference is that in 0.7, the producer doesn't wait for a TCP response from the broker. In 0.8, the producer always waits for a response from the broker. How quickly the broker sends the response depends on acks.
> > >> >> If acks is less than ideal, you may get the response faster, but have some risk of losing the data if there is a broker failure.
> > >> >>
> > >> >> > Also, I guess the above assumptions are correct for a batch size of 1, and that bigger batch sizes could also lead to small windows of unwritten data in cases of failures, just like now...? Although, now that I think of it, I guess the vulnerability of bigger batch sizes would, again, only come into play in scenarios of unrecoverable correlated failures, since even if a machine fails with some partially committed batch, there would be other machines that received the same data (through replication) and would have enough time to commit those batches...
> > >> >>
> > >> >> I want to add that if the producer itself dies, it could lose a batch of events.
> > >> >>
> > >> >> > Finally, I guess that replication (whatever the ack parameter is) will affect the overall throughput capacity of the Kafka cluster, since every node will now be writing its own data as well as the replicated data from +/- 2 other nodes, right?
> > >> >> >
> > >> >> > --
> > >> >> > Felix
> > >> >> >
> > >> >> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >> >> > > Short answer is yes, both async (acks=0 or 1) and sync replication (acks > 1) will both be supported.
> > >> >> > >
> > >> >> > > -Jay
> > >> >> > >
> > >> >> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <jun...@gmail.com> wrote:
> > >> >> > > > Felix,
> > >> >> > > >
> > >> >> > > > Initially, we thought we could keep the option of not sending acks from the broker to the producer. However, this seems hard since in the new wire protocol, we need to send at least the error code to the producer (e.g., a request is sent to the wrong broker or wrong partition).
> > >> >> > > >
> > >> >> > > > So, what we allow in the current design is the following. The producer can specify the # of acks in the request. By default (acks = -1), the broker will wait for the message to be written to all replicas that are still synced up with the leader before acking the producer. Otherwise (acks >= 0), the broker will ack the producer after the message is written to acks replicas. Currently, acks=0 is treated the same as acks=1.
> > >> >> > > >
> > >> >> > > > Thanks,
> > >> >> > > >
> > >> >> > > > Jun
> > >> >> > > >
> > >> >> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <fe...@mate1inc.com> wrote:
> > >> >> > > >> Just curious, but if I remember correctly from the time I read KAFKA-50 and the related JIRA issues, you guys plan to implement sync AND async replication, right?
> > >> >> > > >>
> > >> >> > > >> --
> > >> >> > > >> Felix
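As a concrete illustration of the acks setting being discussed, a producer configured against the 0.8-style Java API might look roughly like the sketch below. The property names and classes (request.required.acks, producer.type, kafka.javaapi.producer.Producer) are those of the 0.8 producer as eventually released and may not match the work-in-progress builds referred to in this thread; the broker list and topic name are placeholders:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AcksSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            // -1: the broker acks only once all replicas still in sync with the
            // leader have the message (strongest durability, highest latency).
            // 1: ack as soon as the leader has written the message.
            // 0: weakest setting; per Jun above, treated like 1 in this design.
            props.put("request.required.acks", "-1");

            // "async" batches messages inside the producer; as Jun points out,
            // a batch buffered this way is lost if the producer process dies.
            props.put("producer.type", "sync");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("my-topic", "hello")); // placeholder topic
            producer.close();
        }
    }

With acks=0 or 1 the send is acknowledged quickly; with acks=-1 the broker waits on the in-sync replicas, which is the latency-versus-durability trade-off Felix and Jun are describing.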
> > >> >> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >> >> > > >> > Right now we do sloppy failover. That is, when a broker goes down, traffic is redirected to the remaining machines, but any unconsumed messages are stuck on that server until it comes back; if it is permanently gone, the messages are lost. This is acceptable for us in the near term since our pipeline is pretty real-time, so this window between production and consumption is pretty small. The complete solution is the intra-cluster replication in KAFKA-50, which is coming along fairly nicely now that we are working on it.
> > >> >> > > >> >
> > >> >> > > >> > -Jay
> > >> >> > > >> >
> > >> >> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne <oliver.kro...@googlemail.com> wrote:
> > >> >> > > >> > > Hi,
> > >> >> > > >> > >
> > >> >> > > >> > > indeed, I thought it could be used as a failover approach.
> > >> >> > > >> > >
> > >> >> > > >> > > We use RAID for local redundancy, but it does not protect us in case of a machine failure, so I am looking for a way to achieve a master/slave setup until KAFKA-50 has been implemented.
> > >> >> > > >> > >
> > >> >> > > >> > > I think we can solve it for now by having multiple brokers, so that the application can continue sending messages if one broker goes down. My main concern is to not introduce a new single point of failure which can stop the application. However, as some consumers are not developed by us and it is not clear how they store the offset in ZooKeeper, we need to find out how we can manage the consumers in case a broker never returns after a failure. It will be acceptable to lose a couple of messages if a broker dies and the consumers have not consumed all messages at the point of failure.
> > >> >> > > >> > >
> > >> >> > > >> > > Thanks,
> > >> >> > > >> > > Oliver
> > >> >> > > >> > >
> > >> >> > > >> > > On 23.04.2012, at 19:58, Jay Kreps wrote:
> > >> >> > > >> > >
> > >> >> > > >> > >> I think the confusion comes from the fact that we are using mirroring to handle geographic distribution, not failover. If I understand correctly, what Oliver is asking for is something to give fault tolerance, not something for distribution.
> > >> >> > > >> > >> I don't think that is really what the mirroring does out of the box, though technically I suppose you could just reset the offsets and point the consumer at the new cluster and have it start from "now".
> > >> >> > > >> > >>
> > >> >> > > >> > >> I think it would be helpful to document our use case in the mirroring docs since this is not the first time someone has asked about this.
> > >> >> > > >> > >>
> > >> >> > > >> > >> -Jay
> > >> >> > > >> > >>
> > >> >> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >> >> > > >> > >>> Hi Oliver,
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>> I was reading the mirroring guide and I wonder if it is required that the mirror runs its own zookeeper?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>> We have a zookeeper cluster running which is used by different applications, so can we use that zookeeper cluster for the kafka source and kafka mirror?
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> You could have a single zookeeper cluster and use different namespaces for the source/target mirror. However, I don't think it is recommended to use a remote zookeeper (if you have a cross-DC setup) since that would potentially mean very high ZK latencies on one of your clusters.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>> What is the procedure if the kafka source server fails to switch the applications to use the mirrored instance?
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> I don't quite follow this question - can you clarify? The mirror cluster is pretty much a separate instance. There is no built-in automatic fail-over if your source cluster goes down.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>> Are there any backup best practices if we would not use mirroring?
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> You can use RAID arrays for (local) data redundancy. You may also be interested in the (intra-DC) replication feature (KAFKA-50) that is currently being developed. I believe some folks on this list have also used plain rsyncs as an alternative to mirroring.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> Thanks,
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> Joel
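On Joel's point about sharing one ZooKeeper ensemble between the source and mirror clusters, the usual approach is to give each cluster its own chroot path ("namespace") in the connection string. The sketch below is an assumption-laden illustration: host names and chroot paths are placeholders, and the property is zk.connect in 0.7-era configs (zookeeper.connect in later releases):

    import java.util.Properties;

    public class MirrorZkNamespaces {
        public static void main(String[] args) {
            // One shared ZooKeeper ensemble; each Kafka cluster gets its own
            // chroot path so broker/consumer metadata does not collide.
            Properties sourceCluster = new Properties();
            sourceCluster.put("zk.connect", "zk1:2181,zk2:2181,zk3:2181/kafka-source");

            Properties mirrorCluster = new Properties();
            mirrorCluster.put("zk.connect", "zk1:2181,zk2:2181,zk3:2181/kafka-mirror");

            System.out.println("source cluster ZK: " + sourceCluster.getProperty("zk.connect"));
            System.out.println("mirror cluster ZK: " + mirrorCluster.getProperty("zk.connect"));
        }
    }

As Joel notes, if the clusters live in different data centers, keeping each cluster's ZooKeeper ensemble local matters more than how the namespaces are laid out.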