Is there a convenient way to fetch the last message posted on a particular
topic across all partitions?

Not really, unless the message itself has some sort of a timestamp. Even
then, the order that the broker applies to the log is only guaranteed per
partition per client. So it is tricky to know the last written message to a
topic. You can try to find the last message per partition (using the
getOffsetsBefore API).

Thanks,
Neha


On Mon, Jun 9, 2014 at 8:55 AM, Robert Hodges <berkeleybob2...@gmail.com>
wrote:

> Hi Gouzhang,
>
> Thanks for the response.  Answers interpolated below.
>
> Cheers, Robert
>
> On Mon, Jun 9, 2014 at 8:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Robert,
> >
> > Thanks for the description. Just want to clarify on some of the points
> > (assuming one transaction may include multiple messages below):
> >
> > 2) For the "one-to-one mapping" to work, does the consumer can only read
> at
> > transaction boundaries, i.e., all or none messages are returned to the
> > consumer of a single transaction at once; or it is sufficient to let
> > consumers just read committed messages? For the use case you described it
> > seems the second option is good enough.
> >
>
> Consumers just read committed messages from Kafka itself.  Application
> transactions could be layered on top using the message key, since such
> transactions might consist of multiple Kafka messages.  It's up to the
> consumer to avoid committing a partial transaction.
>
> >
> > 4) If an upstream data source / producer has failed and lost some
> committed
> > transactions, and then on restart regenerates them, since the transaction
> > has been previously committed the downstream consumer may have already
> > consumed their messages, and regenerating the transaction will inevitably
> > result in duplicates. Is that OK for your case?
> >
> > Assuming it is possible to regenerate upstream transactions, downstream
> consumers should do one of two things:
>
> a.) For non-idempotent consumers:  Remember the last committed application
> transaction and ignore anything before that point.
> b.) For idempotent consumers:  Just repeat them.
>
> The uglier problem is what to do when the logs diverge because the upstream
> server cannot regenerate data.  In this case you start by hoping the
> consumer is something like Hadoop that easily tolerates inconsistencies in
> data. Things may go downhill quickly if the consumer is an RDBMS. :(
>
> Is there a convenient way to fetch the last message posted on a particular
> topic across all partitions?  (My laptop currently is about 120 miles away
> so it's hard to look.) If so, it looks to me as if there is enough in the
> Kafka producer and consumer APIs to implement what I am describing without
> too many holes. I believe the trick is to design a message key that
> contains a monotonically increasing transaction ID with a fragment index to
> allow transactions to span Kafka messages but keep all of them (for
> example) in a single partition.
>
> If I have time next weekend I might try to create an example of this to see
> what problems pop up.
>
> Cheers, Robert
>
>
> > Thanks,
> > Guozhang
> >
> >
> > On Sat, Jun 7, 2014 at 11:30 PM, Robert Hodges <
> berkeleybob2...@gmail.com>
> > wrote:
> >
> > > Hi Jonathan and Jun,
> > >
> > > Transactional replication using Kafka between stores at either end is
> an
> > > interesting topic. I have some experience with this problem in database
> > > replication products.
> > >
> > > To understand how to implement it properly in Kafka it would help to
> > define
> > > Jonathan's use case more formally.  As I read the description there are
> > > three parts: a source DBMS, Kafka, and an analytics store.  These can
> be
> > > arranged as follows:
> > >
> > > Producer Store -> Kafka -> Consumer Store
> > >
> > > e.g.:
> > >
> > > MySQL -> Kafka -> Spark over HDFS
> > >
> > > This is like the usual producer/consumer model except that the
> semantics
> > > are as follows.  I added some details to the description to
> accommodate a
> > > number of practical problems that occur in replication topologies of
> this
> > > kind.
> > >
> > > 1.) The producer and consumer in the topology are stores with state and
> > > some notion of a transaction that changes the state of the store to
> which
> > > they are applied.  Kafka is in the middle and also has transactions,
> > namely
> > > to produce and consume messages.
> > >
> > > 2.) If a transaction executes on the producer store, you would like to
> > > execute a corresponding transaction on the consumer store.  The
> > transaction
> > > might not have the same effect downstream but the point is that
> > > transactions are linked one-to-one between producer and consumer.
> > >
> > > 3.) All of the stores or Kafka can fail independently and at any time.
> >  It
> > > must be possible to recover and continue once a failed component
> > restarts.
> > >
> > > 4.) It is possible to have failures where a store or Kafka itself loses
> > > committed state and reverts to an earlier state.  This happens in MySQL
> > for
> > > example, when a host crashes before data are properly committed to
> InnoDB
> > > and/or the MySQL binlog. It can also happen if the upstream DBMS is
> > > restored from a backup or as a result of cluster failover with data
> loss.
> > >  In this case you either want to regenerate lost transactions or (if it
> > is
> > > hopeless) fail cleanly.
> > >
> > > 5.) Producer transactions might be larger than a single Kafka message
> > (e.g.
> > > a KeyedMessage). They may not even fit into a single JVM's memory.
>  This
> > > can occur for example if you do a bulk load or an administrative
> > operation
> > > on a large table in the producer store.  You might not have this
> problem
> > > now but given your requirement to work with a range of stores it seems
> > > likely to occur sooner rather than later. Such transactions must be
> > broken
> > > into a stream of smaller messages with a protocol to identify that they
> > > belong to a single transaction. If there are failures such fragmented
> > > transactions must not result in partial transactions being applied to
> the
> > > consumer.
> > >
> > > 6.) All of the preceding requirements should be met with minimal impact
> > on
> > > message throughput or transaction rates within stores at either end.
> > >
> > > Let me know if this is more than what you (Jonathan) intended.  Usually
> > if
> > > you really want #2, requirements #3-6 follow automatically.  #5 is
> > > potentially a source of much pain if not addressed early on.
> > >
> > > Pending a response, I would just say solutions that require a
> > transactional
> > > commit across two stores are difficult to write, often have performance
> > > downsides, and handle failures poorly because they cannot cover all the
> > > corner cases.  The last point means they tend to drop data, generate
> > > unmatched transactions (orphans), or send things multiple times
> depending
> > > on the failure.
> > >
> > > It's generally better to design systems that use a sliding window
> > protocol
> > > where a commit in the producer triggers a commit to Kafka triggers a
> > commit
> > > to the consumer. Assuming your requirements are as stated above the
> > > question is how to design a transactional sliding window protocol that
> > > works on Kafka.
> > >
> > > Cheers, Robert Hodges
> > >
> > >
> > > On Thu, Jun 5, 2014 at 7:48 AM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > It sounds like that you want to write to a data store and a data pipe
> > > > atomically. Since both the data store and the data pipe that you want
> > to
> > > > use are highly available, the only case that you want to protect is
> the
> > > > client failing btw the two writes. One way to do that is to let the
> > > client
> > > > publish to Kafka first with the strongest ack. Then, run a few
> > consumers
> > > to
> > > > read data from Kafka and then write the data to the data store. Any
> one
> > > of
> > > > those consumers can die and the work will be automatically picked up
> by
> > > the
> > > > remaining ones. You can use partition id and the offset of each
> message
> > > as
> > > > its UUID if needed.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Jun 4, 2014 at 10:56 AM, Jonathan Hodges <hodg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Sorry didn't realize the mailing list wasn't copied...
> > > > >
> > > > >
> > > > > ---------- Forwarded message ----------
> > > > > From: Jonathan Hodges <hodg...@gmail.com>
> > > > > Date: Wed, Jun 4, 2014 at 10:56 AM
> > > > > Subject: Re: Hadoop Summit Meetups
> > > > > To: Neha Narkhede <neha.narkh...@gmail.com>
> > > > >
> > > > >
> > > > > We have a number of customer facing online learning applications.
> > >  These
> > > > > applications are using heterogeneous technologies with different
> data
> > > > > models in underlying data stores such as RDBMS, Cassandra, MongoDB,
> > > etc.
> > > > >  We would like to run offline analysis on the data contained in
> these
> > > > > learning applications with tools like Hadoop and Spark.
> > > > >
> > > > > One thought is to use Kafka as a way for these learning
> applications
> > to
> > > > > emit data in near real-time for analytics.  We developed a common
> > model
> > > > > represented as Avro records in HDFS that spans these learning
> > > > applications
> > > > > so that we can accept the same structured message from them.  This
> > > allows
> > > > > for comparing apples to apples across these apps as opposed to
> messy
> > > > > transformations.
> > > > >
> > > > > So this all sounds good until you dig into the details.  One
> pattern
> > is
> > > > for
> > > > > these applications to update state locally in their data stores
> first
> > > and
> > > > > then publish to Kafka.  The problem with this is these two
> operations
> > > > > aren't atomic so the local persist can succeed and the publish to
> > Kafka
> > > > > fail leaving the application and HDFS out of sync.  You can try to
> > add
> > > > some
> > > > > retry logic to the clients, but this quickly becomes very
> complicated
> > > and
> > > > > still doesn't solve the underlying problem.
> > > > >
> > > > > Another pattern is to publish to Kafka first with -1 and wait for
> the
> > > ack
> > > > > from leader and replicas before persisting locally.  This is
> probably
> > > > > better than the other pattern but does add some complexity to the
> > > client.
> > > > >  The clients must now generate unique entity IDs/UUID for
> persistence
> > > > when
> > > > > they typically rely on the data store for creating these.  Also the
> > > > publish
> > > > > to Kafka can succeed and persist locally can fail leaving the
> stores
> > > out
> > > > of
> > > > > sync.  In this case the learning application needs to determine how
> > to
> > > > get
> > > > > itself in sync.  It can rely on getting this back from Kafka, but
> it
> > is
> > > > > possible the local store failure can't be fixed in a timely manner
> > e.g.
> > > > > hardware failure, constraint, etc.  In this case the application
> > needs
> > > to
> > > > > show an error to the user and likely need to do something like
> send a
> > > > > delete message to Kafka to remove the earlier published message.
> > > > >
> > > > > A third last resort pattern might be go the CDC route with
> something
> > > like
> > > > > Databus.  This would require implementing additional fetchers and
> > > relays
> > > > to
> > > > > support Cassandra and MongoDB.  Also the data will need to be
> > > transformed
> > > > > on the Hadoop/Spark side for virtually every learning application
> > since
> > > > > they have different data models.
> > > > >
> > > > > I hope this gives enough detail to start discussing transactional
> > > > messaging
> > > > > in Kafka.  We are willing to help in this effort if it makes sense
> > for
> > > > our
> > > > > use cases.
> > > > >
> > > > > Thanks
> > > > > Jonathan
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jun 4, 2014 at 9:44 AM, Neha Narkhede <
> > neha.narkh...@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > If you are comfortable, share it on the mailing list. If not, I'm
> > > happy
> > > > > to
> > > > > > have this discussion privately.
> > > > > >
> > > > > > Thanks,
> > > > > > Neha
> > > > > > On Jun 4, 2014 9:42 AM, "Neha Narkhede" <neha.narkh...@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > >> Glad it was useful. It will be great if you can share your
> > > > requirements
> > > > > >> on atomicity. A couple of us are very interested in thinking
> about
> > > > > >> transactional messaging in Kafka.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Neha
> > > > > >> On Jun 4, 2014 6:57 AM, "Jonathan Hodges" <hodg...@gmail.com>
> > > wrote:
> > > > > >>
> > > > > >>> Hi Neha,
> > > > > >>>
> > > > > >>> Thanks so much to you and the Kafka team for putting together
> the
> > > > > meetup.
> > > > > >>>  It was very nice and gave people from out of town like us the
> > > > ability
> > > > > to
> > > > > >>> join in person.
> > > > > >>>
> > > > > >>> We are the guys from Pearson Education and we talked a little
> > about
> > > > > >>> supplying some details on some of our use cases with respect to
> > > > > atomicity
> > > > > >>> of source systems eventing data and persisting locally.  Should
> > we
> > > > just
> > > > > >>> post to the list or is there somewhere else we should send
> these
> > > > > details?
> > > > > >>>
> > > > > >>> Thanks again!
> > > > > >>> Jonathan
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> On Fri, Apr 11, 2014 at 9:31 AM, Neha Narkhede <
> > > > > neha.narkh...@gmail.com>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>> > Yes, that's a great idea. I can help organize the meetup at
> > > > LinkedIn.
> > > > > >>> >
> > > > > >>> > Thanks,
> > > > > >>> > Neha
> > > > > >>> >
> > > > > >>> >
> > > > > >>> > On Fri, Apr 11, 2014 at 8:44 AM, Saurabh Agarwal (BLOOMBERG/
> > 731
> > > > > >>> LEXIN) <
> > > > > >>> > sagarwal...@bloomberg.net> wrote:
> > > > > >>> >
> > > > > >>> > > great idea. I am interested in attending as well....
> > > > > >>> > >
> > > > > >>> > > ----- Original Message -----
> > > > > >>> > > From: users@kafka.apache.org
> > > > > >>> > > To: users@kafka.apache.org
> > > > > >>> > > At: Apr 11 2014 11:40:56
> > > > > >>> > >
> > > > > >>> > > With the Hadoop Summit in San Jose 6/3 - 6/5 I wondered if
> > any
> > > of
> > > > > the
> > > > > >>> > > LinkedIn geniuses were thinking of putting together a
> meet-up
> > > on
> > > > > any
> > > > > >>> of
> > > > > >>> > the
> > > > > >>> > > associated technologies like Kafka, Samza, Databus, etc.
>  For
> > > us
> > > > > poor
> > > > > >>> > souls
> > > > > >>> > > that don't live on the West Coast it was a great experience
> > > > > >>> attending the
> > > > > >>> > > Kafka meetup last year.
> > > > > >>> > >
> > > > > >>> > > Jonathan
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > >
> > > >
> > >
> >
> -------------------------------------------------------------------------------
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to