RE: Offset Management...

2020-05-12 Thread Rajib Deb
Thanks Bill, and my apologies for not elaborating on my use case.

In my use case, data from Cassandra is pushed to Kafka, and we then consume
from Kafka into Snowflake. Once we push the data to Snowflake, we do not want
to go back to the source (Cassandra) to pull the data again. There are
occasions where we are asked to pull the data for a certain date and time; I
thought storing the offsets would help with that use case. The other item is
our validation framework: we need to validate that we are processing all the
rows that Cassandra pushes to Kafka. So the validation program needs to look
at the number of rows in Cassandra for a particular key and check that we have
that many messages in Kafka and Snowflake for that key.


Thanks
Rajib
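
For the "certain date and time" part: from Kafka 0.10.1 on, the Java consumer
can map a timestamp to offsets directly, so replaying from a point in time does
not strictly require an external offset store. A minimal sketch, assuming an
"events" topic, an already-configured consumer, and a replayFromMillis epoch
timestamp (all placeholders):

import java.util.*;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Seek every partition of the topic to the first offset whose record
// timestamp is >= replayFromMillis.
List<TopicPartition> parts = new ArrayList<>();
for (PartitionInfo p : consumer.partitionsFor("events"))
    parts.add(new TopicPartition(p.topic(), p.partition()));
consumer.assign(parts);

Map<TopicPartition, Long> query = new HashMap<>();
for (TopicPartition tp : parts)
    query.put(tp, replayFromMillis);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> e :
        consumer.offsetsForTimes(query).entrySet()) {
    if (e.getValue() != null)                     // null if no record that late
        consumer.seek(e.getKey(), e.getValue().offset());
}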

-Original Message-
From: Bill Bejeck  
Sent: Tuesday, May 12, 2020 7:41 AM
To: users@kafka.apache.org
Subject: Re: Offset Management...

Hi Rajib,

Generally, it's best to let Kafka handle the offset management.
Under normal circumstances, when you restart a consumer, it will start reading
records from the last committed offset; there's no need for you to manage that
process yourself.
If you need to commit offsets manually rather than using auto-commit, you can
use one of the commit API methods, commitSync
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-->
or commitAsync
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync-org.apache.kafka.clients.consumer.OffsetCommitCallback->.
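
For reference, a minimal sketch of the manual-commit pattern with the Java
consumer; the broker address, group id, topic name, and the process() step are
placeholder assumptions:

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "snowflake-loader");
props.put("enable.auto.commit", "false");          // we commit explicitly below
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("events"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records)
            process(record);                       // hypothetical processing step
        consumer.commitSync();                     // commit after the batch succeeds
    }
}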

-Bill


On Mon, May 11, 2020 at 9:52 PM Rajib Deb  wrote:

> Hi, I wanted to know if it is a good practice to develop a custom
> offset management method while consuming from Kafka. I am thinking of
> developing it as below.
>
>
>   1.  Create a PartitionInfo named tuple as below
>
> from collections import namedtuple
> PartitionInfo = namedtuple("PartitionInfo", ["header", "custom_writer", "offset"])
>
>   2.  Then populate the tuple with the header, writer and last offset
> details
>   3.  Write the tuple in a file/database once the consumer commits the
> message
>   4.  Next time the consumer starts, it checks the last offset and
> reads from there
>
> Thanks
> Rajib
>
>


Re: Offset Management...

2020-05-12 Thread Bill Bejeck
Hi Rajib,

Generally, it's best to let Kafka handle the offset management.
Under normal circumstances, when you restart a consumer, it will start
reading records from the last committed offset; there's no need for you to
manage that process yourself.
If you need to commit offsets manually rather than using auto-commit, you can
use one of the commit API methods
commitSync
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-->
 or commitAsync
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync-org.apache.kafka.clients.consumer.OffsetCommitCallback->
.

-Bill


On Mon, May 11, 2020 at 9:52 PM Rajib Deb  wrote:

> Hi, I wanted to know if it is a good practice to develop a custom offset
> management method while consuming from Kafka. I am thinking of developing
> it as below.
>
>
>   1.  Create a PartitionInfo named tuple as below
>
> from collections import namedtuple
> PartitionInfo = namedtuple("PartitionInfo", ["header", "custom_writer", "offset"])
>
>   2.  Then populate the tuple with the header, writer and last offset
> details
>   3.  Write the tuple in a file/database once the consumer commits the
> message
>   4.  Next time the consumer starts, it checks the last offset and reads
> from there
>
> Thanks
> Rajib
>
>


Offset Management...

2020-05-11 Thread Rajib Deb
Hi, I wanted to know if it is a good practice to develop a custom offset
management method while consuming from Kafka. I am thinking of developing it
as below.


  1.  Create a PartitionInfo named tuple as below

from collections import namedtuple
PartitionInfo = namedtuple("PartitionInfo", ["header", "custom_writer", "offset"])

  2.  Then populate the tuple with the header, writer and last offset details
  3.  Write the tuple in a file/database once the consumer commits the message
  4.  Next time the consumer starts, it checks the last offset and reads from
there

Thanks
Rajib
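
If you do build this, the consuming side usually pairs the external store with
assign()/seek(): disable auto-commit, persist (partition, offset + 1) together
with each processed message, and seek to the stored value on startup. A rough
Java sketch; loadOffset, saveOffset, and write are hypothetical helpers backed
by your file or database (not Kafka APIs), and the consumer is assumed to be
already constructed:

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("events", 0);   // assumed topic/partition
consumer.assign(Collections.singletonList(tp));
consumer.seek(tp, loadOffset(tp));                     // next offset to process

while (true) {
    for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
        write(rec);                                    // hypothetical downstream write
        saveOffset(tp, rec.offset() + 1);              // store the *next* offset to read
    }
}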



Consumer offset management in latest Kafka release

2017-07-06 Thread Bishnu Agrawal
Hello Team,

I have a few questions on consumer offset management in the latest release.
If you could spend a few minutes and answer the questions below (please
answer based on the latest release):

1. Is there any dependency on ZooKeeper for storing consumer offsets?
2. Can you give some insight into the structure used to store these
offsets?

Any link or pointer would really help.

Thanks,
Bishnu
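
For reference: since 0.8.2, offsets committed to Kafka live in the compacted
internal topic __consumer_offsets, keyed by (group, topic, partition) with the
offset and some metadata as the value; with offsets.storage=kafka (or the new
Java consumer) ZooKeeper is not involved in offset storage. A small sketch for
inspecting a group's committed offset from the Java client, where the group and
topic names are assumptions:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicPartition tp = new TopicPartition("events", 0);
    OffsetAndMetadata committed = consumer.committed(tp);  // null if never committed
    System.out.println(tp + " -> " + (committed == null ? "none" : committed.offset()));
}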


Custom Offset Management

2016-09-09 Thread Daniel Fagnan
I’m currently wondering if it’s possible to use the internal 
`__consumer_offsets` topic to manage offsets outside the consumer group APIs. 
I’m using the low-level API to manage the consumers but I’d still like to store 
offsets in Kafka.

If it’s not possible to publish and fetch offsets from the internal topic, 
would a separate compacted log replicate most of the functionality?

Thanks,
Daniel
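
On the second question: a separate compacted topic replicates the core of the
mechanism, since compaction keeps the latest value per key. A sketch of
creating such a log with the Java AdminClient (available from 0.11; the topic
name and sizing are assumptions, and error handling is elided):

import java.util.*;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.TopicConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic offsetLog = new NewTopic("my-offset-log", 1, (short) 3)
        .configs(Collections.singletonMap(
            TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    admin.createTopics(Collections.singletonList(offsetLog)).all().get();
}
// Key each message by (group, topic, partition); after compaction the latest
// committed offset per key survives, which is the same idea __consumer_offsets
// uses internally.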




RE: New Consumer APIs, High level API, Simple API regarding offset management

2016-05-11 Thread Natarajan, Rajeswari
We have a requirement that the consumer must be able to re-read messages.

With the high-level consumer API, it looks like a restart of the consumer is
needed if the offset has to be reset.

The new consumer API seems to be of beta quality.

With the stable version, I guess the only option is to go for the simple
consumer API if we don't want to restart the consumer. Please confirm. Also,
please shed some light on when the new consumer API will go GA.

Thanks in advance,
Rajeswari

-Original Message-
From: Natarajan, Rajeswari [mailto:rajeswari.natara...@sap.com] 
Sent: Tuesday, May 10, 2016 9:18 PM
To: users@kafka.apache.org
Subject: New Consumer APIs

Hi,



Would like to know if the new Consumer API has GAed.





Regards,

Rajeswari



Disable offset management

2016-04-05 Thread Jakub Neubauer
Hi,
we are using Kafka server + new client 0.9.0.1.

In our application we process log events and aggregate the results in an
RDBMS. In order to achieve data consistency, when we process a polled log
event list (a batch), we update the DB in one transaction and save the
partition offsets in the RDBMS within the same transaction.
Then, on re-balance, we read the offsets from the RDBMS.

So far so good. But we can see a lot of auto-commit activity in the log, and
we don't need the Kafka offset management. Is there a clean way to disable it?
Is simply setting enable.auto.commit=false enough? Or is something still
running under the hood?

I couldn't find a clear statement of how to set up the Kafka client when
managing offsets at the application level, either in the docs or on the
Internet. I found just this page:
https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management where
the new offset management is discussed and its optionality is mentioned
just as an "interesting" thing, which is basically what makes me unsure.

Thank you
Jakub Neubauer
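
For what it's worth: with the new (0.9) consumer, enable.auto.commit=false does
stop the auto-commit task, and nothing else commits unless you call
commitSync/commitAsync yourself. A sketch of the restore-on-rebalance side,
where props is your existing consumer configuration, "events" is an assumed
topic name, and loadOffsetFromDb is a hypothetical helper over the RDBMS table
(not a Kafka API):

import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

props.put("enable.auto.commit", "false");   // the client commits nothing on its own

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("events"),
    new ConsumerRebalanceListener() {
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // the offsets already sit in the DB transaction; nothing to commit here
        }
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions)
                consumer.seek(tp, loadOffsetFromDb(tp));   // next offset to process
        }
    });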


Re: Kafka 0.9 Offset Management

2016-02-04 Thread Adam Kunicki
Hi Joe,

I found:
http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
to be pretty useful. Scroll down to the "Delivery Semantics" section for
more info about handling offsets.

-Adam

On Thu, Feb 4, 2016 at 9:55 PM, Joe San <codeintheo...@gmail.com> wrote:

> Could anyone point me to some code samples or some documentation on where I
> could find more information about Kafka's Offset management? Currently, I'm
> using the
>
>   props.put("enable.auto.commit", "true")
>   props.put("auto.commit.interval.ms", "1000")
>
> which I guess commits to the Zookeeper and I understand that this is not
> the way to go!
>
> Any clues?
>
> Thanks and Regards,
> Joe
>



-- 
Adam Kunicki
StreamSets | Field Engineer
mobile: 415.890.DATA (3282) | linkedin


Kafka 0.9 Offset Management

2016-02-04 Thread Joe San
Could anyone point me to some code samples or some documentation on where I
could find more information about Kafka's Offset management? Currently, I'm
using the

  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")

which I guess commits to the Zookeeper and I understand that this is not
the way to go!

Any clues?

Thanks and Regards,
Joe
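
For contrast, a manual-commit sketch with the 0.9 consumer: disable auto-commit
and commit an explicit offset per record. Note that with the new consumer these
commits go to Kafka (__consumer_offsets), not ZooKeeper, and that the committed
value is the next offset to read. The topic and the process() step are
assumptions:

import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

props.put("enable.auto.commit", "false");

ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> rec : records) {
    process(rec);                                   // hypothetical processing step
    consumer.commitSync(Collections.singletonMap(
        new TopicPartition(rec.topic(), rec.partition()),
        new OffsetAndMetadata(rec.offset() + 1)));  // commit offset + 1
}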


RE: Regarding The Kafka Offset Management Issue In Direct Stream Approach.

2015-11-25 Thread Dave Ariens
Charan,

You may find this Gist useful for storing/retrieving offsets for Kafka topics:

https://gist.github.com/ariens/e6a39bc3dbeb11467e53


From: Cody Koeninger [c...@koeninger.org]
Sent: Friday, November 06, 2015 10:10 AM
To: users@kafka.apache.org
Subject: Re: Regarding The Kafka Offset Management Issue In Direct Stream 
Approach.

Questions about Spark-kafka integration are better directed to the Spark
user mailing list.

I'm not 100% sure what you're asking.  The spark createDirectStream api
will not store any offsets internally, unless you enable checkpointing.



On Sun, Nov 1, 2015 at 10:26 PM, Charan Ganga Phani Adabala <
char...@eiqnetworks.com> wrote:

> Hi All,
>
> We are working with Apache Spark and Kafka integration; in this use case we
> are using the DirectStream approach. We want to avoid data loss, so we take
> the offsets and save them into MongoDB.
>
> We want some clarification on whether Spark stores any offsets internally.
> Let us explain with an example:
>
> For the first RDD batch we get offsets 0 to 5 of events to be processed, but
> the application unexpectedly crashes; we then start the application again.
> Does this job fetch events 0 to 5 again, or resume from where the previous
> job stopped?
>
> We are not committing any offsets in the above process, because we have to
> commit offsets manually in the DirectStream approach. Does the new job fetch
> events from the 0th position?
>
>
>
>
>
> Thanks & Regards,
>
> *Ganga Phani Charan Adabala | Software Engineer*
> o:  +91-40-23116680 | c:  +91-9491418099
> e:  char...@eiqnetworks.com
> *EiQ Networks®, Inc.* |  www.eiqnetworks.com


Re: Regarding The Kafka Offset Management Issue In Direct Stream Approach.

2015-11-06 Thread Cody Koeninger
Questions about Spark-kafka integration are better directed to the Spark
user mailing list.

I'm not 100% sure what you're asking.  The spark createDirectStream api
will not store any offsets internally, unless you enable checkpointing.



On Sun, Nov 1, 2015 at 10:26 PM, Charan Ganga Phani Adabala <
char...@eiqnetworks.com> wrote:

> Hi All,
>
> We are working with Apache Spark and Kafka integration; in this use case we
> are using the DirectStream approach. We want to avoid data loss, so we take
> the offsets and save them into MongoDB.
>
> We want some clarification on whether Spark stores any offsets internally.
> Let us explain with an example:
>
> For the first RDD batch we get offsets 0 to 5 of events to be processed, but
> the application unexpectedly crashes; we then start the application again.
> Does this job fetch events 0 to 5 again, or resume from where the previous
> job stopped?
>
> We are not committing any offsets in the above process, because we have to
> commit offsets manually in the DirectStream approach. Does the new job fetch
> events from the 0th position?
>
>
>
>
>
> Thanks & Regards,
>
> *Ganga Phani Charan Adabala | Software Engineer*
> o:  +91-40-23116680 | c:  +91-9491418099
> e:  char...@eiqnetworks.com
> *EiQ Networks®, Inc.* |  www.eiqnetworks.com


Regarding The Kafka Offset Management Issue In Direct Stream Approach.

2015-11-01 Thread Charan Ganga Phani Adabala
Hi All,
We are working with Apache Spark and Kafka integration; in this use case we are
using the DirectStream approach. We want to avoid data loss, so we take the
offsets and save them into MongoDB.
We want some clarification on whether Spark stores any offsets internally. Let
us explain with an example:
For the first RDD batch we get offsets 0 to 5 of events to be processed, but
the application unexpectedly crashes; we then start the application again. Does
this job fetch events 0 to 5 again, or resume from where the previous job
stopped?
We are not committing any offsets in the above process, because we have to
commit offsets manually in the DirectStream approach. Does the new job fetch
events from the 0th position?


Thanks & Regards,
Ganga Phani Charan Adabala | Software Engineer
o:  +91-40-23116680 | c:  +91-9491418099
e:  char...@eiqnetworks.com
EiQ Networks®, Inc. |  www.eiqnetworks.com




Regarding the Kafka offset management issue in Direct Stream Approach.

2015-10-26 Thread Charan Ganga Phani Adabala
Hi All,

We are working with Apache Spark and Kafka integration; in this use case we are
using the DirectStream approach. We want to avoid data loss, so we take the
offsets and save them into MongoDB.
We want some clarification on whether Spark stores any offsets internally. Let
us explain with an example:
For the first RDD batch we get offsets 0 to 5 of events to be processed, but
the application unexpectedly crashes; we then start the application again. Does
this job fetch events 0 to 5 again, or resume from where the previous job
stopped?
We are not committing any offsets in the above process, because we have to
commit offsets manually in the DirectStream approach. Does the new job fetch
events from the 0th position?


Thanks & Regards,
Ganga Phani Charan Adabala | Software Engineer
o:  +91-40-23116680 | c:  +91-9491418099
e:  char...@eiqnetworks.com
EiQ Networks®, Inc. |  www.eiqnetworks.com




Re: Regarding the Kafka offset management issue in Direct Stream Approach.

2015-10-26 Thread Cody Koeninger
Questions about spark's kafka integration should probably be directed to
the spark user mailing list, not this one.  I don't monitor kafka mailing
lists as closely, for instance.

For the direct stream, Spark doesn't keep any state regarding offsets,
unless you enable checkpointing.  Have you read
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md





On Mon, Oct 26, 2015 at 3:43 AM, Charan Ganga Phani Adabala <
char...@eiqnetworks.com> wrote:

> Hi All,
>
>
>
> We are working with Apache Spark and Kafka integration; in this use case we
> are using the DirectStream approach. We want to avoid data loss, so we take
> the offsets and save them into MongoDB.
>
> We want some clarification on whether Spark stores any offsets internally.
> Let us explain with an example:
>
> For the first RDD batch we get offsets 0 to 5 of events to be processed, but
> the application unexpectedly crashes; we then start the application again.
> Does this job fetch events 0 to 5 again, or resume from where the previous
> job stopped?
>
> We are not committing any offsets in the above process, because we have to
> commit offsets manually in the DirectStream approach. Does the new job fetch
> events from the 0th position?
>
>
>
>
>
> Thanks & Regards,
>
> *Ganga Phani Charan Adabala | Software Engineer*
> o:  +91-40-23116680 | c:  +91-9491418099
> e:  char...@eiqnetworks.com
> *EiQ Networks®, Inc.* |  www.eiqnetworks.com


Offset Management in Kafka

2015-10-15 Thread Kiran Singh
Hi

I am looking for offset management in Kafka.
Is there any link which can explain how offsets are updated in Kafka once a
consumer reads a message?


Re: Offset Management in Kafka

2015-10-15 Thread Mayuresh Gharat
This might help :
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

http://www.slideshare.net/jjkoshy/offset-management-in-kafka


Thanks,

Mayuresh

On Thu, Oct 15, 2015 at 5:39 AM, Kiran Singh <kim.sing...@gmail.com> wrote:

> Hi
>
> I am looking for offset management in Kafka.
> Is there any link which can explain how offsets are updated in Kafka once a
> consumer reads a message?
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
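
A note on the original question: reading a message changes nothing on the
broker. The consumer's in-memory position advances as it reads, and the stored
offset moves only when a commit happens (auto-commit or an explicit call). A
small sketch of the distinction, assuming an already-configured Java consumer
and an "events" topic:

import java.util.Collections;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("events", 0);
consumer.assign(Collections.singletonList(tp));
consumer.poll(1000);                                        // read some records
System.out.println("position:  " + consumer.position(tp));  // next offset to read
System.out.println("committed: " + consumer.committed(tp)); // moves only on commit
consumer.commitSync();                                      // committed catches up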


Re: Offset management: client vs broker side responsibility

2015-06-16 Thread Stevo Slavić
Found out that there is a standard API for retrieving and committing offsets
(see
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
)

The problem is that the server/broker side is not extensible (see
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L142
), i.e. there is no API one can implement and deploy/configure together
with the Kafka binary to handle unsupported
offsetCommitRequest.versionId/offsetFetchRequest.versionId values, or to
override the handling of already supported ones.

This does not prevent one from implementing custom offset management on the
client side (instead of using the standard API to commit and retrieve offsets,
one can talk directly to a custom offset store), but then the problem arises
that no commercial or FOSS Kafka monitoring solution supports it out of the
box.

I know I would, but the question to the Apache Kafka community is: would you
like to have Kafka broker commit/fetch extensible? And what do the committers
think about this?

Kind regards,
Stevo Slavic.


On Tue, Jun 2, 2015 at 7:11 PM, Otis Gospodnetic otis.gospodne...@gmail.com
 wrote:

 Hi,

 I haven't followed the changes to offset tracking closely, other than that
 storing them in ZK is not the only option any more.
  I think what Stevo is asking about/suggesting is that there be a
 single API from which offset information can be retrieved (e.g. by
 monitoring tools), so that monitoring tools work regardless of where one
 chose to store offsets.
 I know we'd love to have this for SPM's Kafka monitoring and can tell you
 that adding support for N different APIs for N different offset storage
 systems would be hard/time-consuming/expensive.
 But maybe this single API already exists?

 Thanks,
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr & Elasticsearch Support * http://sematext.com/


 On Mon, Jun 1, 2015 at 4:41 PM, Jason Rosenberg j...@squareup.com wrote:

  Stevo,
 
  Both of the main solutions used by the high-level consumer are
 standardized
  and supported directly by the kafka client libraries (e.g. maintaining
  offsets in zookeeper or in kafka itself).  And for the zk case, there is
  the consumer offset checker (which is good for monitoring).  Consumer
  offset checker still needs to be extended for offsets stored in kafka
  _consumer_offset topics though.
 
  Anyway, I'm not sure I understand your question, you want something for
  better monitoring of all possible clients (some of which might choose to
  manage offsets in their own way)?
 
  It's just not part of the kafka design to directly track individual
  consumers.
 
  Jason
 
  On Wed, May 27, 2015 at 7:42 AM, Shady Xu shad...@gmail.com wrote:
 
   I guess adding a new component will increase the complexity of the
 system
   structure. And if the new component consists of one or a few nodes, it
  may
    become the bottleneck of the whole system, if it consists of many
 nodes,
   it will make the system even more complex.
  
   Although every solution has its downsides, I think the current one is
   decent.
  
   2015-05-27 17:10 GMT+08:00 Stevo Slavić ssla...@gmail.com:
  
It could be a separate server component, does not have to be
monolith/coupled with broker.
Such solution would have benefits - single API, pluggable
   implementations.
   
On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:
   
 Storing and managing offsets by broker will leave high pressure on
  the
 brokers which will affect the performance of the cluster.

 You can use the advanced consumer APIs, then you can get the
 offsets
either
 from zookeeper or the __consumer_offsets__ topic. On the other
 hand,
  if
you
 use the simple consumer APIs, you mean to manage offsets yourself,
  then
you
 should monitor them yourself, simple and plain, right?

 2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:

  Hello Apache Kafka community,
 
  Please correct me if wrong, AFAIK currently (Kafka 0.8.2.x)
 offset
  management responsibility is mainly client/consumer side
responsibility.
 
  Wouldn't it be better if it was broker side only responsibility?
 
  E.g. now if one wants to use custom offset management, any of the
   Kafka
  monitoring tools cannot see the offsets - they would need to use
  same
  custom client implementation which is practically not possible.
 
  Kind regards,
  Stevo Slavic.
 

   
  
 



Re: Offset management: client vs broker side responsibility

2015-06-02 Thread Otis Gospodnetic
Hi,

I haven't followed the changes to offset tracking closely, other than that
storing them in ZK is not the only option any more.
I think what Stevo is asking about/suggesting is that there be a
single API from which offset information can be retrieved (e.g. by
monitoring tools), so that monitoring tools work regardless of where one
chose to store offsets.
I know we'd love to have this for SPM's Kafka monitoring and can tell you
that adding support for N different APIs for N different offset storage
systems would be hard/time-consuming/expensive.
But maybe this single API already exists?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Jun 1, 2015 at 4:41 PM, Jason Rosenberg j...@squareup.com wrote:

 Stevo,

 Both of the main solutions used by the high-level consumer are standardized
 and supported directly by the kafka client libraries (e.g. maintaining
 offsets in zookeeper or in kafka itself).  And for the zk case, there is
 the consumer offset checker (which is good for monitoring).  Consumer
 offset checker still needs to be extended for offsets stored in kafka
 _consumer_offset topics though.

 Anyway, I'm not sure I understand your question, you want something for
 better monitoring of all possible clients (some of which might choose to
 manage offsets in their own way)?

 It's just not part of the kafka design to directly track individual
 consumers.

 Jason

 On Wed, May 27, 2015 at 7:42 AM, Shady Xu shad...@gmail.com wrote:

  I guess adding a new component will increase the complexity of the system
  structure. And if the new component consists of one or a few nodes, it
 may
  become the bottleneck of the whole system, if it consists of many nodes,
  it will make the system even more complex.
 
  Although every solution has its downsides, I think the current one is
  decent.
 
  2015-05-27 17:10 GMT+08:00 Stevo Slavić ssla...@gmail.com:
 
   It could be a separate server component, does not have to be
   monolith/coupled with broker.
   Such solution would have benefits - single API, pluggable
  implementations.
  
   On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:
  
Storing and managing offsets by broker will leave high pressure on
 the
brokers which will affect the performance of the cluster.
   
You can use the advanced consumer APIs, then you can get the offsets
   either
from zookeeper or the __consumer_offsets__ topic. On the other hand,
 if
   you
use the simple consumer APIs, you mean to manage offsets yourself,
 then
   you
should monitor them yourself, simple and plain, right?
   
2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:
   
 Hello Apache Kafka community,

 Please correct me if wrong, AFAIK currently (Kafka 0.8.2.x) offset
 management responsibility is mainly client/consumer side
   responsibility.

 Wouldn't it be better if it was broker side only responsibility?

 E.g. now if one wants to use custom offset management, any of the
  Kafka
 monitoring tools cannot see the offsets - they would need to use
 same
 custom client implementation which is practically not possible.

 Kind regards,
 Stevo Slavic.

   
  
 



Re: Offset management: client vs broker side responsibility

2015-06-01 Thread Jason Rosenberg
Stevo,

Both of the main solutions used by the high-level consumer are standardized
and supported directly by the kafka client libraries (e.g. maintaining
offsets in zookeeper or in kafka itself).  And for the zk case, there is
the consumer offset checker (which is good for monitoring).  Consumer
offset checker still needs to be extended for offsets stored in kafka
_consumer_offset topics though.

Anyway, I'm not sure I understand your question, you want something for
better monitoring of all possible clients (some of which might choose to
manage offsets in their own way)?

It's just not part of the kafka design to directly track individual
consumers.

Jason

On Wed, May 27, 2015 at 7:42 AM, Shady Xu shad...@gmail.com wrote:

 I guess adding a new component will increase the complexity of the system
 structure. And if the new component consists of one or a few nodes, it may
 become the bottleneck of the whole system, if it consists of many nodes,
 it will make the system even more complex.

 Although every solution has its downsides, I think the current one is
 decent.

 2015-05-27 17:10 GMT+08:00 Stevo Slavić ssla...@gmail.com:

  It could be a separate server component, does not have to be
  monolith/coupled with broker.
  Such solution would have benefits - single API, pluggable
 implementations.
 
  On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:
 
   Storing and managing offsets by broker will leave high pressure on the
   brokers which will affect the performance of the cluster.
  
   You can use the advanced consumer APIs, then you can get the offsets
  either
   from zookeeper or the __consumer_offsets__ topic. On the other hand, if
  you
   use the simple consumer APIs, you mean to manage offsets yourself, then
  you
   should monitor them yourself, simple and plain, right?
  
   2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:
  
Hello Apache Kafka community,
   
Please correct me if wrong, AFAIK currently (Kafka 0.8.2.x) offset
management responsibility is mainly client/consumer side
  responsibility.
   
Wouldn't it be better if it was broker side only responsibility?
   
E.g. now if one wants to use custom offset management, any of the
 Kafka
monitoring tools cannot see the offsets - they would need to use same
custom client implementation which is practically not possible.
   
Kind regards,
Stevo Slavic.
   
  
 



Re: Offset management: client vs broker side responsibility

2015-05-27 Thread Shady Xu
I guess adding a new component will increase the complexity of the system
structure. And if the new component consists of one or a few nodes, it may
become the bottleneck of the whole system; if it consists of many nodes,
it will make the system even more complex.

Although every solution has its downsides, I think the current one is
decent.

2015-05-27 17:10 GMT+08:00 Stevo Slavić ssla...@gmail.com:

 It could be a separate server component, does not have to be
 monolith/coupled with broker.
 Such solution would have benefits - single API, pluggable implementations.

 On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:

  Storing and managing offsets by broker will leave high pressure on the
  brokers which will affect the performance of the cluster.
 
  You can use the advanced consumer APIs, then you can get the offsets
 either
  from zookeeper or the __consumer_offsets__ topic. On the other hand, if
 you
  use the simple consumer APIs, you mean to manage offsets yourself, then
 you
  should monitor them yourself, simple and plain, right?
 
  2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:
 
   Hello Apache Kafka community,
  
   Please correct me if wrong, AFAIK currently (Kafka 0.8.2.x) offset
   management responsibility is mainly client/consumer side
 responsibility.
  
   Wouldn't it be better if it was broker side only responsibility?
  
   E.g. now if one wants to use custom offset management, any of the Kafka
   monitoring tools cannot see the offsets - they would need to use same
   custom client implementation which is practically not possible.
  
   Kind regards,
   Stevo Slavic.
  
 



Re: Offset management: client vs broker side responsibility

2015-05-27 Thread Stevo Slavić
It could be a separate server component; it does not have to be
monolithic/coupled with the broker.
Such a solution would have benefits: a single API, pluggable implementations.

On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:

 Storing and managing offsets by broker will leave high pressure on the
 brokers which will affect the performance of the cluster.

 You can use the advanced consumer APIs, then you can get the offsets either
 from zookeeper or the __consumer_offsets__ topic. On the other hand, if you
 use the simple consumer APIs, you mean to manage offsets yourself, then you
 should monitor them yourself, simple and plain, right?

 2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:

  Hello Apache Kafka community,
 
  Please correct me if wrong, AFAIK currently (Kafka 0.8.2.x) offset
  management responsibility is mainly client/consumer side responsibility.
 
  Wouldn't it be better if it was broker side only responsibility?
 
  E.g. now if one wants to use custom offset management, any of the Kafka
  monitoring tools cannot see the offsets - they would need to use same
  custom client implementation which is practically not possible.
 
  Kind regards,
  Stevo Slavic.
 



Offset management: client vs broker side responsibility

2015-04-22 Thread Stevo Slavić
Hello Apache Kafka community,

Please correct me if wrong: AFAIK currently (Kafka 0.8.2.x) offset
management is mainly a client/consumer-side responsibility.

Wouldn't it be better if it were a broker-side-only responsibility?

E.g. now if one wants to use custom offset management, none of the Kafka
monitoring tools can see the offsets; they would need to use the same
custom client implementation, which is practically not possible.

Kind regards,
Stevo Slavic.


Re: offset-management-in-kafka

2015-04-08 Thread Gwen Shapira
You are looking at the newer, not-yet-ready consumer there.

The configuration you are looking for is defined here:
core/src/main/scala/kafka/consumer/ConsumerConfig.scala

Gwen
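
For the old (Scala) high-level consumer that ships with kafka_2.10 0.8.2.x,
offsets.storage is a plain string property rather than a constant on the new
Java client's ConsumerConfig. A minimal sketch, where the ZooKeeper address and
group name are assumptions:

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "my-group");
props.put("offsets.storage", "kafka");          // default is "zookeeper"
props.put("dual.commit.enabled", "false");      // "true" only while migrating
ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));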

On Wed, Apr 8, 2015 at 9:25 AM, Yosi Botzer yosi.bot...@gmail.com wrote:

 Hi,

 I am looking at the java
 class org.apache.kafka.clients.consumer.ConsumerConfig and I do not see
 there a constant for offsets.storage

 Am I missing something?

 This is my pom dependency definition:

 <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.10</artifactId>
   <version>0.8.2.1</version>
 </dependency>

 On Wed, Apr 8, 2015 at 6:29 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  This is available from 0.8.2.0, and is enabled on server by default. The
  consumer needs to specify offsets.storage parameter - the default is
 still
  zookeeper, so the consumers should set it to 'kafka'.
 
   The documentation also explains how to migrate from zookeeper offsets to
  kafka offsets.
 
  Gwen
 
  On Wed, Apr 8, 2015 at 8:13 AM, Yosi Botzer yosi.bot...@gmail.com
 wrote:
 
   Hi,
  
   I have seen this presentation:
   http://www.slideshare.net/jjkoshy/offset-management-in-kafka
  
   describing a new approach for offset management.
  
   I wanted to know from which kafka version this option will be available
  and
   will it be available by default
  
   Thanks
   Yosi
  
 



Re: offset-management-in-kafka

2015-04-08 Thread Gwen Shapira
This is available from 0.8.2.0, and is enabled on server by default. The
consumer needs to specify offsets.storage parameter - the default is still
zookeeper, so the consumers should set it to 'kafka'.

The documentation also explains how to migrate from zookeeper offsets to
kafka offsets.

Gwen

On Wed, Apr 8, 2015 at 8:13 AM, Yosi Botzer yosi.bot...@gmail.com wrote:

 Hi,

 I have seen this presentation:
 http://www.slideshare.net/jjkoshy/offset-management-in-kafka

 describing a new approach for offset management.

 I wanted to know from which kafka version this option will be available and
 will it be available by default

 Thanks
 Yosi



Re: offset-management-in-kafka

2015-04-08 Thread Yosi Botzer
Thanks Gwen,

Are there any plans to change the default from zookeeper to kafka in the
future?


On Wed, Apr 8, 2015 at 6:29 PM, Gwen Shapira gshap...@cloudera.com wrote:

 This is available from 0.8.2.0, and is enabled on server by default. The
 consumer needs to specify offsets.storage parameter - the default is still
 zookeeper, so the consumers should set it to 'kafka'.

  The documentation also explains how to migrate from zookeeper offsets to
 kafka offsets.

 Gwen

 On Wed, Apr 8, 2015 at 8:13 AM, Yosi Botzer yosi.bot...@gmail.com wrote:

  Hi,
 
  I have seen this presentation:
  http://www.slideshare.net/jjkoshy/offset-management-in-kafka
 
  describing a new approach for offset management.
 
  I wanted to know from which kafka version this option will be available
 and
  will it be available by default
 
  Thanks
  Yosi
 



Re: New Offset Management API Question

2015-03-26 Thread Joel Koshy

 1) Does Offset Commit/Fetch API works with Simple Consumer ?

Yes - in 0.8.2.1. There is an example given at
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

 2) With MM, can you selectively MM offset topic.
 
 Use Case:
 Let's suppose you want to build Active Consumer Group in DC1 and passive
 Consumer Group (Not yet started in DC 2).  Can you MM single offset topic
 and when DC1 consumer Group goes down, DC2 (with manual or  automated
 custom logic) start same consumer group with last committed offset.  Is
 this possible ?

Not sure I follow - the passive consumer would just need to start up
with the same group right? Then it would pick up the last committed
offset in DC1 for that group.

Thanks,

Joel


zookeeper-less offset management

2015-03-12 Thread Pierre-Yves Ritschard
Hi list,

I was under the impression that consumers still needed to interact with
zookeeper to track their offset. Going through recent Jiras to track the
progress I see that https://issues.apache.org/jira/browse/KAFKA-1000 and
https://issues.apache.org/jira/browse/KAFKA-1012 seem to indicate that
offset tracking is now supported by brokers.

Does this mean that, pending support in client libraries, everything is
good to go to part with the ZK dependency? If so, this is awesome news
for the non-JVM world; I don't know how I missed it. I suppose this also
means a pure-Java consumer lib should not be too far off, or is
additional functionality needed?

Cheers,
  - pyr


Offset management implementation

2015-02-20 Thread Clement Dussieux | AT Internet
Hi,


I am using Kafka_2.9.2-0.8.2 and playing a bit with offsets in my code.
I would like to know how the offset system for message posting is implemented.

The main question here is: every message posted gets an offset greater than
the previous one, meaning that message1 gets offset x and message2 gets offset
x+1, but how far can offsets go?
Offsets do loop and come back to zero at some point, right?
Is it deterministic? Is it just a huge constant, with the offset incremented
modulo (%) this constant?

The fact is that I store the offset in my code and would like to update it
according to the processing I do, but I imagine that I cannot just increase it
every time I read a message (I read from the middle of the topic).
I tried to find out how this works in your source code, but that did not
really turn out to be a success.

Thank you for your possible answer, and for Kafka!

Clément Dussieux



Re: Offset management implementation

2015-02-20 Thread Gwen Shapira
We store offsets in INT64, so you can go as high as:
9,223,372,036,854,775,807
messages per topic-partition before looping around :)

Gwen
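
(For scale: at a sustained one million messages per second into a single
partition, 9,223,372,036,854,775,807 / 10^6 ≈ 9.2 x 10^12 seconds, i.e. roughly
292,000 years before the counter could wrap, so in practice there is no modulo
to worry about.)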

On Fri, Feb 20, 2015 at 12:21 AM, Clement Dussieux | AT Internet 
clement.dussi...@atinternet.com wrote:

 Hi,


 I am using Kafka_2.9.2-0.8.2 and playing a bit with offsets in my code.
 I would like to know how the offset system for message posting is
 implemented.

 The main question here is: every message posted gets an offset greater than
 the previous one, meaning that message1 gets offset x and message2 gets
 offset x+1, but how far can offsets go?
 Offsets do loop and come back to zero at some point, right?
 Is it deterministic? Is it just a huge constant, with the offset incremented
 modulo (%) this constant?

 The fact is that I store the offset in my code and would like to update it
 according to the processing I do, but I imagine that I cannot just increase
 it every time I read a message (I read from the middle of the topic).
 I tried to find out how this works in your source code, but that did not
 really turn out to be a success.

 Thank you for your possible answer, and for Kafka!

 Clément Dussieux




Re: New Consumer Offset management in 0.8.2

2015-02-19 Thread Joel Koshy
Yes it is supported in 0.8.2-beta. It is documented on the site - you
will need to set offsets.storage to kafka.

On Thu, Feb 19, 2015 at 03:57:31PM -0500, Matthew Butt wrote:
 I'm having a hard time figuring out if the new Kafka-based offset
 management in the high-level Scala Consumer is implemented in the current
 version of 0.8.2-beta. If I implement a high-level consumer, will it use
 the new system, or will it still be storing in zookeeper? Do I need to wait
 for the Java consumer to take advantage of it?
 
 -- 
 - Matt



Re: question about new consumer offset management in 0.8.2

2015-02-06 Thread Joel Koshy
On Fri, Feb 06, 2015 at 12:43:37AM -0500, Jason Rosenberg wrote:
 I'm not sure what you mean by 'default' behavior 'only if' offset.storage
 is kafka.  Does that mean the 'default' behavior is 'false' if
 offset.storage is 'zookeeper'?  Can that be clarified in the config
 documentation section?
 
  In section 5.6, where the offset management is described, there is this:
 A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be
 performed using the above steps if you set offsets.storage=zookeeper.
 
 This implies that dual commit will work also if offsets.storage=zookeeper,
 no?  Just not by default?  Perhaps there needs to be clarification there
  (and in the config section for offsets.storage & dual.commit.enabled).

Actually I think there may be a bug here if someone needs to roll back
from Kafka-based offsets to zookeeper. Will reply tomorrow on this.

 
 The doc in section 5.6 is probably in need of editing, it looks like it in
 places assumes zookeeper offset storage, and has some repeated sentences,
 etc.

Will do.

 
 Finally, why is section 5.6 titled Distribution?  Seems to be a grab-bag
 of mostly consumer related topics?

Yes this was prior structure that can be improved.

 
   On Thu, Feb 5, 2015 at 2:21 PM, Joel Koshy jjkosh...@gmail.com wrote:
  
This is documented in the official docs:
http://kafka.apache.org/documentation.html#distributionimpl
   
On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
 What are the defaults for those settings (I assume it will be to
  continue
 using only zookeeper by default)?

 Also, if I have a cluster of consumers sharing the same groupId, and
  I
 update them via a rolling release, will it be a problem during the
rolling
 restart if there is inconsistency in the settings for a short time?
  Or
is
 it required that the entire cluster be stopped, then update configs,
  then
 restart all nodes?

 Jason

 On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com
  
wrote:

  Thanks Jon. I updated the FAQ with your procedure:
 
 
 
   
  https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
  ?
 
  On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
  jbringhu...@linkedin.com.invalid wrote:
 
   There should probably be a wiki page started for this so we have
  the
   details in one place. The same question was asked on Freenode
  IRC a
few
   minutes ago. :)
  
   A summary of the migration procedure is:
  
   1) Upgrade your brokers and set dual.commit.enabled=false and
   offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
   2) Set dual.commit.enabled=true and offsets.storage=kafka and
  restart
   (Commit offsets to Zookeeper and Kafka).
   3) Set dual.commit.enabled=false and offsets.storage=kafka and
restart
   (Commit offsets to Kafka only).
  
   -Jon
  
   On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com
wrote:
  
Hi,
   
For 0.8.2, one of the features listed is:
 - Kafka-based offset storage.
   
Is there documentation on this (I've heard discussion of it of
course)?
   
Also, is it something that will be used by existing consumers
  when
they
migrate up to 0.8.2?  What is the migration process?
   
Thanks,
   
Jason
  
  
 
   
   
 
  --
  Joel
 



Re: question about new consumer offset management in 0.8.2

2015-02-06 Thread Joel Koshy

On Thu, Feb 05, 2015 at 11:57:15PM -0800, Joel Koshy wrote:
 On Fri, Feb 06, 2015 at 12:43:37AM -0500, Jason Rosenberg wrote:
  I'm not sure what you mean by 'default' behavior 'only if' offset.storage
  is kafka.  Does that mean the 'default' behavior is 'false' if
  offset.storage is 'zookeeper'?  Can that be clarified in the config
  documentation section?
  
   In section 5.6, where the offset management is described, there is this:
  A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be
  performed using the above steps if you set offsets.storage=zookeeper.
  
  This implies that dual commit will work also if offsets.storage=zookeeper,
  no?  Just not by default?  Perhaps there needs to be clarification there
   (and in the config section for offsets.storage & dual.commit.enabled).
 
 Actually I think there may be a bug here if someone needs to roll back
 from Kafka-based offsets to zookeeper. Will reply tomorrow on this.
 
  

Never mind - I think we are fine here.  The scenario I was thinking
about is the following: 
- If there are three consumer instances c0, c1, c2 consuming
  partitions pX, pY, ...  and are committing offsets to Kafka and you
  want to migrate to zookeeper
- Do a rolling bounce to turn on dual-commit (and keep offset.storage
  = kafka)
- Do another rolling bounce to set offset.storage to zookeeper:
  - Say, you bounce c0 to commit offsets to zk and it comes back up
and then owns pX. It begins to commit offsets for pX to zookeeper
only.
  - You then bounce c1; after it goes down due to our partition
assignment strategy say pX is now assigned to c2 (which has not
yet been bounced).
  - c2 uses offset.storage=kafka so would fetch a potentially stale
offset for pX which would be an issue. 
  - So we explicitly handle this case - if dual.commit is turned on
and offset.storage is kafka, then the broker fetches offsets from
both Kafka and ZooKeeper and selects the maximum of the two.

Let me know if you see any holes in the above.

dual.commit is confusing and would have been (slightly) less confusing if it
was called offset.migration.in.progress or something similar. Still, I think
we can document the process carefully and state clearly that it is
intended for use during migration/roll-back only.



Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Jason Rosenberg
On Thu, Feb 5, 2015 at 9:52 PM, Joel Koshy jjkosh...@gmail.com wrote:

  Ok, so it looks like the default settings are:
  offset.storage = zookeeper
  dual.commit.enabled = true
  The doc for 'dual.commit.enabled' seems to imply (but doesn't clearly
  state) that it will only apply if offset.storage = kafka.  Is that right?
  (I'm guessing not)

 dual.commit.enabled defaults to true only if offset.storage is kafka.
 As you noted, it only applies if offset.storage = kafka, and is primarily
 intended for migration.


I'm not sure what you mean by 'default' behavior 'only if' offset.storage
is kafka.  Does that mean the 'default' behavior is 'false' if
offset.storage is 'zookeeper'?  Can that be clarified in the config
documentation section?

In section 5.6, where the offset management is described, there is this:
A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be
performed using the above steps if you set offsets.storage=zookeeper.

This implies that dual commit will work also if offsets.storage=zookeeper,
no?  Just not by default?  Perhaps there needs to be clarification there
(and in the config section for offsets.storage & dual.commit.enabled).

The doc in section 5.6 is probably in need of editing, it looks like it in
places assumes zookeeper offset storage, and has some repeated sentences,
etc.

Finally, why is section 5.6 titled Distribution?  Seems to be a grab-bag
of mostly consumer related topics?


  It seems to me less than ideal to have the default behavior to have
  dual.commit.enabled = true, since this seems like a performance hit, no?

 To some degree yes, but it is relatively cheap.


And as you pointed out (I think) it's not even an issue because it won't be
dual committing initially, by default (while offsets.storage = zookeeper),
right?


  Also, I assume the __consumer_offsets topic will be set to have an
 infinite
  retention policy internally, is that right?  So that currently committed
  offsets for a given consumer group won't be lost?

 It uses the compaction retention policy - so the topic won't grow
 unbounded.


Nice



 Thanks,

 Joel

  On Thu, Feb 5, 2015 at 2:21 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   This is documented in the official docs:
   http://kafka.apache.org/documentation.html#distributionimpl
  
   On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
What are the defaults for those settings (I assume it will be to
 continue
using only zookeeper by default)?
   
Also, if I have a cluster of consumers sharing the same groupId, and
 I
update them via a rolling release, will it be a problem during the
   rolling
restart if there is inconsistency in the settings for a short time?
 Or
   is
it required that the entire cluster be stopped, then update configs,
 then
restart all nodes?
   
Jason
   
On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com
 
   wrote:
   
 Thanks Jon. I updated the FAQ with your procedure:



  
 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
 ?

 On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
 jbringhu...@linkedin.com.invalid wrote:

  There should probably be a wiki page started for this so we have
 the
  details in one place. The same question was asked on Freenode
 IRC a
   few
  minutes ago. :)
 
  A summary of the migration procedure is:
 
  1) Upgrade your brokers and set dual.commit.enabled=false and
  offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
  2) Set dual.commit.enabled=true and offsets.storage=kafka and
 restart
  (Commit offsets to Zookeeper and Kafka).
  3) Set dual.commit.enabled=false and offsets.storage=kafka and
   restart
  (Commit offsets to Kafka only).
 
  -Jon
 
  On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com
   wrote:
 
   Hi,
  
   For 0.8.2, one of the features listed is:
- Kafka-based offset storage.
  
   Is there documentation on this (I've heard discussion of it of
   course)?
  
   Also, is it something that will be used by existing consumers
 when
   they
   migrate up to 0.8.2?  What is the migration process?
  
   Thanks,
  
   Jason
 
 

  
  

 --
 Joel



Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Jason Rosenberg
Ok, so it looks like the default settings are:

offset.storage = zookeeper
dual.commit.enabled = true

The doc for 'dual.commit.enabled' seems to imply (but doesn't clearly
state) that it will only apply if offset.storage = kafka.  Is that right?
(I'm guessing not)

*If you are using kafka* as offsets.storage, you can dual commit offsets
to ZooKeeper (in addition to Kafka).

It seems to me less than ideal to have the default behavior to have
dual.commit.enabled = true, since this seems like a performance hit, no?
I'd think you'd only want this during a planned migration.

Also, I assume it's desirable to switch to using 'kafka' for offset
storage, for performance reasons?  Will it better handle a larger number of
topics?
Also, I assume the __consumer_offsets topic will be set to have an infinite
retention policy internally, is that right?  So that currently committed
offsets for a given consumer group won't be lost?

Jason

On Thu, Feb 5, 2015 at 2:21 PM, Joel Koshy jjkosh...@gmail.com wrote:

 This is documented in the official docs:
 http://kafka.apache.org/documentation.html#distributionimpl

 On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
  What are the defaults for those settings (I assume it will be to continue
  using only zookeeper by default)?
 
  Also, if I have a cluster of consumers sharing the same groupId, and I
  update them via a rolling release, will it be a problem during the
 rolling
  restart if there is inconsistency in the settings for a short time?  Or
 is
  it required that the entire cluster be stopped, then update configs, then
  restart all nodes?
 
  Jason
 
  On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
   Thanks Jon. I updated the FAQ with your procedure:
  
  
  
 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
   ?
  
   On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
   jbringhu...@linkedin.com.invalid wrote:
  
There should probably be a wiki page started for this so we have the
details in one place. The same question was asked on Freenode IRC a
 few
minutes ago. :)
   
A summary of the migration procedure is:
   
1) Upgrade your brokers and set dual.commit.enabled=false and
offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
(Commit offsets to Zookeeper and Kafka).
3) Set dual.commit.enabled=false and offsets.storage=kafka and
 restart
(Commit offsets to Kafka only).
   
-Jon
   
On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com
 wrote:
   
 Hi,

 For 0.8.2, one of the features listed is:
  - Kafka-based offset storage.

 Is there documentation on this (I've heard discussion of it of
 course)?

 Also, is it something that will be used by existing consumers when
 they
 migrate up to 0.8.2?  What is the migration process?

 Thanks,

 Jason
   
   
  




Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Joel Koshy
This is documented in the official docs: 
http://kafka.apache.org/documentation.html#distributionimpl

On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
 What are the defaults for those settings (I assume it will be to continue
 using only zookeeper by default)?
 
 Also, if I have a cluster of consumers sharing the same groupId, and I
 update them via a rolling release, will it be a problem during the rolling
 restart if there is inconsistency in the settings for a short time?  Or is
 it required that the entire cluster be stopped, then update configs, then
 restart all nodes?
 
 Jason
 
 On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
  Thanks Jon. I updated the FAQ with your procedure:
 
 
  https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
  ?
 
  On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
  jbringhu...@linkedin.com.invalid wrote:
 
   There should probably be a wiki page started for this so we have the
   details in one place. The same question was asked on Freenode IRC a few
   minutes ago. :)
  
   A summary of the migration procedure is:
  
   1) Upgrade your brokers and set dual.commit.enabled=false and
   offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
   2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
   (Commit offsets to Zookeeper and Kafka).
   3) Set dual.commit.enabled=false and offsets.storage=kafka and restart
   (Commit offsets to Kafka only).
  
   -Jon
  
   On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com wrote:
  
Hi,
   
For 0.8.2, one of the features listed is:
 - Kafka-based offset storage.
   
Is there documentation on this (I've heard discussion of it of course)?
   
Also, is it something that will be used by existing consumers when they
migrate up to 0.8.2?  What is the migration process?
   
Thanks,
   
Jason
  
  
 



Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Surendranauth Hiraman
This is what I've found so far.

https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

The high-level consumer just worked for me by setting offsets.storage =
kafka.

Scroll down to the offsets.* config params.

http://kafka.apache.org/documentation.html#consumerconfigs

-Suren



On Thu, Feb 5, 2015 at 12:16 PM, Jon Bringhurst 
jbringhu...@linkedin.com.invalid wrote:

 There should probably be a wiki page started for this so we have the
 details in one place. The same question was asked on Freenode IRC a few
 minutes ago. :)

 A summary of the migration procedure is:

 1) Upgrade your brokers and set dual.commit.enabled=false and
 offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
 2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
 (Commit offsets to Zookeeper and Kafka).
 3) Set dual.commit.enabled=false and offsets.storage=kafka and restart
 (Commit offsets to Kafka only).

 -Jon

 On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com wrote:

  Hi,
 
  For 0.8.2, one of the features listed is:
   - Kafka-based offset storage.
 
  Is there documentation on this (I've heard discussion of it of course)?
 
  Also, is it something that will be used by existing consumers when they
  migrate up to 0.8.2?  What is the migration process?
 
  Thanks,
 
  Jason




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

54 West 40th Street, 3RD FLOOR
NEW YORK, NY 10018
T: @suren_h
E: suren.hiraman@velos.io
W: www.velos.io


question about new consumer offset management in 0.8.2

2015-02-05 Thread Jason Rosenberg
Hi,

For 0.8.2, one of the features listed is:
  - Kafka-based offset storage.

Is there documentation on this (I've heard discussion of it of course)?

Also, is it something that will be used by existing consumers when they
migrate up to 0.8.2?  What is the migration process?

Thanks,

Jason


Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Jon Bringhurst
There should probably be a wiki page started for this so we have the details in 
one place. The same question was asked on Freenode IRC a few minutes ago. :)

A summary of the migration procedure is:

1) Upgrade your brokers and set dual.commit.enabled=false and 
offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
2) Set dual.commit.enabled=true and offsets.storage=kafka and restart (Commit 
offsets to Zookeeper and Kafka).
3) Set dual.commit.enabled=false and offsets.storage=kafka and restart (Commit 
offsets to Kafka only).
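
In code, step 2 of this procedure amounts to a consumer configuration roughly
like the sketch below (high-level consumer; the group id and ZooKeeper connect
string are placeholders):

```scala
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

// Step 2 of the migration: fetch/commit offsets via Kafka, but keep
// writing them to ZooKeeper as well. Values below are placeholders.
val props = new Properties()
props.put("group.id", "my-group")
props.put("zookeeper.connect", "zk1:2181")
props.put("offsets.storage", "kafka")
props.put("dual.commit.enabled", "true")
val connector = Consumer.create(new ConsumerConfig(props))
```

Step 3 is the same configuration with dual.commit.enabled=false, once every
consumer in the group has been restarted on the step-2 settings.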

-Jon

On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com wrote:

 Hi,
 
 For 0.8.2, one of the features listed is:
  - Kafka-based offset storage.
 
 Is there documentation on this (I've heard discussion of it of course)?
 
 Also, is it something that will be used by existing consumers when they
 migrate up to 0.8.2?  What is the migration process?
 
 Thanks,
 
 Jason



signature.asc
Description: Message signed with OpenPGP using GPGMail


Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Joel Koshy
 Ok, so it looks like the default settings are:
 offset.storage = zookeeper
 dual.commit.enabled = true
 The doc for 'dual.commit.enabled' seems to imply (but doesn't clearly
 state) that it will only apply if offset.storage = kafka.  Is that right?
 (I'm guessing not)

dual.commit.enabled defaults to true only if offsets.storage is kafka.
As you noted, it only applies if offsets.storage = kafka and is
primarily intended for migration.

 It seems to me less than ideal to have the default behavior to have
 dual.commit.enabled = true, since this seems like a performance hit, no?

To some degree yes, but it is relatively cheap.

 I'd think you'd only want this during a planned migration.

Yes.

 
 Also, I assume it's desirable to switch to using 'kafka' for offset
 storage, for performance reasons?  Will it better handle a larger number of
 topics?

Yes.

 Also, I assume the __consumer_offsets topic will be set to have an infinite
 retention policy internally, is that right?  So that currently committed
 offsets for a given consumer group won't be lost?

It uses the compaction retention policy - so the topic won't grow
unbounded. Compaction will basically dedupe on the inactive segments
of the topic - so in effect it will only maintain the last committed
offset for a given group-topic-partition 3-tuple (plus a few - since
it runs only when a certain dirtiness threshold has been met). The
compaction policy is also documented on the site.
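
As a toy illustration of that dedupe (plain Scala, not Kafka internals):
commits are keyed by the group-topic-partition tuple, and only the latest
value per key survives on cleaned segments:

```scala
// Each commit is keyed by (group, topic, partition); compaction keeps
// only the most recent offset per key on the compacted segments.
val commits = Seq(
  (("grp", "topic", 0), 10L),
  (("grp", "topic", 0), 20L),
  (("grp", "topic", 1), 5L),
  (("grp", "topic", 0), 30L))
val compacted = commits.groupBy(_._1).map { case (k, vs) => k -> vs.last._2 }
// compacted == Map((grp,topic,0) -> 30, (grp,topic,1) -> 5)
```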

Thanks,

Joel

 On Thu, Feb 5, 2015 at 2:21 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
  This is documented in the official docs:
  http://kafka.apache.org/documentation.html#distributionimpl
 
  On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
   What are the defaults for those settings (I assume it will be to continue
   using only zookeeper by default)?
  
   Also, if I have a cluster of consumers sharing the same groupId, and I
   update them via a rolling release, will it be a problem during the
  rolling
   restart if there is inconsistency in the settings for a short time?  Or
  is
   it required that the entire cluster be stopped, then update configs, then
   restart all nodes?
  
   Jason
  
   On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
Thanks Jon. I updated the FAQ with your procedure:
   
   
   
  https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
?
   
On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
jbringhu...@linkedin.com.invalid wrote:
   
 There should probably be a wiki page started for this so we have the
 details in one place. The same question was asked on Freenode IRC a
  few
 minutes ago. :)

 A summary of the migration procedure is:

 1) Upgrade your brokers and set dual.commit.enabled=false and
 offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
 2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
 (Commit offsets to Zookeeper and Kafka).
 3) Set dual.commit.enabled=false and offsets.storage=kafka and
  restart
 (Commit offsets to Kafka only).

 -Jon

 On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com
  wrote:

  Hi,
 
  For 0.8.2, one of the features listed is:
   - Kafka-based offset storage.
 
  Is there documentation on this (I've heard discussion of it of
  course)?
 
  Also, is it something that will be used by existing consumers when
  they
  migrate up to 0.8.2?  What is the migration process?
 
  Thanks,
 
  Jason


   
 
 

-- 
Joel


Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Gwen Shapira
Thanks Jon. I updated the FAQ with your procedure:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
?

On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
jbringhu...@linkedin.com.invalid wrote:

 There should probably be a wiki page started for this so we have the
 details in one place. The same question was asked on Freenode IRC a few
 minutes ago. :)

 A summary of the migration procedure is:

 1) Upgrade your brokers and set dual.commit.enabled=false and
 offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
 2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
 (Commit offsets to Zookeeper and Kafka).
 3) Set dual.commit.enabled=false and offsets.storage=kafka and restart
 (Commit offsets to Kafka only).

 -Jon

 On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com wrote:

  Hi,
 
  For 0.8.2, one of the features listed is:
   - Kafka-based offset storage.
 
  Is there documentation on this (I've heard discussion of it of course)?
 
  Also, is it something that will be used by existing consumers when they
  migrate up to 0.8.2?  What is the migration process?
 
  Thanks,
 
  Jason




Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Jason Rosenberg
What are the defaults for those settings (I assume it will be to continue
using only zookeeper by default)?

Also, if I have a cluster of consumers sharing the same groupId, and I
update them via a rolling release, will it be a problem during the rolling
restart if there is inconsistency in the settings for a short time?  Or is
it required that the entire cluster be stopped, then update configs, then
restart all nodes?

Jason

On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks Jon. I updated the FAQ with your procedure:


 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
 ?

 On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
 jbringhu...@linkedin.com.invalid wrote:

  There should probably be a wiki page started for this so we have the
  details in one place. The same question was asked on Freenode IRC a few
  minutes ago. :)
 
  A summary of the migration procedure is:
 
  1) Upgrade your brokers and set dual.commit.enabled=false and
  offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
  2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
  (Commit offsets to Zookeeper and Kafka).
  3) Set dual.commit.enabled=false and offsets.storage=kafka and restart
  (Commit offsets to Kafka only).
 
  -Jon
 
  On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com wrote:
 
   Hi,
  
   For 0.8.2, one of the features listed is:
- Kafka-based offset storage.
  
   Is there documentation on this (I've heard discussion of it of course)?
  
   Also, is it something that will be used by existing consumers when they
   migrate up to 0.8.2?  What is the migration process?
  
   Thanks,
  
   Jason
 
 



Re: Offset management in multi-threaded high-level consumer

2015-01-18 Thread Jun Rao
There isn't much difference between options 1 and 2 in terms of the offset
commit overhead to Zookeeper. In 0.8.2, we will have Kafka-based offset
management, which is much more efficient than committing to Zookeeper.

Thanks,

Jun

On Tue, Jan 6, 2015 at 10:45 AM, Rafi Shamim r...@knewton.com wrote:

 Hello,

 I would like to write a multi-threaded consumer using the high-level
 consumer in Kafka 0.8.1. I have found two ways that seem feasible
 while keeping the guarantee that messages in a partition are processed
 in order. I would appreciate any feedback this list has.

 Option 1
 
 - Create multiple threads, so each thread has its own ConsumerConnector.
 - Manually commit offsets in each thread after every N messages.
 - This was discussed a bit on this list previously. See [1].

 ### Questions
 - Is there a problem with making multiple ConsumerConnectors per machine?
 - What does it take for ZooKeeper to handle this much load? We have a
 3-node ZooKeeper cluster with relatively small machines. (I expect the
 topic will have about 40 messages per second. There will be 3 consumer
 groups. That would be 120 commits per second at most, but I can reduce
 the frequency of commits to make this lower.)

 ### Extra info
 Kafka 0.9 will have an entirely different commit API, which will allow
 one connection to commit offsets per partition, but I can’t wait that
 long. See [2].


 Option 2
 
 - Create one ConsumerConnector, but ask for multiple streams in that
 connection. Give each thread one stream.
 - Since there is no way to commit offsets per stream right now, we
 need to do autoCommit.
 - This sacrifices the at-least-once processing guarantee, which would
 be nice to have. See KAFKA-1612 [3].

 ### Extra info
 - There was some discussion in KAFKA-966 about a markForCommit()
 method so that autoCommit would preserve the at-least-once guarantee,
 but it seems more likely that the consumer API will just be redesigned
 to allow commits per partition instead. See [4].


 So basically I'm wondering if option 1 is feasible. If not, I'll just
 do option 2. Of course, let me know if I was mistaken about anything
 or if there is another design which is better.

 Thanks in advance.
 Rafi

 [1]
 http://mail-archives.apache.org/mod_mbox/kafka-users/201310.mbox/%3cff142f6b499ae34caed4d263f6ca32901d35a...@extxmb19.nam.nsroot.net%3E
 [2]
 https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
 [3] https://issues.apache.org/jira/browse/KAFKA-1612
 [4] https://issues.apache.org/jira/browse/KAFKA-966



Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-12 Thread Joel Koshy
Catching up on email - yes the wire protocol change was a big mistake
and +1 on this approach. I'm reviewing KAFKA-1841 right now.

On Thu, Jan 08, 2015 at 10:07:44PM -0800, Dana Powers wrote:
 This approach makes sense to me -- thanks, Jun.
 
 -Dana
 
 On Thu, Jan 8, 2015 at 3:57 PM, Jun Rao j...@confluent.io wrote:
 
  I took a stab at fixing this issue in KAFKA-1841.
 
  This is actually a bit tricky to fix. In addition to the incompatible wire
  protocol change of version 0 of OffsetCommitRequest, there is also a
  functionality change. Specifically, in 0.8.1, version 0 of
  OffsetCommitRequest writes to ZK and version 0 of OffsetFetchRequest reads
  from ZK. In 0.8.2/trunk, this is changed to version 0 and version 1 of
  OffsetCommitRequest writes to Kafka and version 0 of OffsetFetchRequest
  reads from Kafka. To make this really backward compatible with 0.8.1, we
  have to
  (1) revert the wire protocol change of version 0 of OffsetCommitRequest.
  (2) change the behavior of version 0 of OffsetCommitRequest and
  OffsetFetchRequest to read from ZK
  (3) create version 1 of OffsetFetchRequest that reads from Kafka (so that
  it behaves consistently with version 1 of OffsetCommitRequest)
 
  That's what the patch in KAFKA-1841 does. This works as long as people are
  only using released final version. However, since this introduces an
  incompatible (functional) change of OffsetFetchRequest in 0.8.2-beta and
  trunk, this will create problems for people (assuming that they are using
  this api) who have a deployment of 0.8.2-beta and want to upgrade to 0.8.2
  final, or a deployment from trunk and want to upgrade to a later version of
  trunk in the future. In either case, the upgrade of the broker will cause
  the old client to behave differently and incorrectly. The only choice there
  is to stop the client and the broker and upgrade them together. Most people
  will probably only deploy final released version in production. However, I
  want to bring this up to see if anyone has concerns on this.
 
  Thanks,
 
  Jun
 
  On Wed, Jan 7, 2015 at 10:32 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Yes, I think we are saying the same thing. Basically I am saying version
  0
   should be considered frozen as of the format and behavior in the 0.8.1
   release and we can do whatever we want as a version 1+.
  
   -Jay
  
   On Wed, Jan 7, 2015 at 10:10 AM, Joe Stein joe.st...@stealth.ly wrote:
  
 We need to take the versioning of the protocol seriously
   
amen
   
 People are definitely using the offset commit functionality in 0.8.1
   
agreed
   
 I really think we should treat this as a bug and revert the change
  to
version 0.
   
What do you mean exactly by revert? Why can't we use version as a
  feature
flag and support 0 and 1 at the same time? in the handleOffsetFetch and
handleOffsetCommit functions that process the request messages just do
  if
version == 0 old functionality else if version == 1 new functionality.
This way everyone works and nothing breaks  =8^)
   
/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/
   
On Wed, Jan 7, 2015 at 1:04 PM, Jay Kreps j...@confluent.io wrote:
   
 Hey guys,

 We need to take the versioning of the protocol seriously. People are
 definitely using the offset commit functionality in 0.8.1 and I
  really
 think we should treat this as a bug and revert the change to version
  0.

 -Jay

 On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao j...@confluent.io wrote:

  Yes, we did make an incompatible change in OffsetCommitRequest in
0.8.2,
  which is a mistake. The incompatible change was introduced in
KAFKA-1012
 in
  Mar, 2014 when we added the kafka-based offset management support.
 However,
  we didn't realize that this breaks the wire protocol until much
   later.
 Now,
  the wire protocol has evolved again and it's a bit hard to fix the
format
  in version 0. I can see a couple of options.
 
  Option 1: Just accept the incompatible change as it is.
  The argument is that even though we introduced OffsetCommitRequest
  in
  0.8.1, it's not used in the high level consumer. It's possible that
some
  users of SimpleConsumer started using it. However, that number is
likely
  small. Also, the functionality of OffsetCommitRequest has changed
   since
  it's writing the offset to a Kafka log, instead of ZK (for good
reasons).
  So, we can document this as a wire protocol and functionality
 incompatible
  change. For users who don't mind the functionality change, they
  will
need
  to upgrade the client to the new protocol before they can use the
  new

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jay Kreps
Hey guys,

We need to take the versioning of the protocol seriously. People are
definitely using the offset commit functionality in 0.8.1 and I really
think we should treat this as a bug and revert the change to version 0.

-Jay

On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao j...@confluent.io wrote:

 Yes, we did make an incompatible change in OffsetCommitRequest in 0.8.2,
 which is a mistake. The incompatible change was introduced in KAFKA-1012 in
 Mar, 2014 when we added the kafka-based offset management support. However,
 we didn't realize that this breaks the wire protocol until much later. Now,
 the wire protocol has evolved again and it's a bit hard to fix the format
 in version 0. I can see a couple of options.

 Option 1: Just accept the incompatible change as it is.
 The argument is that even though we introduced OffsetCommitRequest in
 0.8.1, it's not used in the high level consumer. It's possible that some
 users of SimpleConsumer started using it. However, that number is likely
 small. Also, the functionality of OffsetCommitRequest has changed since
 it's writing the offset to a Kafka log, instead of ZK (for good reasons).
 So, we can document this as a wire protocol and functionality incompatible
 change. For users who don't mind the functionality change, they will need
 to upgrade the client to the new protocol before they can use the new
 broker. For users who want to preserve the old functionality, they will
 have to write the offsets directly to ZK. In either case, hopefully the
 number of people being affected is small.

 Option 2: Revert version 0 format to what's in 0.8.1.
 There will be a few issues here. First, it's not clear how this affects
 other people who have been deploying from trunk. Second, I am not sure that
 we want to continue supporting writing the offset to ZK in
 OffsetCommitRequest
 since that can cause ZK to be overloaded.

 Joel Koshy,

 Any thoughts on this?

 Thanks,

 Jun

 On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein joe.st...@stealth.ly wrote:

  In addition to the issue you bring up, the functionality as a whole has
  changed.. when you call OffsetFetchRequest the version = 0 needs to
  preserve the old functionality
 
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
  and version = 1 the new
 
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
  .
  Also the OffsetFetchRequest functionality even though the wire protocol
 is
  the same after the 0.8.2 upgrade for OffsetFetchRequest if you were using
  0.8.1.1 OffsetFetchRequest
 
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
  will stop going to zookeeper and start going to Kafka storage
 
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
  so more errors will happen and things break too.
 
  I think we should treat the version field not just to stop from breaking
  the wire protocol calls but also as a feature flag preserving upgrades
  and multiple pathways.
 
  I updated the JIRA for the feature flag needs for OffsetFetch and
  OffsetCommit too.
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
  On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers dana.pow...@rd.io wrote:
 
   ok, opened KAFKA-1841 .  KAFKA-1634 also related.
  
   -Dana
  
   On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
part of the Map changed, which will modify the wire protocol.
   
This is actually not handled in the Java client either. It will send
the timestamp no matter which version is used.
   
This looks like a bug and I'd even mark it as blocker for 0.8.2 since
it may prevent rolling upgrades.
   
Are you opening the JIRA?
   
Gwen
   
On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers dana.pow...@rd.io
  wrote:
 specifically comparing 0.8.1 --

 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
 ```
 (1 to partitionCount).map(_ => {
   val partitionId = buffer.getInt
   val offset = buffer.getLong
   val metadata = readShortString(buffer)
   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
 })
 ```

 to trunk --

 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
 ```
 (1 to partitionCount).map(_ => {
   val partitionId = buffer.getInt
   val offset = buffer.getLong
   val timestamp = {
     val given = buffer.getLong
     if (given == -1L) now else given
   }
   val metadata = readShortString(buffer)
   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
 })
 ```
Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Joe Stein
 We need to take the versioning of the protocol seriously

amen

 People are definitely using the offset commit functionality in 0.8.1

agreed

 I really think we should treat this as a bug and revert the change to
version 0.

What do you mean exactly by revert? Why can't we use version as a feature
flag and support 0 and 1 at the same time? in the handleOffsetFetch and
handleOffsetCommit functions that process the request messages just do if
version == 0 old functionality else if version == 1 new functionality.
This way everyone works and nothing breaks  =8^)
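
A sketch of the dispatch Joe is describing; the type and helper names below
are stand-ins, not the actual KafkaApis code:

```scala
// Stand-in for kafka.api.OffsetCommitRequest, which carries more fields.
case class OffsetCommitRequestStub(versionId: Short)

def commitToZookeeper(req: OffsetCommitRequestStub): Unit = () // 0.8.1 path (stub)
def commitToKafkaLog(req: OffsetCommitRequestStub): Unit = ()  // 0.8.2 path (stub)

// The request version acts as a feature flag: old clients keep old behavior.
def handleOffsetCommit(req: OffsetCommitRequestStub): Unit = req.versionId match {
  case 0 => commitToZookeeper(req)
  case 1 => commitToKafkaLog(req)
  case v => sys.error(s"unsupported OffsetCommitRequest version $v")
}
```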

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Wed, Jan 7, 2015 at 1:04 PM, Jay Kreps j...@confluent.io wrote:

 Hey guys,

 We need to take the versioning of the protocol seriously. People are
 definitely using the offset commit functionality in 0.8.1 and I really
 think we should treat this as a bug and revert the change to version 0.

 -Jay

 On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao j...@confluent.io wrote:

  Yes, we did make an incompatible change in OffsetCommitRequest in 0.8.2,
  which is a mistake. The incompatible change was introduced in KAFKA-1012
 in
  Mar, 2014 when we added the kafka-based offset management support.
 However,
  we didn't realize that this breaks the wire protocol until much later.
 Now,
  the wire protocol has evolved again and it's a bit hard to fix the format
  in version 0. I can see a couple of options.
 
  Option 1: Just accept the incompatible change as it is.
  The argument is that even though we introduced OffsetCommitRequest in
  0.8.1, it's not used in the high level consumer. It's possible that some
  users of SimpleConsumer started using it. However, that number is likely
  small. Also, the functionality of OffsetCommitRequest has changed since
  it's writing the offset to a Kafka log, instead of ZK (for good reasons).
  So, we can document this as a wire protocol and functionality
 incompatible
  change. For users who don't mind the functionality change, they will need
  to upgrade the client to the new protocol before they can use the new
  broker. For users who want to preserve the old functionality, they will
  have to write the offsets directly to ZK. In either case, hopefully the
  number of people being affected is small.
 
  Option 2: Revert version 0 format to what's in 0.8.1.
  There will be a few issues here. First, it's not clear how this affects
  other people who have been deploying from trunk. Second, I am not sure
 that
  we want to continue supporting writing the offset to ZK in
  OffsetCommitRequest
  since that can cause ZK to be overloaded.
 
  Joel Koshy,
 
  Any thoughts on this?
 
  Thanks,
 
  Jun
 
  On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   In addition to the issue you bring up, the functionality as a whole has
   changed.. when you call OffsetFetchRequest the version = 0 needs to
   preserve the old functionality
  
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
   and version = 1 the new
  
  
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
   .
   Also the OffsetFetchRequest functionality even though the wire protocol
  is
   the same after the 0.8.2 upgrade for OffsetFetchRequest if you were
 using
   0.8.1.1 OffsetFetchRequest
  
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
   will stop going to zookeeper and start going to Kafka storage
  
  
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
   so more errors will happen and things break too.
  
   I think we should treat the version field not just to stop from
 breaking
   the wire protocol calls but also as a feature flag preserving
 upgrades
   and multiple pathways.
  
   I updated the JIRA for the feature flag needs for OffsetFetch and
   OffsetCommit too.
  
   /***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
   /
  
   On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers dana.pow...@rd.io wrote:
  
ok, opened KAFKA-1841 .  KAFKA-1634 also related.
   
-Dana
   
On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira gshap...@cloudera.com
 
wrote:
   
 Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
 part of the Map changed, which will modify the wire protocol.

 This is actually not handled in the Java client either. It will
 send
 the timestamp no matter which version is used

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jay Kreps
Yes, I think we are saying the same thing. Basically I am saying version 0
should be considered frozen as of the format and behavior in the 0.8.1
release and we can do whatever we want as a version 1+.

-Jay

On Wed, Jan 7, 2015 at 10:10 AM, Joe Stein joe.st...@stealth.ly wrote:

  We need to take the versioning of the protocol seriously

 amen

  People are definitely using the offset commit functionality in 0.8.1

 agreed

  I really think we should treat this as a bug and revert the change to
 version 0.

 What do you mean exactly by revert? Why can't we use version as a feature
 flag and support 0 and 1 at the same time? in the handleOffsetFetch and
 handleOffsetCommit functions that process the request messages just do if
 version == 0 old functionality else if version == 1 new functionality.
 This way everyone works and nothing breaks  =8^)

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Wed, Jan 7, 2015 at 1:04 PM, Jay Kreps j...@confluent.io wrote:

  Hey guys,
 
  We need to take the versioning of the protocol seriously. People are
  definitely using the offset commit functionality in 0.8.1 and I really
  think we should treat this as a bug and revert the change to version 0.
 
  -Jay
 
  On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao j...@confluent.io wrote:
 
   Yes, we did make an incompatible change in OffsetCommitRequest in
 0.8.2,
   which is a mistake. The incompatible change was introduced in
 KAFKA-1012
  in
   Mar, 2014 when we added the kafka-based offset management support.
  However,
   we didn't realize that this breaks the wire protocol until much later.
  Now,
   the wire protocol has evolved again and it's a bit hard to fix the
 format
   in version 0. I can see a couple of options.
  
   Option 1: Just accept the incompatible change as it is.
   The argument is that even though we introduced OffsetCommitRequest in
   0.8.1, it's not used in the high level consumer. It's possible that
 some
   users of SimpleConsumer started using it. However, that number is
 likely
   small. Also, the functionality of OffsetCommitRequest has changed since
   it's writing the offset to a Kafka log, instead of ZK (for good
 reasons).
   So, we can document this as a wire protocol and functionality
  incompatible
   change. For users who don't mind the functionality change, they will
 need
   to upgrade the client to the new protocol before they can use the new
   broker. For users who want to preserve the old functionality, they will
   have to write the offsets directly to ZK. In either case, hopefully the
   number of people being affected is small.
  
   Option 2: Revert version 0 format to what's in 0.8.1.
   There will be a few issues here. First, it's not clear how this affects
   other people who have been deploying from trunk. Second, I am not sure
  that
   we want to continue supporting writing the offset to ZK in
   OffsetCommitRequest
   since that can cause ZK to be overloaded.
  
   Joel Koshy,
  
   Any thoughts on this?
  
   Thanks,
  
   Jun
  
   On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein joe.st...@stealth.ly
 wrote:
  
In addition to the issue you bring up, the functionality as a whole
 has
changed.. when you call OffsetFetchRequest the version = 0 needs to
preserve the old functionality
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
and version = 1 the new
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
.
Also the OffsetFetchRequest functionality even though the wire
 protocol
   is
the same after the 0.8.2 upgrade for OffsetFetchRequest if you were
  using
0.8.1.1 OffsetFetchRequest
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
will stop going to zookeeper and start going to Kafka storage
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
so more errors will happen and things break too.
   
I think we should treat the version field not just to stop from
  breaking
the wire protocol calls but also as a feature flag preserving
  upgrades
and multiple pathways.
   
I updated the JIRA for the feature flag needs for OffsetFetch and
OffsetCommit too.
   
/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/
   
On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers dana.pow...@rd.io
 wrote:
   
 ok, opened KAFKA-1841

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jun Rao
Yes, we did make an incompatible change in OffsetCommitRequest in 0.8.2,
which is a mistake. The incompatible change was introduced in KAFKA-1012 in
Mar, 2014 when we added the kafka-based offset management support. However,
we didn't realize that this breaks the wire protocol until much later. Now,
the wire protocol has evolved again and it's a bit hard to fix the format
in version 0. I can see a couple of options.

Option 1: Just accept the incompatible change as it is.
The argument is that even though we introduced OffsetCommitRequest in
0.8.1, it's not used in the high level consumer. It's possible that some
users of SimpleConsumer started using it. However, that number is likely
small. Also, the functionality of OffsetCommitRequest has changed since
it's writing the offset to a Kafka log, instead of ZK (for good reasons).
So, we can document this as a wire protocol and functionality incompatible
change. For users who don't mind the functionality change, they will need
to upgrade the client to the new protocol before they can use the new
broker. For users who want to preserve the old functionality, they will
have to write the offsets directly to ZK. In either case, hopefully the
number of people being affected is small.

Option 2: Revert version 0 format to what's in 0.8.1.
There will be a few issues here. First, it's not clear how this affects
other people who have been deploying from trunk. Second, I am not sure that
we want to continue supporting writing the offset to ZK in OffsetCommitRequest
since that can cause ZK to be overloaded.

Joel Koshy,

Any thoughts on this?

Thanks,

Jun

On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein joe.st...@stealth.ly wrote:

 In addition to the issue you bring up, the functionality as a whole has
 changed.. when you call OffsetFetchRequest the version = 0 needs to
 preserve the old functionality

 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
 and version = 1 the new

 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
 .
 Also the OffsetFetchRequest functionality even though the wire protocol is
 the same after the 0.8.2 upgrade for OffsetFetchRequest if you were using
 0.8.1.1 OffsetFetchRequest

 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
 will stop going to zookeeper and start going to Kafka storage

 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
 so more errors will happen and things break too.

 I think we should treat the version field not just to stop from breaking
 the wire protocol calls but also as a feature flag preserving upgrades
 and multiple pathways.

 I updated the JIRA for the feature flag needs for OffsetFetch and
 OffsetCommit too.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers dana.pow...@rd.io wrote:

  ok, opened KAFKA-1841 .  KAFKA-1634 also related.
 
  -Dana
 
  On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
   part of the Map changed, which will modify the wire protocol.
  
   This is actually not handled in the Java client either. It will send
   the timestamp no matter which version is used.
  
   This looks like a bug and I'd even mark it as blocker for 0.8.2 since
   it may prevent rolling upgrades.
  
   Are you opening the JIRA?
  
   Gwen
  
   On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers dana.pow...@rd.io
 wrote:
 specifically comparing 0.8.1 --

 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
 ```
 (1 to partitionCount).map(_ => {
   val partitionId = buffer.getInt
   val offset = buffer.getLong
   val metadata = readShortString(buffer)
   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
 })
 ```
   
 to trunk --

 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
 ```
 (1 to partitionCount).map(_ => {
   val partitionId = buffer.getInt
   val offset = buffer.getLong
   val timestamp = {
     val given = buffer.getLong
     if (given == -1L) now else given
   }
   val metadata = readShortString(buffer)
   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
 })
 ```
   
should the `timestamp` buffer read be wrapped in an api version
 check?
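
For concreteness, the version gate being asked about might look roughly like
this (a sketch against the snippets above, not the fix that actually landed):

```scala
import java.nio.ByteBuffer

// Only read the timestamp when the request declares a version that carries
// one; version 0 (0.8.1) requests have no timestamp field on the wire.
def readTimestamp(buffer: ByteBuffer, versionId: Short, now: Long): Long =
  if (versionId >= 1) {
    val given = buffer.getLong
    if (given == -1L) now else given // -1 means "use broker time"
  } else now
```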
   
   
Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/
   
On Mon, Jan 5, 2015 at 9:49 AM, Gwen

Offset management in multi-threaded high-level consumer

2015-01-06 Thread Rafi Shamim
Hello,

I would like to write a multi-threaded consumer using the high-level
consumer in Kafka 0.8.1. I have found two ways that seem feasible
while keeping the guarantee that messages in a partition are processed
in order. I would appreciate any feedback this list has.

Option 1

- Create multiple threads, so each thread has its own ConsumerConnector.
- Manually commit offsets in each thread after every N messages.
- This was discussed a bit on this list previously. See [1].

### Questions
- Is there a problem with making multiple ConsumerConnectors per machine?
- What does it take for ZooKeeper to handle this much load? We have a
3-node ZooKeeper cluster with relatively small machines. (I expect the
topic will have about 40 messages per second. There will be 3 consumer
groups. That would be 120 commits per second at most, but I can reduce
the frequency of commits to make this lower.)

### Extra info
Kafka 0.9 will have an entirely different commit API, which will allow
one connection to commit offsets per partition, but I can’t wait that
long. See [2].
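
A minimal sketch of Option 1, assuming the 0.8.1 high-level consumer API
(topic, group, and ZooKeeper values are placeholders; message processing is
stubbed out):

```scala
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

// One ConsumerConnector per thread; commit manually every N messages.
def runWorker(topic: String, commitEveryN: Int): Unit = {
  val props = new Properties()
  props.put("group.id", "my-group")           // placeholder
  props.put("zookeeper.connect", "zk1:2181")  // placeholder
  props.put("auto.commit.enable", "false")    // commits are manual here
  val connector = Consumer.create(new ConsumerConfig(props))
  val stream = connector.createMessageStreams(Map(topic -> 1))(topic).head
  var seen = 0
  for (msg <- stream) {
    // process msg.message() here (application-specific)
    seen += 1
    if (seen % commitEveryN == 0) connector.commitOffsets
  }
}
```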


Option 2

- Create one ConsumerConnector, but ask for multiple streams in that
connection. Give each thread one stream.
- Since there is no way to commit offsets per stream right now, we
need to do autoCommit.
- This sacrifices the at-least-once processing guarantee, which would
be nice to have. See KAFKA-1612 [3].

### Extra info
- There was some discussion in KAFKA-966 about a markForCommit()
method so that autoCommit would preserve the at-least-once guarantee,
but it seems more likely that the consumer API will just be redesigned
to allow commits per partition instead. See [4].
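
And a matching sketch of Option 2 under the same assumptions: one connector,
one stream per thread, auto-commit enabled:

```scala
import java.util.Properties
import java.util.concurrent.Executors
import kafka.consumer.{Consumer, ConsumerConfig}

// One connector, several streams; offsets are auto-committed periodically.
val props = new Properties()
props.put("group.id", "my-group")            // placeholder
props.put("zookeeper.connect", "zk1:2181")   // placeholder
props.put("auto.commit.enable", "true")
val connector = Consumer.create(new ConsumerConfig(props))
val numThreads = 4
val streams = connector.createMessageStreams(Map("my-topic" -> numThreads))("my-topic")
val pool = Executors.newFixedThreadPool(numThreads)
streams.foreach { stream =>
  pool.submit(new Runnable {
    def run(): Unit = stream.foreach { msg => () /* process msg.message() */ }
  })
}
```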


So basically I'm wondering if option 1 is feasible. If not, I'll just
do option 2. Of course, let me know if I was mistaken about anything
or if there is another design which is better.

Thanks in advance.
Rafi

[1] 
http://mail-archives.apache.org/mod_mbox/kafka-users/201310.mbox/%3cff142f6b499ae34caed4d263f6ca32901d35a...@extxmb19.nam.nsroot.net%3E
[2] 
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
[3] https://issues.apache.org/jira/browse/KAFKA-1612
[4] https://issues.apache.org/jira/browse/KAFKA-966


Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
OffsetCommitRequest has two constructors now:

For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

And version 1:
 OffsetCommitRequest(String groupId, int generationId, String consumerId,
 Map<TopicPartition, PartitionData> offsetData)

None of them seem to require timestamps... so I'm not sure where you
see that this is required. Can you share an example?

Gwen

On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
 Hi Joel,

 I'm looking more closely at the OffsetCommitRequest wire protocol change
 you mentioned below, and I cannot figure out how to explicitly construct a
 request with the earlier version.  Should the api version be different for
 requests that do not include it and/or servers that do not support the
 timestamp field?  It looks like 0.8.1.1 did not include the timestamp field
 and used api version 0.  But 0.8.2-beta seems to now require timestamps
 even when I explicitly encode OffsetCommitRequest api version 0 (server
 logs a BufferUnderflowException).

 Is this the expected server behavior?  Can you provide any tips on how
 third-party clients should manage the wire-protocol change for this api
 method (I'm working on kafka-python)?

 Thanks,

 -Dana

 On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
We have been working to adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer is
 a
pretty straightforward choice right now. However, in the light of the
upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
   
1) With respect to the consumer redesign for 0.9, what is the future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? it
will continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an early
implementation of it, but since this a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation is already available in
   0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]).
 Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added
 only in
0.8.2 - please correct me if I am wrong), and whether if it is
   recommended
for use in production in any form (with the caveats that accompany
 the
   use
of ZooKeeper).
  
   In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
   as the offsets storage mechanism (not zookeeper). High-level Java
   consumers can choose to store offsets in ZooKeeper instead

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
Hi Gwen, I am using/writing kafka-python to construct api requests and have
not dug too deeply into the server source code.  But I believe it is
kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
used to decode the wire protocol.

-Dana
OffsetCommitRequest has two constructors now:

For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

And version 1:
 OffsetCommitRequest(String groupId, int generationId, String consumerId,
 Map<TopicPartition, PartitionData> offsetData)

None of them seem to require timestamps... so I'm not sure where you
see that this is required. Can you share an example?

Gwen

On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
 Hi Joel,

 I'm looking more closely at the OffsetCommitRequest wire protocol change
 you mentioned below, and I cannot figure out how to explicitly construct a
 request with the earlier version.  Should the api version be different for
 requests that do not include it and/or servers that do not support the
 timestamp field?  It looks like 0.8.1.1 did not include the timestamp
field
 and used api version 0.  But 0.8.2-beta seems to now require timestamps
 even when I explicitly encode OffsetCommitRequest api version 0 (server
 logs a BufferUnderflowException).

 Is this the expected server behavior?  Can you provide any tips on how
 third-party clients should manage the wire-protocol change for this api
 method (I'm working on kafka-python)?

 Thanks,

 -Dana

 On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
We have been working to adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer
is
 a
pretty straightforward choice right now. However, in the light of
the
upcoming consumer changes for 0.8.2 and 0.9, I have a few
questions:
   
1) With respect to the consumer redesign for 0.9, what is the
future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of
the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? it
will continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an
early
implementation of it, but since this a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference
and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation is already available in
   0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]).
 Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added
 only in
0.8.2 - please correct me if I am wrong), and whether if it is
   recommended
for use in production in any form

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
Ah, I see :)

The readFrom function basically tries to read two extra fields if you
are on version 1:

if (versionId == 1) {
  groupGenerationId = buffer.getInt
  consumerId = readShortString(buffer)
}

The rest looks identical in version 0 and 1, and still no timestamp in sight...

Gwen

On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers dana.pow...@rd.io wrote:
 Hi Gwen, I am using/writing kafka-python to construct api requests and have
 not dug too deeply into the server source code.  But I believe it is
 kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
 used to decode the wire protocol.

 -Dana
 OffsetCommitRequest has two constructors now:

 For version 0:
  OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

 And version 1:
  OffsetCommitRequest(String groupId, int generationId, String consumerId,
  Map<TopicPartition, PartitionData> offsetData)

 None of them seem to require timestamps... so I'm not sure where you
 see that this is required. Can you share an example?

 Gwen

 On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
 Hi Joel,

 I'm looking more closely at the OffsetCommitRequest wire protocol change
 you mentioned below, and I cannot figure out how to explicitly construct a
 request with the earlier version.  Should the api version be different for
 requests that do not include it and/or servers that do not support the
 timestamp field?  It looks like 0.8.1.1 did not include the timestamp
 field
 and used api version 0.  But 0.8.2-beta seems to now require timestamps
 even when I explicitly encode OffsetCommitRequest api version 0 (server
 logs a BufferUnderflowException).

 Is this the expected server behavior?  Can you provide any tips on how
 third-party clients should manage the wire-protocol change for this api
 method (I'm working on kafka-python)?

 Thanks,

 -Dana

 On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
 wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
We have been working to adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer
 is
 a
pretty straightforward choice right now. However, in the light of
 the
upcoming consumer changes for 0.8.2 and 0.9, I have a few
 questions:
   
1) With respect to the consumer redesign for 0.9, what is the
 future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of
 the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? it
will continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an
 early
implementation of it, but since this a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference
 and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
  I have a few questions about the current status and future of the Kafka
  consumers.

  We have been working on adding Kafka support in Spring XD [1], currently
  using the high level consumer via Spring Integration Kafka [2]. We are
  working on adding features such as:
  - the ability to control offsets/replay topics;
  - the ability to control partition allocation across multiple consumers;

  We are currently at version 0.8.1.1, so using the simple consumer is a
  pretty straightforward choice right now. However, in the light of the
  upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:

  1) With respect to the consumer redesign for 0.9, what is the future of the
  Simple Consumer and High Level Consumer? To my best understanding, the
  existing high level consumer API will be deprecated in favour of the new
  consumer API. What is the future of the Simple Consumer, in this case? Will
  it continue to exist as a low-level API implementing the Kafka protocol [3]
  and providing the building blocks for the new consumer, or will it be
  deprecated as well?

 The new consumer will subsume both use-cases (simple and high-level).
 You can still use the old SimpleConsumer if you wish - i.e., the wire
 protocol for fetch and other requests will still be supported.

  2) Regarding the new consumer: the v0.8.2 codebase contains an early
  implementation of it, but since this is a feature scheduled only for 0.9,
  what is its status as well? Is it included only as a future reference and
  for stabilizing the API?

 It is a WIP so you cannot really use it.

  3) Obviously, offset management is a concern if using the simple consumer,
  so - wondering about the Offset Management API as well. The Kafka protocol
  document specifically indicates that it will be fully functional in 0.8.2
  [4] - however, a functional implementation is already available in 0.8.1.1
  (accessible via the SimpleConsumer API but not documented in [5]). Again,
  trying to understand the extent of what 0.8.1.1 already supports
  (ostensibly, the offset manager support seems to have been added only in
  0.8.2 - please correct me if I am wrong), and whether it is recommended for
  use in production in any form (with the caveats that accompany the use of
  ZooKeeper).

 In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
 as the offsets storage mechanism (not zookeeper). High-level Java
 consumers can choose to store offsets in ZooKeeper instead by setting
 offsets.storage=zookeeper

 However, if you are using the simple consumer and wish to store
 offsets in ZooKeeper you will need to commit to ZooKeeper directly.
 You can use ZkUtils in the kafka.utils package for this.

 If you wish to move to Kafka-based offsets we will be adding a new
 OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
 This is currently not listed as a blocker for 0.8.2 but I think we
 should include it. I will update that ticket.

  4) Trying to interpret the existing examples in [6] and the comments on [7]
  - the version of the Offset Management API that exists in 0.8.1.1 is using
  ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced by
  Kafka, and phased out if possible. To my understanding, the switch between
  the two will be controlled by the broker configuration (along with other
  parameters that control the performance of offset queues). Is that correct?

 The switch is a client-side configuration. That wiki is not
 up-to-date. The most current documentation is available as a patch in
 https://issues.apache.org/jira/browse/KAFKA-1729

  5) Also, wondering about the timeline of 0.8.2 - according to the roadmaps
  it should be released relatively shortly. Is that correct?

 Yes - once the blockers are ironed out.

  Thanks,
  Marius

  [1] http://projects.spring.io/spring-xd/
  [2] https://github.com/spring-projects/spring-integration-kafka
  [3] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  [4] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
  [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
  [6] https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
  [7] https://issues.apache.org/jira/browse/KAFKA-1729


  
  
 



Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
,
  the
 existing high level consumer API will be deprecated in favour of
  the
  new
 consumer API. What is the future of the Simple Consumer, in this
  case? it
 will continue to exist as a low-level API implementing the Kafka
  protocol
 [3] and providing the building blocks for the new consumer, or
 will
  it be
 deprecated as well?
   
The new consumer will subsume both use-cases (simple and
 high-level).
You can still use the old SimpleConsumer if you wish - i.e., the
 wire
protocol for fetch and other requests will still be supported.
   

 2) Regarding the new consumer: the v0.8.2 codebase contains an
  early
 implementation of it, but since this a feature scheduled only for
  0.9,
what
 is its status as well? Is it included only as a future reference
  and
  for
 stabilizing the API?
   
It is a WIP so you cannot really use it.
   
 3) Obviously, offset management is a concern if using the simple
consumer,
 so - wondering about the Offset Management API as well. The Kafka
protocol
 document specifically indicates that it will be fully functional
 in
  0.8.2
 [4] - however, a functional implementation is already available
 in
0.8.1.1
 (accessible via the SimpleConsumer API but not documented in
 [5]).
  Again,
 trying to understand the extent of what 0.8.1.1 already supports
 (ostensibly, the offset manager support seems to have been added
  only in
 0.8.2 - please correct me if I am wrong), and whether if it is
recommended
 for use in production in any form (with the caveats that
 accompany
  the
use
 of ZooKeeper).
   
In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use
  Kafka
as the offsets storage mechanism (not zookeeper). High-level Java
consumers can choose to store offsets in ZooKeeper instead by
 setting
offsets.storage=zookeeper
   
However, if you are using the simple consumer and wish to store
offsets in ZooKeeper you will need to commit to ZooKeeper directly.
You can use ZkUtils in the kafka.utils package for this.
   
If you wish to move to Kafka-based offsets we will be adding a new
OffsetsClient that can be used to commit/fetch offsets to/from
 Kafka.
This is currently not listed as a blocker for 0.8.2 but I think we
should include it. I will update that ticket.
   
 4) Trying to interpret the existing examples in [6] and the
  comments
  on
[7]
 - the version of the Offset Management API that exists in 0.8.1.1
  is
using
 ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be
  replaced
by
 Kafka, and phased out if possible. To my understanding, the
 switch
between
 the two will be controlled by the broker configuration (along
 with
  other
 parameters that control the performance of offset queues. Is that
correct?
   
The switch is a client-side configuration. That wiki is not
up-to-date. The most current documentation is available as a patch
 in
https://issues.apache.org/jira/browse/KAFKA-1729
   
 5) Also, wondering about the timeline of 0.8.2 - according to the
roadmaps
 it should be released relatively shortly. Is that correct?
   
Yes - once the blockers are ironed out.
   

 Thanks,
 Marius

 [1] http://projects.spring.io/spring-xd/
 [2] https://github.com/spring-projects/spring-integration-kafka
 [3]

   
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
 [4]

   
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
 [5]
  http://kafka.apache.org/082/documentation.html#simpleconsumerapi
 [6]

   
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
 [7] https://issues.apache.org/jira/browse/KAFKA-1729
   
   
 
 



Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
;
 - the ability to control partition allocation across multiple
  consumers;

 We are currently at version 0.8.1.1, so using the simple consumer
  is
  a
 pretty straightforward choice right now. However, in the light of
  the
 upcoming consumer changes for 0.8.2 and 0.9, I have a few
  questions:

 1) With respect to the consumer redesign for 0.9, what is the
  future
  of
the
 Simple Consumer and High Level Consumer? To my best
 understanding,
  the
 existing high level consumer API will be deprecated in favour of
  the
  new
 consumer API. What is the future of the Simple Consumer, in this
  case? it
 will continue to exist as a low-level API implementing the Kafka
  protocol
 [3] and providing the building blocks for the new consumer, or
 will
  it be
 deprecated as well?
   
The new consumer will subsume both use-cases (simple and
 high-level).
You can still use the old SimpleConsumer if you wish - i.e., the
 wire
protocol for fetch and other requests will still be supported.
   

 2) Regarding the new consumer: the v0.8.2 codebase contains an
  early
 implementation of it, but since this a feature scheduled only for
  0.9,
what
 is its status as well? Is it included only as a future reference
  and
  for
 stabilizing the API?
   
It is a WIP so you cannot really use it.
   
 3) Obviously, offset management is a concern if using the simple
consumer,
 so - wondering about the Offset Management API as well. The Kafka
protocol
 document specifically indicates that it will be fully functional
 in
  0.8.2
 [4] - however, a functional implementation is already available
 in
0.8.1.1
 (accessible via the SimpleConsumer API but not documented in
 [5]).
  Again,
 trying to understand the extent of what 0.8.1.1 already supports
 (ostensibly, the offset manager support seems to have been added
  only in
 0.8.2 - please correct me if I am wrong), and whether if it is
recommended
 for use in production in any form (with the caveats that
 accompany
  the
use
 of ZooKeeper).
   
In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use
  Kafka
as the offsets storage mechanism (not zookeeper). High-level Java
consumers can choose to store offsets in ZooKeeper instead by
 setting
offsets.storage=zookeeper
   
However, if you are using the simple consumer and wish to store
offsets in ZooKeeper you will need to commit to ZooKeeper directly.
You can use ZkUtils in the kafka.utils package for this.
   
If you wish to move to Kafka-based offsets we will be adding a new
OffsetsClient that can be used to commit/fetch offsets to/from
 Kafka.
This is currently not listed as a blocker for 0.8.2 but I think we
should include it. I will update that ticket.
   
 4) Trying to interpret the existing examples in [6] and the
  comments
  on
[7]
 - the version of the Offset Management API that exists in 0.8.1.1
  is
using
 ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be
  replaced
by
 Kafka, and phased out if possible. To my understanding, the
 switch
between
 the two will be controlled by the broker configuration (along
 with
  other
 parameters that control the performance of offset queues. Is that
correct?
   
The switch is a client-side configuration. That wiki is not
up-to-date. The most current documentation is available as a patch
 in
https://issues.apache.org/jira/browse/KAFKA-1729
   
 5) Also, wondering about the timeline of 0.8.2 - according to the
roadmaps
 it should be released relatively shortly. Is that correct?
   
Yes - once the blockers are ironed out.
   

 Thanks,
 Marius

 [1] http://projects.spring.io/spring-xd/
 [2] https://github.com/spring-projects/spring-integration-kafka
 [3]

   
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
 [4]

   
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
 [5]
  http://kafka.apache.org/082/documentation.html#simpleconsumerapi
 [6]

   
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
 [7] https://issues.apache.org/jira/browse/KAFKA-1729
   
   
 
 



Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-04 Thread Dana Powers
Hi Joel,

I'm looking more closely at the OffsetCommitRequest wire protocol change
you mentioned below, and I cannot figure out how to explicitly construct a
request with the earlier version. Should the API version be different for
requests that do not include the timestamp field, and/or for servers that do
not support it? It looks like 0.8.1.1 did not include the timestamp field
and used api version 0.  But 0.8.2-beta seems to now require timestamps
even when I explicitly encode OffsetCommitRequest api version 0 (server
logs a BufferUnderflowException).
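
For context, the format change at play here is KAFKA-1634. As the protocol
guide [4] documents it, the two request versions differ roughly as follows;
this is a sketch of the layouts, not a normative spec:

    OffsetCommitRequest v0 (0.8.1.x, offsets stored in ZooKeeper):
      GroupId [TopicName [Partition Offset Metadata]]

    OffsetCommitRequest v1 (0.8.2, offsets stored in Kafka):
      GroupId ConsumerGroupGenerationId ConsumerId
        [TopicName [Partition Offset TimeStamp Metadata]]

So a v0 request carries no per-partition TimeStamp; a broker that decodes a
v0-encoded request with v1 expectations would run out of bytes, which would be
consistent with the BufferUnderflowException above.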

Is this the expected server behavior?  Can you provide any tips on how
third-party clients should manage the wire-protocol change for this api
method (I'm working on kafka-python)?

Thanks,

-Dana

On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So, for example, you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   Inline..

   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
    Hello everyone,

    I have a few questions about the current status and future of the Kafka
    consumers.

    We have been working on adding Kafka support in Spring XD [1], currently
    using the high level consumer via Spring Integration Kafka [2]. We are
    working on adding features such as:
    - the ability to control offsets/replay topics;
    - the ability to control partition allocation across multiple consumers;

    We are currently at version 0.8.1.1, so using the simple consumer is a
    pretty straightforward choice right now. However, in the light of the
    upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:

    1) With respect to the consumer redesign for 0.9, what is the future of the
    Simple Consumer and High Level Consumer? To my best understanding, the
    existing high level consumer API will be deprecated in favour of the new
    consumer API. What is the future of the Simple Consumer, in this case? Will
    it continue to exist as a low-level API implementing the Kafka protocol
    [3] and providing the building blocks for the new consumer, or will it be
    deprecated as well?

   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.

    2) Regarding the new consumer: the v0.8.2 codebase contains an early
    implementation of it, but since this is a feature scheduled only for 0.9,
    what is its status as well? Is it included only as a future reference and
    for stabilizing the API?

   It is a WIP so you cannot really use it.

    3) Obviously, offset management is a concern if using the simple consumer,
    so - wondering about the Offset Management API as well. The Kafka protocol
    document specifically indicates that it will be fully functional in 0.8.2
    [4] - however, a functional implementation is already available in 0.8.1.1
    (accessible via the SimpleConsumer API but not documented in [5]). Again,
    trying to understand the extent of what 0.8.1.1 already supports
    (ostensibly, the offset manager support seems to have been added only in
    0.8.2 - please correct me if I am wrong), and whether it is recommended
    for use in production in any form (with the caveats that accompany the
    use of ZooKeeper).

   In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
   as the offsets storage mechanism (not zookeeper). High-level Java
   consumers can choose to store offsets in ZooKeeper instead by setting
   offsets.storage=zookeeper

   However, if you are using the simple consumer and wish to store
   offsets in ZooKeeper you will need to commit to ZooKeeper directly.
   You can use ZkUtils in the kafka.utils package for this.

   If you wish to move to Kafka-based offsets we will be adding a new
   OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
   This is currently not listed as a blocker for 0.8.2 but I think we
   should include it. I will update that ticket.

Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Marius Bogoevici
Hello everyone,

I have a few questions about the current status and future of the Kafka
consumers.

We have been working on adding Kafka support in Spring XD [1], currently
using the high level consumer via Spring Integration Kafka [2]. We are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple consumers;

We are currently at version 0.8.1.1, so using the simple consumer is a
pretty straightforward choice right now. However, in the light of the
upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:

1) With respect to the consumer redesign for 0.9, what is the future of the
Simple Consumer and High Level Consumer? To my best understanding, the
existing high level consumer API will be deprecated in favour of the new
consumer API. What is the future of the Simple Consumer, in this case? Will
it continue to exist as a low-level API implementing the Kafka protocol
[3] and providing the building blocks for the new consumer, or will it be
deprecated as well?

2) Regarding the new consumer: the v0.8.2 codebase contains an early
implementation of it, but since this is a feature scheduled only for 0.9, what
is its status as well? Is it included only as a future reference and for
stabilizing the API?

3) Obviously, offset management is a concern if using the simple consumer,
so - wondering about the Offset Management API as well. The Kafka protocol
document specifically indicates that it will be fully functional in 0.8.2
[4] - however, a functional implementation is already available in 0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]). Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added only in
0.8.2 - please correct me if I am wrong), and whether it is recommended
for use in production in any form (with the caveats that accompany the use
of ZooKeeper).

4) Trying to interpret the existing examples in [6] and the comments on [7]
- the version of the Offset Management API that exists in 0.8.1.1 is using
ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced by
Kafka, and phased out if possible. To my understanding, the switch between
the two will be controlled by the broker configuration (along with other
parameters that control the performance of offset queues). Is that correct?

5) Also, wondering about the timeline of 0.8.2 - according to the roadmaps
it should be released relatively shortly. Is that correct?

Thanks,
Marius

[1] http://projects.spring.io/spring-xd/
[2] https://github.com/spring-projects/spring-integration-kafka
[3]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
[4]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
[5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
[6]
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
[7] https://issues.apache.org/jira/browse/KAFKA-1729


Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Joel Koshy
Inline..

On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
 Hello everyone,
 
 I have a few questions about the current status and future of the Kafka
 consumers.
 
 We have been working on adding Kafka support in Spring XD [1], currently
 using the high level consumer via Spring Integration Kafka [2]. We are
 working on adding features such as:
 - the ability to control offsets/replay topics;
 - the ability to control partition allocation across multiple consumers;
 
 We are currently at version 0.8.1.1, so using the simple consumer is a
 pretty straightforward choice right now. However, in the light of the
 upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
 
 1) With respect to the consumer redesign for 0.9, what is the future of the
 Simple Consumer and High Level Consumer? To my best understanding, the
 existing high level consumer API will be deprecated in favour of the new
 consumer API. What is the future of the Simple Consumer, in this case? Will
 it continue to exist as a low-level API implementing the Kafka protocol
 [3] and providing the building blocks for the new consumer, or will it be
 deprecated as well?

The new consumer will subsume both use-cases (simple and high-level).
You can still use the old SimpleConsumer if you wish - i.e., the wire
protocol for fetch and other requests will still be supported.

 
 2) Regarding the new consumer: the v0.8.2 codebase contains an early
 implementation of it, but since this is a feature scheduled only for 0.9, what
 is its status as well? Is it included only as a future reference and for
 stabilizing the API?

It is a WIP so you cannot really use it.

 3) Obviously, offset management is a concern if using the simple consumer,
 so - wondering about the Offset Management API as well. The Kafka protocol
 document specifically indicates that it will be fully functional in 0.8.2
 [4] - however, a functional implementation is already available in 0.8.1.1
 (accessible via the SimpleConsumer API but not documented in [5]). Again,
 trying to understand the extent of what 0.8.1.1 already supports
 (ostensibly, the offset manager support seems to have been added only in
 0.8.2 - please correct me if I am wrong), and whether it is recommended
 for use in production in any form (with the caveats that accompany the use
 of ZooKeeper).

In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
as the offsets storage mechanism (not zookeeper). High-level Java
consumers can choose to store offsets in ZooKeeper instead by setting
offsets.storage=zookeeper
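
For illustration, a high-level Java consumer on 0.8.2 that keeps its offsets
in ZooKeeper would be configured along these lines. This is a minimal sketch:
the connection string and group are hypothetical, and the property names are
the 0.8.2 consumer configs:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ZkOffsetStorageExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // assumed local ZooKeeper
            props.put("group.id", "my-group");                // hypothetical group
            // Keep committed offsets in ZooKeeper rather than in Kafka.
            props.put("offsets.storage", "zookeeper");
            // While migrating to Kafka-based offsets, dual.commit.enabled=true
            // writes each commit to both stores.
            // props.put("dual.commit.enabled", "true");

            ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // ... create message streams and consume as usual ...
            consumer.shutdown();
        }
    }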

However, if you are using the simple consumer and wish to store
offsets in ZooKeeper you will need to commit to ZooKeeper directly.
You can use ZkUtils in the kafka.utils package for this.
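
A rough Java sketch of such a direct commit, assuming the 0.8.x
ZkUtils/ZKStringSerializer helpers and the high-level consumer's
/consumers/<group>/offsets/<topic>/<partition> path layout; the group and
topic names here are hypothetical:

    import org.I0Itec.zkclient.ZkClient;

    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;

    public class DirectZkOffsetCommit {
        public static void main(String[] args) {
            String group = "my-group";   // hypothetical
            String topic = "my-topic";   // hypothetical
            int partition = 0;
            long nextOffset = 42L;       // next offset the consumer should read

            // Kafka writes its ZooKeeper payloads as UTF-8 strings, so reuse
            // its serializer instead of ZkClient's default Java serialization.
            ZkClient zk = new ZkClient("localhost:2181", 6000, 6000,
                    ZKStringSerializer$.MODULE$);
            String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;
            // Creates any missing parent nodes, then writes the offset as a string.
            ZkUtils.updatePersistentPath(zk, path, Long.toString(nextOffset));
            zk.close();
        }
    }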

If you wish to move to Kafka-based offsets we will be adding a new
OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
This is currently not listed as a blocker for 0.8.2 but I think we
should include it. I will update that ticket.

 4) Trying to interpret the existing examples in [6] and the comments on [7]
 - the version of the Offset Management API that exists in 0.8.1.1 is using
 ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced by
 Kafka, and phased out if possible. To my understanding, the switch between
 the two will be controlled by the broker configuration (along with other
 parameters that control the performance of offset queues). Is that correct?

The switch is a client-side configuration. That wiki is not
up-to-date. The most current documentation is available as a patch in
https://issues.apache.org/jira/browse/KAFKA-1729

 5) Also, wondering about the timeline of 0.8.2 - according to the roadmaps
 it should be released relatively shortly. Is that correct?

Yes - once the blockers are ironed out.

 
 Thanks,
 Marius
 
 [1] http://projects.spring.io/spring-xd/
 [2] https://github.com/spring-projects/spring-integration-kafka
 [3]
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
 [4]
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
 [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
 [6]
 https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
 [7] https://issues.apache.org/jira/browse/KAFKA-1729



Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Marius Bogoevici
Hi Joel,

Thanks for all the clarifications!  Just another question on this: will
0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
Generally speaking, would there be any concerns with using the 0.8.2
consumer with a 0.8.1 broker, for instance?

Marius

On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Inline..

 On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
  Hello everyone,
 
  I have a few questions about the current status and future of the Kafka
  consumers.
 
  We have been working on adding Kafka support in Spring XD [1], currently
  using the high level consumer via Spring Integration Kafka [2]. We are
  working on adding features such as:
  - the ability to control offsets/replay topics;
  - the ability to control partition allocation across multiple consumers;
 
  We are currently at version 0.8.1.1, so using the simple consumer is a
  pretty straightforward choice right now. However, in the light of the
  upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
 
  1) With respect to the consumer redesign for 0.9, what is the future of
 the
  Simple Consumer and High Level Consumer? To my best understanding, the
  existing high level consumer API will be deprecated in favour of the new
  consumer API. What is the future of the Simple Consumer, in this case? Will
  it continue to exist as a low-level API implementing the Kafka protocol
  [3] and providing the building blocks for the new consumer, or will it be
  deprecated as well?

 The new consumer will subsume both use-cases (simple and high-level).
 You can still use the old SimpleConsumer if you wish - i.e., the wire
 protocol for fetch and other requests will still be supported.

 
  2) Regarding the new consumer: the v0.8.2 codebase contains an early
  implementation of it, but since this is a feature scheduled only for 0.9,
 what
  is its status as well? Is it included only as a future reference and for
  stabilizing the API?

 It is a WIP so you cannot really use it.

  3) Obviously, offset management is a concern if using the simple
 consumer,
  so - wondering about the Offset Management API as well. The Kafka
 protocol
  document specifically indicates that it will be fully functional in 0.8.2
  [4] - however, a functional implementation is already available in
 0.8.1.1
  (accessible via the SimpleConsumer API but not documented in [5]). Again,
  trying to understand the extent of what 0.8.1.1 already supports
  (ostensibly, the offset manager support seems to have been added only in
  0.8.2 - please correct me if I am wrong), and whether it is
 recommended
  for use in production in any form (with the caveats that accompany the
 use
  of ZooKeeper).

 In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
 as the offsets storage mechanism (not zookeeper). High-level Java
 consumers can choose to store offsets in ZooKeeper instead by setting
 offsets.storage=zookeeper

 However, if you are using the simple consumer and wish to store
 offsets in ZooKeeper you will need to commit to ZooKeeper directly.
 You can use ZkUtils in the kafka.utils package for this.

 If you wish to move to Kafka-based offsets we will be adding a new
 OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
 This is currently not listed as a blocker for 0.8.2 but I think we
 should include it. I will update that ticket.

  4) Trying to interpret the existing examples in [6] and the comments on
 [7]
  - the version of the Offset Management API that exists in 0.8.1.1 is
 using
  ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced
 by
  Kafka, and phased out if possible. To my understanding, the switch
 between
  the two will be controlled by the broker configuration (along with other
  parameters that control the performance of offset queues). Is that
 correct?

 The switch is a client-side configuration. That wiki is not
 up-to-date. The most current documentation is available as a patch in
 https://issues.apache.org/jira/browse/KAFKA-1729

  5) Also, wondering about the timeline of 0.8.2 - according to the
 roadmaps
  it should be released relatively shortly. Is that correct?

 Yes - once the blockers are ironed out.

 
  Thanks,
  Marius
 
  [1] http://projects.spring.io/spring-xd/
  [2] https://github.com/spring-projects/spring-integration-kafka
  [3]
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  [4]
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
  [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
  [6]
 
 https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
  [7] https://issues.apache.org/jira/browse/KAFKA-1729




Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Joel Koshy
Yes it should be backwards compatible. So, for example, you should be able
to use an 0.8.1 client with an 0.8.2 broker. In general, you should
not upgrade your clients until after the brokers have been upgraded.
However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
protocol change I'm aware of is the OffsetCommitRequest.  There is a
change in the OffsetCommitRequest format (KAFKA-1634) although you can
explicitly construct an OffsetCommitRequest with the earlier version.
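
Concretely, pinning the version looks something like the following; a
condensed sketch modeled on the wiki example in [6], with a hypothetical
group, topic, and client id. Version 0 keeps the old ZooKeeper-backed
behavior; version 1 is the new Kafka-backed format:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import kafka.common.OffsetAndMetadata;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetCommitRequest;

    public class ExplicitVersionCommit {
        public static void main(String[] args) {
            Map<TopicAndPartition, OffsetAndMetadata> offsets =
                new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
            offsets.put(new TopicAndPartition("my-topic", 0),
                    new OffsetAndMetadata(42L, "", System.currentTimeMillis()));

            short versionId = 0; // 0 = ZooKeeper-backed commit; 1 = Kafka-backed (0.8.2)
            OffsetCommitRequest request = new OffsetCommitRequest(
                    "my-group", offsets, 1 /* correlationId */, "my-client", versionId);
            // Send request.underlying() over a kafka.network.BlockingChannel
            // to the broker that hosts the group's offset manager.
        }
    }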

Thanks,

Joel

On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
 Hi Joel,
 
 Thanks for all the clarifications!  Just another question on this: will
 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
 Generally speaking, would there be any concerns with using the 0.8.2
 consumer with a 0.8.1 broker, for instance?
 
 Marius
 
 On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
  Inline..
 
  On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
   Hello everyone,
  
   I have a few questions about the current status and future of the Kafka
   consumers.
  
   We have been working on adding Kafka support in Spring XD [1], currently
   using the high level consumer via Spring Integration Kafka [2]. We are
   working on adding features such as:
   - the ability to control offsets/replay topics;
   - the ability to control partition allocation across multiple consumers;
  
   We are currently at version 0.8.1.1, so using the simple consumer is a
   pretty straightforward choice right now. However, in the light of the
   upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
  
   1) With respect to the consumer redesign for 0.9, what is the future of
  the
   Simple Consumer and High Level Consumer? To my best understanding, the
   existing high level consumer API will be deprecated in favour of the new
   consumer API. What is the future of the Simple Consumer, in this case? Will
   it continue to exist as a low-level API implementing the Kafka protocol
   [3] and providing the building blocks for the new consumer, or will it be
   deprecated as well?
 
  The new consumer will subsume both use-cases (simple and high-level).
  You can still use the old SimpleConsumer if you wish - i.e., the wire
  protocol for fetch and other requests will still be supported.
 
  
   2) Regarding the new consumer: the v0.8.2 codebase contains an early
   implementation of it, but since this is a feature scheduled only for 0.9,
  what
   is its status as well? Is it included only as a future reference and for
   stabilizing the API?
 
  It is a WIP so you cannot really use it.
 
   3) Obviously, offset management is a concern if using the simple
  consumer,
   so - wondering about the Offset Management API as well. The Kafka
  protocol
   document specifically indicates that it will be fully functional in 0.8.2
   [4] - however, a functional implementation is already available in
  0.8.1.1
   (accessible via the SimpleConsumer API but not documented in [5]). Again,
   trying to understand the extent of what 0.8.1.1 already supports
   (ostensibly, the offset manager support seems to have been added only in
   0.8.2 - please correct me if I am wrong), and whether it is
  recommended
   for use in production in any form (with the caveats that accompany the
  use
   of ZooKeeper).
 
  In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
  as the offsets storage mechanism (not zookeeper). High-level Java
  consumers can choose to store offsets in ZooKeeper instead by setting
  offsets.storage=zookeeper
 
  However, if you are using the simple consumer and wish to store
  offsets in ZooKeeper you will need to commit to ZooKeeper directly.
  You can use ZkUtils in the kafka.utils package for this.
 
  If you wish to move to Kafka-based offsets we will be adding a new
  OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
  This is currently not listed as a blocker for 0.8.2 but I think we
  should include it. I will update that ticket.
 
   4) Trying to interpret the existing examples in [6] and the comments on
  [7]
   - the version of the Offset Management API that exists in 0.8.1.1 is
  using
   ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced
  by
   Kafka, and phased out if possible. To my understanding, the switch
  between
   the two will be controlled by the broker configuration (along with other
   parameters that control the performance of offset queues). Is that
  correct?
 
  The switch is a client-side configuration. That wiki is not
  up-to-date. The most current documentation is available as a patch in
  https://issues.apache.org/jira/browse/KAFKA-1729
 
   5) Also, wondering about the timeline of 0.8.2 - according to the
  roadmaps
   it should be released relatively shortly. Is that correct?
 
  Yes - once the blockers are ironed out.
 
  
   Thanks,
   Marius
  
   [1] http://projects.spring.io/spring-xd/
   [2] https://github.com/spring-projects/spring-integration-kafka
   [3]
   https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
   [4]
   https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
   [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
   [6]
   https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
   [7] https://issues.apache.org/jira/browse/KAFKA-1729

Manual Partition-wise Offset Management

2013-12-30 Thread Tarang Dawer
Hi All

Can we commit offsets for individual partitions separately, instead of the
consumerConnector.commitOffsets() call (which commits the offsets of all the
related broker partitions simultaneously)?


Re: Manual Partition-wise Offset Management

2013-12-30 Thread Joe Stein
Yes, take a look at
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Idon'twantmyconsumer'soffsetstobecommittedautomatically.CanImanuallymanagemyconsumer'soffsets?

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


On Mon, Dec 30, 2013 at 7:58 AM, Tarang Dawer tarang.da...@gmail.com wrote:

 Hi All

 Can we commit offsets for individual partitions separately, instead of the
 consumerConnector.commitOffsets() call (which commits the offsets of all the
 related broker partitions simultaneously)?
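
For reference, the FAQ recipe boils down to disabling auto-commit and
committing explicitly. A minimal sketch against the 0.8.x high-level Java
API, with hypothetical connection, group, and topic settings:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ManualCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // assumed local ZooKeeper
            props.put("group.id", "my-group");                // hypothetical group
            props.put("auto.commit.enable", "false");         // commit explicitly below

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
            ConsumerIterator<byte[], byte[]> it =
                streams.get("my-topic").get(0).iterator();

            int processed = 0;
            while (it.hasNext()) {
                it.next(); // process the message payload here
                if (++processed % 100 == 0) {
                    // Commits the current position of ALL partitions this
                    // connector owns - there is no per-partition variant here.
                    connector.commitOffsets();
                }
            }
        }
    }

Note that commitOffsets() always covers every partition the connector owns;
committing a single partition in isolation means dropping to the offset API
(an OffsetCommitRequest for just that TopicAndPartition) from a simple
consumer instead.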