It is what we are working on right now; it will be in the next release.

-Jay
On Fri, May 4, 2012 at 3:32 PM, Ashutosh Singh <ashutoshvsi...@gmail.com> wrote:
> Is this replication an existing functionality or the new stuff that is
> planned to come?

On Tue, May 1, 2012 at 11:47 AM, Felix GV <fe...@mate1inc.com> wrote:
> Ah, gotcha, so my usage of the term "in-memory replication" can be
> misleading: Kafka still doesn't retain the data in-app (i.e. in Kafka's
> allocated memory), but the data is in memory nonetheless because of the
> OS' file system cache.
>
> Basically, on the individual node's level, this is no different from
> what we already have (without KAFKA-50), but the fact that KAFKA-50 will
> give us replication means that the data will reside in the OS' file
> system cache of many nodes, giving us much more reliable durability
> guarantees.
>
> Thanks for the nitty gritty details Jay :)
>
> --
> Felix

On Tue, May 1, 2012 at 1:51 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> Yes, that is correct. Technically we always immediately write to the
> filesystem; it is just a question of when you fsync the file (that is
> the slow thing). So though it is in memory it is not in application
> memory, so it always survives kill -9 but not unplugging the machine.
> Currently when a broker fails, messages that are flushed to disk come
> back if the broker comes back with an intact filesystem (if the broker
> fs is destroyed then they are lost). With replication we retain this
> same flexibility on the flush policy, so you can flush every message to
> disk immediately if you like; however, having the message on multiple
> machines is in some ways better durability than the fsync gives, as the
> message will survive destruction of the filesystem, so we think you can
> legitimately allow consumers to consume messages independent of the
> flush policy.
>
> Also, when a broker fails it will lose unflushed messages, but when it
> comes back to life it will restore these messages from the other
> replicas before it serves data to consumers. So the log will be
> byte-for-byte identical across all servers, including both the contents
> and the ordering of messages.
>
> -Jay

On Tue, May 1, 2012 at 9:24 AM, Felix GV <fe...@mate1inc.com> wrote:
> Hmm... interesting!
>
> So, if I understand correctly, what you're saying regarding point 2 is
> that the messages are going to be kept in memory on several nodes, and
> start being served to consumers as soon as this is completed, rather
> than after the data is flushed to disk? This way, we still benefit from
> the throughput gain of flushing data to disk in batches, but we consider
> that the added durability of having in-memory replication is good enough
> to start serving that data to consumers sooner.
>
> Furthermore, this means that in the unlikely event that several nodes
> were to fail simultaneously (a correlated failure), the data that is
> replicated to the failed nodes but not yet flushed on any of them would
> be lost. However, when a single node crashes and is then restarted, only
> the failed node will have lost its unflushed data, while the other nodes
> that had replicated that data will have had the opportunity to flush it
> to disk later on.
>
> Sorry if I'm repeating like a parrot. I just want to make sure I
> understand correctly :)
>
> Please correct me if I'm not interpreting this correctly!
>
> --
> Felix

On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> Yes. It is also worth noting that there are a couple of different ways
> to think about latency:
> 1. latency of the request from the producer's point of view
> 2. end-to-end latency to the consumer
>
> As Jun mentions, (1) may go up a little because the producer was sending
> data without checking for any answer from the server. Although this
> gives a nice buffering effect, it leads to a number of corner cases that
> are hard to deal with correctly. It should be the case that setting the
> producer to async has the same effect from the producer's point of view,
> without the corner cases of having no RPC response to convey errors and
> other broker misbehavior.
>
> (2) may actually get significantly better, especially for lower-volume
> topics. The reason is that currently we wait until data is flushed to
> disk before giving it to the consumer, and this flush policy is
> controlled by setting a number of messages or a timeout at which the
> flush is forced. The reason to make this configurable is that on
> traditional disks each flush is likely to incur at least one seek. In
> the new model, replication can take the place of waiting on a disk flush
> to provide durability (even if the log of the local server loses
> unflushed data, as long as all servers don't crash at the same time no
> messages will be lost). Doing 2 parallel replication round-trips
> (perhaps surprisingly) looks like it may be a lot lower-latency than
> doing a local disk flush (< 1 ms versus >= 10 ms). In our own usage the
> desire for this kind of low-latency consumption is not common, but I
> understand that this is a common need for messaging.
>
> -Jay
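For illustration of the flush policy described above, a minimal broker config
sketch. The property names below follow the 0.8 line and should be treated as
assumptions; check the configuration docs for the release you actually run.

    # server.properties (0.8-style names, assumed)
    # Force an fsync after this many messages have been appended to a log...
    log.flush.interval.messages=10000
    # ...or after this many milliseconds, whichever comes first.
    log.flush.interval.ms=1000

Lowering these values tightens the durability window at the cost of more
seeks; with replication in place, the flush interval matters less for
end-to-end latency, which is the point being made above.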
On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
> Thanks Jun :)
>
> --
> Felix

On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <jun...@gmail.com> wrote:
> Some comments inlined below.
>
> Thanks,
>
> Jun
>
> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com> wrote:
>> Cool :) Thanks for those insights :) !
>>
>> I changed the subject of the thread, in order not to derail the
>> original thread's subject...! I just want to recap to make sure I (and
>> others) understand all of this correctly :)
>>
>> So, if I understand correctly, with acks == [0,1] Kafka should provide
>> a latency that is similar to what we have now, but with the possibility
>> of losing a small window of unreplicated events in the case of an
>> unrecoverable hardware failure, and with acks > 1 (or acks == -1) there
>> will probably be a latency penalty but we will be completely protected
>> from (non-correlated) hardware failures, right?
>
> This is mostly true. The difference is that in 0.7 the producer doesn't
> wait for a TCP response from the broker. In 0.8, the producer always
> waits for a response from the broker. How quickly the broker sends the
> response depends on acks: with a lower acks setting you may get the
> response faster, but you have some risk of losing the data if there is
> a broker failure.
>
>> Also, I guess the above assumptions are correct for a batch size of 1,
>> and that bigger batch sizes could also lead to small windows of
>> unwritten data in cases of failures, just like now...? Although, now
>> that I think of it, I guess the vulnerability of bigger batch sizes
>> would, again, only come into play in scenarios of unrecoverable
>> correlated failures, since even if a machine fails with some partially
>> committed batch, there would be other machines that received the same
>> data (through replication) and would have had enough time to commit
>> those batches...
>
> I want to add that if the producer itself dies, it could lose a batch
> of events.
>
>> Finally, I guess that replication (whatever the ack parameter is) will
>> affect the overall throughput capacity of the Kafka cluster, since
>> every node will now be writing its own data as well as the replicated
>> data from +/- 2 other nodes, right?
>>
>> --
>> Felix

On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> Short answer is yes, both async (acks=0 or 1) and sync replication
> (acks > 1) will both be supported.
>
> -Jay

On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <jun...@gmail.com> wrote:
> Felix,
>
> Initially, we thought we could keep the option of not sending acks from
> the broker to the producer. However, this seems hard since in the new
> wire protocol we need to send at least the error code to the producer
> (e.g., a request is sent to the wrong broker or wrong partition).
>
> So, what we allow in the current design is the following. The producer
> can specify the # of acks in the request. By default (acks = -1), the
> broker will wait for the message to be written to all replicas that are
> still synced up with the leader before acking the producer. Otherwise
> (acks >= 0), the broker will ack the producer after the message is
> written to acks replicas. Currently, acks=0 is treated the same as
> acks=1.
>
> Thanks,
>
> Jun
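For illustration of the acks setting described above, a minimal producer
sketch against the Java producer API and the request.required.acks property
as they eventually shipped in 0.8. Since this thread predates the 0.8
release, treat the class and property names as assumptions; the broker list
and topic are made up.

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AckedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Hypothetical broker list; replace with your own.
            props.put("metadata.broker.list", "broker1:9092,broker2:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // -1: wait for all in-sync replicas before acking the producer.
            //  1: ack once the leader has the message (0 currently behaves like 1).
            props.put("request.required.acks", "-1");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("test-topic", "hello"));
            producer.close();
        }
    }

acks=-1 trades producer-side latency for the strongest durability, matching
the description of waiting on all replicas that are still synced up with the
leader.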
On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <fe...@mate1inc.com> wrote:
> Just curious, but if I remember correctly from the time I read KAFKA-50
> and the related JIRA issues, you guys plan to implement sync AND async
> replication, right?
>
> --
> Felix

On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> Right now we do sloppy failover. That is, when a broker goes down,
> traffic is redirected to the remaining machines, but any unconsumed
> messages are stuck on that server until it comes back; if it is
> permanently gone the messages are lost. This is acceptable for us in the
> near term since our pipeline is pretty real-time, so this window between
> production and consumption is pretty small. The complete solution is the
> intra-cluster replication in KAFKA-50, which is coming along fairly
> nicely now that we are working on it.
>
> -Jay

On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne <oliver.kro...@googlemail.com> wrote:
> Hi,
>
> indeed, I thought it could be used as a failover approach.
>
> We use RAID for local redundancy, but it does not protect us in case of
> a machine failure, so I am looking for a way to achieve a master/slave
> setup until KAFKA-50 has been implemented.
>
> I think we can solve it for now by having multiple brokers, so that the
> application can continue sending messages if one broker goes down. My
> main concern is not to introduce a new single point of failure which can
> stop the application. However, as some consumers are not developed by us
> and it is not clear how they store their offsets in zookeeper, we need
> to find out how we can manage the consumers in case a broker never
> returns after a failure. It will be acceptable to lose a couple of
> messages if a broker dies and the consumers have not consumed all
> messages at the point of failure.
> Thanks,
> Oliver

On 23.04.2012 at 19:58, Jay Kreps wrote:
> I think the confusion comes from the fact that we are using mirroring to
> handle geographic distribution, not failover. If I understand correctly,
> what Oliver is asking for is something to give fault tolerance, not
> something for distribution. I don't think that is really what the
> mirroring does out of the box, though technically I suppose you could
> just reset the offsets and point the consumer at the new cluster and
> have it start from "now".
>
> I think it would be helpful to document our use case in the mirroring
> docs since this is not the first time someone has asked about this.
>
> -Jay

On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> Hi Oliver,
>
>> I was reading the mirroring guide and I wonder if it is required that
>> the mirror runs its own zookeeper?
>>
>> We have a zookeeper cluster running which is used by different
>> applications, so can we use that zookeeper cluster for the kafka source
>> and kafka mirror?
>
> You could have a single zookeeper cluster and use different namespaces
> for the source/target mirror. However, I don't think it is recommended
> to use a remote zookeeper (if you have a cross-DC set up) since that
> would potentially mean very high ZK latencies on one of your clusters.
>
>> What is the procedure, if the kafka source server fails, to switch the
>> applications to use the mirrored instance?
>
> I don't quite follow this question - can you clarify? The mirror cluster
> is pretty much a separate instance. There is no built-in automatic
> fail-over if your source cluster goes down.
>
>> Are there any backup best practices if we would not use mirroring?
> You can use RAID arrays for (local) data redundancy. You may also be
> interested in the (intra-DC) replication feature (KAFKA-50) that is
> currently being developed. I believe some folks on this list have also
> used plain rsyncs as an alternative to mirroring.
>
> Thanks,
>
> Joel
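For illustration of the namespace suggestion above: Kafka's ZooKeeper
connection string accepts a chroot path, so a single ZooKeeper ensemble can
serve both the source and the mirror cluster under different paths. A sketch
with made-up hostnames and paths; the property is zk.connect in 0.7 and was
later renamed zookeeper.connect.

    # Source cluster (brokers and consumers)
    zk.connect=zk1:2181,zk2:2181,zk3:2181/kafka-source

    # Mirror cluster, same ZooKeeper ensemble, different chroot path
    zk.connect=zk1:2181,zk2:2181,zk3:2181/kafka-mirror

As noted above, keeping each cluster's ZooKeeper local (rather than reaching
across DCs) avoids high ZK latencies on one side.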