It is what we are working on right now, it will be in the next release.

-Jay

On Fri, May 4, 2012 at 3:32 PM, Ashutosh Singh <ashutoshvsi...@gmail.com> wrote:
> Is this replication an existing functionality or the new stuff that is
> planned to come?
>
> On Tue, May 1, 2012 at 11:47 AM, Felix GV <fe...@mate1inc.com> wrote:
>
>> Ah, gotcha, so my usage of the term "in-memory replication" can be
>> misleading: Kafka still doesn't retain the data in-app (i.e.: in Kafka's
>> allocated memory), but the data is in-memory nonetheless because of the OS'
>> file system cache.
>>
>> Basically, on the individual node's level, this is not different from what
>> we already have (without KAFKA-50), but the fact that KAFKA-50 will give us
>> replication means that the data will reside in the OS' file system cache of
>> many nodes, giving us much more reliable durability guarantees.
>>
>> Thanks for the nitty gritty details Jay :)
>>
>> --
>> Felix
>>
>>
>>
>> On Tue, May 1, 2012 at 1:51 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> > Yes, that is correct. Technically we always immediately write to the
>> > filesystem, it is just a question of when you fsync the file (that is
>> > the slow thing). So though it is in memory it is not in application
>> > memory, so it always survives kill -9 but not unplugging the machine.
>> > Currently when a broker fails messages that are flushed to disk come
>> > back if the broker comes back with an intact filesystem (if the broker
>> > fs is destroyed then it is lost). With replication we retain this same
>> > flexibility on the flush policy, so you can flush every message to
>> > disk immediately if you like, however having the message on multiple
>> > machines is in some ways better durability then the fsync gives, as
>> > the message will survive destruction of the filesystem, so we think
>> > you can legitimately allow consumers to consume messages independent
>> > of the flush policy.
>> >
>> > Also when a broker fails it will lose unflushed messages, however when
>> > it comes back to life it will restore these messages from the other
>> > replicas before it will serve data to consumers. So the log will be
>> > byte-for-byte identical across all servers including both the contents
>> > and the ordering of messages.
>> >
>> > -Jay
>> >
>> > On Tue, May 1, 2012 at 9:24 AM, Felix GV <fe...@mate1inc.com> wrote:
>> > > Hmm... interesting!
>> > >
>> > > So, if I understanding correctly, what you're saying regarding point 2,
>> > is
>> > > that the messages are going to be kept in memory on several nodes, and
>> > > start being served to consumers as soon as this is completed, rather
>> than
>> > > after the data is flushed to disk? This way, we still benefit from the
>> > > throughput gain of flushing data to disk in batches, but we consider
>> that
>> > > the added durability of having in-memory replication is good enough to
>> > > start serving that data to consumers sooner.
>> > >
>> > > Furthermore, this means that in the unlikely event that several nodes
>> > would
>> > > fail simultaneously (a correlated failure), the data that is replicated
>> > to
>> > > the failed nodes but not yet flushed on any of them would be lost.
>> > However,
>> > > when a single node crashes and is then restarted, only the failed node
>> > will
>> > > have lost its unflushed data, while the other nodes that had replicated
>> > > that data will have had the opportunity to flush it to disk later on.
>> > >
>> > > Sorry if I'm repeating like a parrot. I just want to make sure I
>> > understand
>> > > correctly :)
>> > >
>> > > Please correct me if I'm not interpreting this correctly!
>> > >
>> > > --
>> > > Felix
>> > >
>> > >
>> > >
>> > > On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <jay.kr...@gmail.com>
>> wrote:
>> > >
>> > >> Yes, it is also worth noting that there are couple of different ways
>> > >> to think about latency:
>> > >> 1. latency of the request from the producer's point-of-view
>> > >> 2. end-to-end latency to the consumer
>> > >>
>> > >> As Jun mentions (1) may go up a little because the producer was
>> > >> sending data without checking for any answer from the server. Although
>> > >> this gives a nice buffering effect it leads to a number of corner
>> > >> cases that are hard to deal with correctly. It should be the case that
>> > >> setting the consumer to async has the same effect from the producer
>> > >> point of view without the corner cases of having no RPC response to
>> > >> convey errors and other broker misbehavior.
>> > >>
>> > >> (2) May actually get significantly better, especially for lower volume
>> > >> topics. The reason for this is because currently we wait until data is
>> > >> flushed to disk before giving it to the consumer, this flush policy is
>> > >> controlled by setting a number of messages or timeout at which the
>> > >> flush is forced. The reason to configure this is because on
>> > >> traditional disks each disk is likely to incur at least one seek. In
>> > >> the new model replication can take the place of waiting on a disk
>> > >> flush to provide durability (even if the log of the local server loses
>> > >> unflushed data as long as all servers don't crash at the same time no
>> > >> messages will be lost). Doing 2 parallel replication round-trips
>> > >> (perhaps surprisingly) looks like it may be a lot lower-latency than
>> > >> doing a local disk flush (< 1ms versus >= 10ms). In our own usage
>> > >> desire for this kind of low-latency consumption is not common, but I
>> > >> understand that this is a common need for messaging.
>> > >>
>> > >> -Jay
>> > >>
>> > >> On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
>> > >> > Thanks Jun :)
>> > >> >
>> > >> > --
>> > >> > Felix
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <jun...@gmail.com> wrote:
>> > >> >
>> > >> >> Some comments inlined below.
>> > >> >>
>> > >> >> Thanks,
>> > >> >>
>> > >> >> Jun
>> > >> >>
>> > >> >> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com>
>> > wrote:
>> > >> >>
>> > >> >> > Cool :) Thanks for those insights :) !
>> > >> >> >
>> > >> >> > I changed the subject of the thread, in order not to derail the
>> > >> original
>> > >> >> > thread's subject...! I just want to recap to make sure I (and
>> > others)
>> > >> >> > understand all of this correctly :)
>> > >> >> >
>> > >> >> > So, if I understand correctly, with acks == [0,1] Kafka should
>> > >> provide a
>> > >> >> > latency that is similar to what we have now, but with the
>> > possibility
>> > >> of
>> > >> >> > losing a small window of unreplicated events in the case of an
>> > >> >> > unrecoverable hardware failure, and with acks > 1 (or acks == -1)
>> > >> there
>> > >> >> > will probably be a latency penalty but we will be completely
>> > protected
>> > >> >> from
>> > >> >> > (non-correlated) hardware failures, right?
>> > >> >> >
>> > >> >> > This is mostly true. The difference is that in 0.7, producer
>> > doesn't
>> > >> wait
>> > >> >> for a TCP response from broker. In 0.8, the producer always waits
>> > for a
>> > >> >> response from broker. How quickly the broker sends the response
>> > depends
>> > >> on
>> > >> >> acks. If acks is less than ideal, you may get the response faster,
>> > but
>> > >> have
>> > >> >> some risk of losing the data if there is broker failure.
>> > >> >>
>> > >> >>
>> > >> >> > Also, I guess the above assumptions are correct for a batch size
>> > of 1,
>> > >> >> and
>> > >> >> > that bigger batch sizes could also lead to small windows of
>> > unwritten
>> > >> >> data
>> > >> >> > in cases of failures, just like now...? Although, now that I
>> think
>> > of
>> > >> >> it, I
>> > >> >> > guess the vulnerability of bigger batch sizes would, again, only
>> > come
>> > >> >> into
>> > >> >> > play in scenarios of unrecoverable correlated failures, since
>> even
>> > if
>> > >> a
>> > >> >> > machine fails with some partially committed batch, there would be
>> > >> other
>> > >> >> > machines who received the same data (through replication) and
>> would
>> > >> have
>> > >> >> > enough time to commit those batches...
>> > >> >> >
>> > >> >> > I want to add that if the producer itself dies, it could lose a
>> > batch
>> > >> of
>> > >> >> events.
>> > >> >>
>> > >> >>
>> > >> >> > Finally, I guess that replication (whatever the ack parameter is)
>> > will
>> > >> >> > affect the overall throughput capacity of the Kafka cluster,
>> since
>> > >> every
>> > >> >> > node will now be writing its own data as well as the replicated
>> > data
>> > >> from
>> > >> >> > +/- 2 other nodes, right?
>> > >> >> >
>> > >> >> > --
>> > >> >> > Felix
>> > >> >> >
>> > >> >> >
>> > >> >> >
>> > >> >> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <jay.kr...@gmail.com>
>> > >> wrote:
>> > >> >> >
>> > >> >> > > Short answer is yes, both async (acks=0 or 1) and sync
>> > replication
>> > >> >> > > (acks > 1) will be both be supported.
>> > >> >> > >
>> > >> >> > > -Jay
>> > >> >> > >
>> > >> >> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <jun...@gmail.com>
>> > wrote:
>> > >> >> > > > Felix,
>> > >> >> > > >
>> > >> >> > > > Initially, we thought we could keep the option of not sending
>> > acks
>> > >> >> from
>> > >> >> > > the
>> > >> >> > > > broker to the producer. However, this seems hard since in the
>> > new
>> > >> >> wire
>> > >> >> > > > protocol, we need to send at least the error code to the
>> > producer
>> > >> >> > (e.g.,
>> > >> >> > > a
>> > >> >> > > > request is sent to the wrong broker or wrong partition).
>> > >> >> > > >
>> > >> >> > > > So, what we allow in the current design is the following. The
>> > >> >> producer
>> > >> >> > > can
>> > >> >> > > > specify the # of acks in the request. By default (acks = -1),
>> > the
>> > >> >> > broker
>> > >> >> > > > will wait for the message to be written to all replicas that
>> > are
>> > >> >> still
>> > >> >> > > > synced up with the leader before acking the producer.
>> Otherwise
>> > >> (acks
>> > >> >> > > >=0),
>> > >> >> > > > the broker will ack the producer after the message is written
>> > to
>> > >> acks
>> > >> >> > > > replicas. Currently, acks=0 is treated the same as acks=1.
>> > >> >> > > >
>> > >> >> > > > Thanks,
>> > >> >> > > >
>> > >> >> > > > Jun
>> > >> >> > > >
>> > >> >> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <
>> fe...@mate1inc.com
>> > >
>> > >> >> wrote:
>> > >> >> > > >
>> > >> >> > > >> Just curious, but if I remember correctly from the time I
>> read
>> > >> >> > KAFKA-50
>> > >> >> > > and
>> > >> >> > > >> the related JIRA issues, you guys plan to implement sync AND
>> > >> async
>> > >> >> > > >> replication, right?
>> > >> >> > > >>
>> > >> >> > > >> --
>> > >> >> > > >> Felix
>> > >> >> > > >>
>> > >> >> > > >>
>> > >> >> > > >>
>> > >> >> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <
>> > jay.kr...@gmail.com>
>> > >> >> > wrote:
>> > >> >> > > >>
>> > >> >> > > >> > Right now we do sloppy failover. That is when a broker
>> goes
>> > >> down
>> > >> >> > > >> > traffic is redirected to the remaining machines, but any
>> > >> >> unconsumed
>> > >> >> > > >> > messages are stuck on that server until it comes back, if
>> > it is
>> > >> >> > > >> > permanently gone the messages are lost. This is acceptable
>> > for
>> > >> us
>> > >> >> in
>> > >> >> > > >> > the near-term since our pipeline is pretty real-time so
>> this
>> > >> >> window
>> > >> >> > > >> > between production and consumption is pretty small. The
>> > >> complete
>> > >> >> > > >> > solution is the intra-cluster replication in KAFA-50 which
>> > is
>> > >> >> coming
>> > >> >> > > >> > along fairly nicely now that we are working on it.
>> > >> >> > > >> >
>> > >> >> > > >> > -Jay
>> > >> >> > > >> >
>> > >> >> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
>> > >> >> > > >> > <oliver.kro...@googlemail.com> wrote:
>> > >> >> > > >> > > Hi,
>> > >> >> > > >> > >
>> > >> >> > > >> > > indeed I thought could be used as failover approach.
>> > >> >> > > >> > >
>> > >> >> > > >> > > We use raid for local redundancy but it does not protect
>> > us
>> > >> in
>> > >> >> > case
>> > >> >> > > of
>> > >> >> > > >> a
>> > >> >> > > >> > machine failure, so I am looking for a way to achieve a
>> > >> >> master/slave
>> > >> >> > > >> setup
>> > >> >> > > >> > until KAFKA-50 has been implemented.
>> > >> >> > > >> > >
>> > >> >> > > >> > > I think we can solve it for now by having multiple
>> broker
>> > so
>> > >> >> that
>> > >> >> > > the
>> > >> >> > > >> > application can continue sending messages if one broker
>> goes
>> > >> down.
>> > >> >> > My
>> > >> >> > > >> main
>> > >> >> > > >> > concern is to not introduce a new single point of failure
>> > which
>> > >> >> can
>> > >> >> > > stop
>> > >> >> > > >> > the application. However as some consumer are not
>> developed
>> > by
>> > >> us
>> > >> >> > and
>> > >> >> > > it
>> > >> >> > > >> is
>> > >> >> > > >> > not clear how they store the offset in zookeeper we need
>> to
>> > >> find
>> > >> >> out
>> > >> >> > > how
>> > >> >> > > >> we
>> > >> >> > > >> > can manage the consumer in case a broker will never return
>> > >> after a
>> > >> >> > > >> failure.
>> > >> >> > > >> > It will be acceptable to lose a couple of messages if a
>> > broker
>> > >> >> dies
>> > >> >> > > and
>> > >> >> > > >> the
>> > >> >> > > >> > consumers have not consumed all messages at the point of
>> > >> failure.
>> > >> >> > > >> > >
>> > >> >> > > >> > > Thanks,
>> > >> >> > > >> > > Oliver
>> > >> >> > > >> > >
>> > >> >> > > >> > >
>> > >> >> > > >> > >
>> > >> >> > > >> > >
>> > >> >> > > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
>> > >> >> > > >> > >
>> > >> >> > > >> > >> I think the confusion comes from the fact that we are
>> > using
>> > >> >> > > mirroring
>> > >> >> > > >> to
>> > >> >> > > >> > >> handle geographic distribution not failover. If I
>> > understand
>> > >> >> > > correctly
>> > >> >> > > >> > what
>> > >> >> > > >> > >> Oliver is asking for is something to give fault
>> tolerance
>> > >> not
>> > >> >> > > >> something
>> > >> >> > > >> > for
>> > >> >> > > >> > >> distribution. I don't think that is really what the
>> > >> mirroring
>> > >> >> > does
>> > >> >> > > out
>> > >> >> > > >> > of
>> > >> >> > > >> > >> the box, though technically i suppose you could just
>> > reset
>> > >> the
>> > >> >> > > offsets
>> > >> >> > > >> > and
>> > >> >> > > >> > >> point the consumer at the new cluster and have it start
>> > from
>> > >> >> > "now".
>> > >> >> > > >> > >>
>> > >> >> > > >> > >> I think it would be helpful to document our use case in
>> > the
>> > >> >> > > mirroring
>> > >> >> > > >> > docs
>> > >> >> > > >> > >> since this is not the first time someone has asked
>> about
>> > >> this.
>> > >> >> > > >> > >>
>> > >> >> > > >> > >> -Jay
>> > >> >> > > >> > >>
>> > >> >> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
>> > >> >> > jjkosh...@gmail.com>
>> > >> >> > > >> > wrote:
>> > >> >> > > >> > >>
>> > >> >> > > >> > >>> Hi Oliver,
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> I was reading the mirroring guide and I wonder if it
>> is
>> > >> >> required
>> > >> >> > > that
>> > >> >> > > >> > the
>> > >> >> > > >> > >>>> mirror runs it's own zookeeper?
>> > >> >> > > >> > >>>>
>> > >> >> > > >> > >>>> We have a zookeeper cluster running which is used by
>> > >> >> different
>> > >> >> > > >> > >>>> applications, so can we use that zookeeper cluster
>> for
>> > the
>> > >> >> > kafka
>> > >> >> > > >> > source
>> > >> >> > > >> > >>> and
>> > >> >> > > >> > >>>> kafka mirror?
>> > >> >> > > >> > >>>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> You could have a single zookeeper cluster and use
>> > different
>> > >> >> > > >> namespaces
>> > >> >> > > >> > for
>> > >> >> > > >> > >>> the source/target mirror. However, I don't think it is
>> > >> >> > > recommended to
>> > >> >> > > >> > use a
>> > >> >> > > >> > >>> remote zookeeper (if you have a cross-DC set up) since
>> > that
>> > >> >> > would
>> > >> >> > > >> > >>> potentially mean very high ZK latencies on one of your
>> > >> >> clusters.
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>>> What is the procedure if the kafka source server
>> fails
>> > to
>> > >> >> > switch
>> > >> >> > > the
>> > >> >> > > >> > >>>> applications to use the mirrored instance?
>> > >> >> > > >> > >>>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> I don't quite follow this question - can you clarify?
>> > The
>> > >> >> mirror
>> > >> >> > > >> > cluster is
>> > >> >> > > >> > >>> pretty much a separate instance. There is no built-in
>> > >> >> automatic
>> > >> >> > > >> > fail-over
>> > >> >> > > >> > >>> if your source cluster goes down.
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>>> Are there any backup best practices if we would not
>> use
>> > >> >> > > mirroring?
>> > >> >> > > >> > >>>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> You can use RAID arrays for (local) data redundancy.
>> You
>> > >> may
>> > >> >> > also
>> > >> >> > > be
>> > >> >> > > >> > >>> interested in the (intra-DC) replication feature
>> > (KAFKA-50)
>> > >> >> that
>> > >> >> > > is
>> > >> >> > > >> > >>> currently being developed. I believe some folks on
>> this
>> > >> list
>> > >> >> > have
>> > >> >> > > >> also
>> > >> >> > > >> > used
>> > >> >> > > >> > >>> plain rsync's as an alternative to mirroring.
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> Thanks,
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> Joel
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >
>> > >> >> > > >> >
>> > >> >> > > >>
>> > >> >> > >
>> > >> >> >
>> > >> >>
>> > >>
>> >
>>

Reply via email to