RE: Simple Consumer and offsets

2015-02-19 Thread Arunkumar Srambikkal (asrambik)
If I may use the same thread to discuss the exact same issue:

Assuming one can store the offset in an external location (redis/db etc.), along
with the rest of the state that the program requires, wouldn't it be possible to
use the High Level API with auto commit turned off, do your custom offset
management, and then make the Kafka commit API call (probably delayed to give
ZooKeeper a breather)?

That way, in the failure scenario, the high level consumer offset would ALWAYS
be at or behind the offset that is actually valid, so you can skip forward and
avoid using the simple consumer.

I assume one needs the simple consumer in the offset management use case only
when we want to skip back to an older offset / use Kafka for storing offsets?

I was trying to handle the consumer failure scenario while avoiding the simple
consumer and all the complexity it entails.

Does this work or is there anything wrong with this picture? 
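
For concreteness, a minimal sketch of what I mean against the 0.8.x high level
consumer (the ZooKeeper address, group id, topic and the external-store step
are placeholders, not a tested implementation):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ExternalOffsetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");   // placeholder ZK quorum
        props.put("group.id", "my-group");            // placeholder group id
        props.put("auto.commit.enable", "false");     // no auto commit
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();

        int processed = 0;
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> m = it.next();
            // 1. process m.message() together with the rest of your state
            // 2. persist (m.topic(), m.partition(), m.offset()) to redis/db
            //    (a hypothetical saveOffsetExternally(...) call would go here)
            if (++processed % 1000 == 0) {
                connector.commitOffsets();   // delayed commit to give ZooKeeper a breather
            }
        }
    }
}

The external store write and the commit interval are the parts I would tune;
the commitOffsets() call itself is just the standard high level consumer API.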

Thanks
Arun

On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> We are using the High Level Consumer API to interact with Kafka for our 
> normal use cases.
> 
> However, on consumer restart in the case of consumer failures, we want 
> to be able to manually reset offsets in certain situations.
> And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It 
> looked like instantiating a SimpleConsumer just to reset offsets on restart 
> was a viable option, while continuing to use the High Level Consumer for our 
> normal operations. Not sure if there is a better way that is compatible 
> across 0.8.1 and 0.8.2.
> -Suren
>  
> 
>  On Thursday, February 19, 2015 10:25 AM, Joel Koshy 
>  wrote:
>
> 
>  Not sure what you mean by using the SimpleConsumer on failure 
> recovery. Can you elaborate on this?
> 
> On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > We are using the High Level Consumer generally but are thinking to use the 
> > SimpleConsumer on failure recovery to set the offsets.
> > Is that the recommended approach for this use case?
> > Thanks.
> > -Suren
> >  
> > 
> >  On Thursday, February 19, 2015 9:40 AM, Joel Koshy 
> >  wrote:
> >
> > 
> >  Are you using it from Java or Scala? i.e., are you using the  
> >javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > 
> > In 0.8.2 javaapi we explicitly set version 0 of the 
> > OffsetCommitRequest/OffsetFetchRequest which means it will 
> > commit/fetch to/from ZooKeeper only. If you use the scala API you 
> > can create an OffsetCommitRequest with version set to 1 (which will 
> > allow you to commit to Kafka).
> > 
> > Since we are doing an 0.8.2.1 release we will make the above more 
> > consistent. i.e., you can create OffsetCommitRequests with version 1 
> > even from the javaapi. I will be updating the documentation on this 
> > to make it clearer.
> > 
> > Thanks,
> > 
> > Joel
> > 
> > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > Joel,
> > > Looking at SimpleConsumer in the 0.8.2 code, it is using 
> > > OffsetCommitRequest and sending that over to a broker.
> > > Is the broker storing that in ZK?
> > > -Suren
> > >  
> > > 
> > >  On Tuesday, February 17, 2015 12:22 PM, Joel Koshy 
> > >  wrote:
> > >
> > > 
> > >  Hi Chris,
> > > 
> > > In 0.8.2, the simple consumer Java API supports 
> > > committing/fetching offsets that are stored in ZooKeeper. You 
> > > don't need to issue any ConsumerMetadataRequest for this. 
> > > Unfortunately, the API currently does not support fetching offsets that 
> > > are stored in Kafka.
> > > 
> > > Thanks,
> > > 
> > > Joel
> > > 
> > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > > > Hi,
> > > > 
> > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > 
> > > > I'm confused about why the SimpleConsumer has:
> > > > 
> > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > 
> > > > and
> > > > 
> > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > > > 
> > > > but no way that I can see to issue a ConsumerMetadataRequest, 
> > > > which is what I think when restarting my consumers so that they 
> > > > can begin working where they last left off (in the event that 
> > > > they were stopped for a while then restarted some time later, 
> > > > and new messages had come in).
> > > > 
> > > > The fetchOffsets() works on time, usually it looks like you send 
> > > > it Earliest or Latest (beginning or end of what's currently in 
> > > > the stream).
> > > > 
> > > > I realize the documentation says this:
> > > > 
> > > > 
> > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a 
> > > > > significant amount of work not needed in the Consumer Groups:
> > > > >
> > > > >1. You must keep track of the offsets in your application to know 
> > > > > where you left off consuming.
> > > > >
> > > > > But that's not really quit

About Symantec's encryption-thru-Kafka proof of concept

2015-02-19 Thread Jim Hoagland
Hi Folks,

At the recent Kafka Meetup in Mountain View there was interest expressed
about the encryption through Kafka proof of concept that Symantec did a
few months ago, so I have created a blog post with some details about it.
You can find that here:
  http://goo.gl/sjYGWN

Let me know if you have any thoughts or questions.

Thanks,

  Jim

-- 
Jim Hoagland, Ph.D.
Sr. Principal Software Engineer
Big Data Analytics Team
Cloud Platform Engineering
Symantec Corporation
http://cpe.symantec.com



Re: Default MirrorMaker not copying over from source to target

2015-02-19 Thread tao xiao
Looks like you only have 4 messages in your topic and no more messages got
sent:

[2015-02-19 20:09:34,661] DEBUG initial fetch offset of consolemm:0: fetched
offset = 4: consumed offset = 4 is 4 (kafka.consumer.PartitionTopicInfo)

You can try sending more messages to the topic, or give the MM a different
consumer group id and set auto.offset.reset=smallest.
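
For example, a minimal mirrorconsumer.properties along those lines (hosts and
the new group id are placeholders):

zookeeper.connect=ad-0104:2181
zookeeper.connection.timeout.ms=6000
# a fresh group id, so there is no committed offset in zookeeper for this group
group.id=mm-test-group-2
# re-read from the earliest available offset instead of the default (largest)
auto.offset.reset=smallest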

On Friday, February 20, 2015, Alex Melville  wrote:

> Tao,
>
>
> I updated the mirrorconsumer.properties config file as you suggested, and
> upped the MM's log level to DEBUG. I have the output of the DEBUG logger
> here in this pastebin, if you could take a minute to look for anything in
> its contents that would indicate a problem that would be extremely helpful.
> Note that my servers hostnames are of the form ad-010X or ba-0X where X is
> some integer between 1 and 4.
>
> http://pastebin.com/rBsxx15A
>
> When I run the mirrormaker and then spin up a console consumer to read from
> the source cluster, I get 0 messages consumed.
>
>
> Alex
>
> On Sun, Feb 15, 2015 at 3:00 AM, tao xiao  > wrote:
>
> > Alex,
> >
> > Are you sure you have data continually being sent to the topic in source
> > cluster after you bring up MM? By default auto.offset.reset=largest in MM
> > consumer config which means MM only fetches the largest offset if the
> > consumer group has no initial offset in zookeeper.
> >
> > You can have MM print more log by changing the log level in
> > config/tools-log4j.properties
> >
> > On Sun, Feb 15, 2015 at 8:39 AM, Alex Melville  >
> > wrote:
> >
> > > Hi Kafka'ers,
> > >
> > >
> > > I am trying to get the Mirrormaker working with two separate clusters,
> > one
> > > as the source and the other as the target. The topic I'm trying to copy
> > > over exists on both the source and target clusters. Here are the
> relevant
> > > entries in my consumer and producer properties files, which I'm
> > specifying
> > > the command I run to start the MM:
> > >
> > > *mirrorconsumer.properties:*
> > > zookeeper.connect=ad-0104:2181
> > > zookeeper.connection.timeout.ms=6000
> > > group.id=test-consumer-group
> > >
> > >
> > > *mirrorproducer.properties:*
> > > metadata.broker.list=ba-02:9092,ba-03:9092
> > > producer.type=sync
> > > compression.codec=none
> > > serializer.class=kafka.serializer.DefaultEncoder
> > >
> > >
> > > Then I run the following command:
> > > bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
> > >  ../config/mirrorconsumer.properties --producer.config
> > > ../config/mirrorproducer.properties --whitelist consolemm
> > >
> > > so consolemm is the topic I'm trying to copy over. I've created
> consolemm
> > > and have used to console-consumer to verify that there are messages in
> > the
> > > topic.
> > >
> > > When I run this command... nothing happens. The process keeps running
> and
> > > prints nothing to the Terminal. If I look in the output of the
> zookeeper
> > on
> > > the source cluster I get only the following:
> > >
> > > [2015-02-15 00:34:06,102] INFO Accepted socket connection from /
> > > 10.7.162.75:42819 (org.apache.zookeeper.server.NIOServerCnxnFactory)
> > > [2015-02-15 00:34:06,104] INFO Client attempting to establish new
> session
> > > at /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer)
> > > [2015-02-15 00:34:06,106] INFO Established session 0x14b668b0fbe0033
> with
> > > negotiated timeout 6000 for client /10.7.162.75:42819
> > > (org.apache.zookeeper.server.ZooKeeperServer)
> > >
> > >
> > > and when I look at the output of one of the brokers on the source
> > cluster I
> > > get:
> > >
> > > [2015-02-15 00:32:14,382] INFO Closing socket connection to /
> 10.7.162.75
> > .
> > > (kafka.network.Processor)
> > >
> > > and there is no output on the zookeeper on the target cluster.
> > >
> > >
> > >
> > > Any advice on what is causing MM to not properly copy over data to the
> > > target cluster would be extremely helpful.
> > >
> > > -Alex
> > >
> >
> >
> >
> > --
> > Regards,
> > Tao
> >
>


-- 
Regards,
Tao


Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
Yeah that is a good point - will do the update as part of the doc
changes in KAFKA-1729

On Thu, Feb 19, 2015 at 09:26:30PM -0500, Evan Huus wrote:
> On Thu, Feb 19, 2015 at 8:43 PM, Joel Koshy  wrote:
> 
> > If you are using v0 of OffsetCommit/FetchRequest then you can issue
> > that to any broker. For version > 0 you will need to issue it to the
> > coordinator. You can discover the coordinator by sending a
> > ConsumerMetadataRequest to any broker.
> >
> 
> The protocol spec [1] still says "Currently the supported version for all
> APIs is 0". Based on your message above that is no longer true, so could
> somebody familiar with the changes please update the spec appropriately?
> 
> Thanks,
> Evan
> 
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> 
> 
> 
> > On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote:
> > > Joel/All,
> > > The SimpleConsumer constructor requires a specific host and port.
> > >
> > > Can this be any broker?
> > > If it needs to be a specific broker, for 0.8.2, should this be the
> > offset coordinator? For 0.8.1, does it matter?
> > > -Suren
> > >
> > >
> > >  On Thursday, February 19, 2015 10:43 AM, Joel Koshy <
> > jjkosh...@gmail.com> wrote:
> > >
> > >
> > >  I see - yes, you can use the SimpleConsumer for that. However, your
> > > high-level consumers need to be shutdown while you do that (otherwise
> > > they may auto-commit while you are resetting offsets).
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> > > > We are using the High Level Consumer API to interact with Kafka for
> > our normal use cases.
> > > >
> > > > However, on consumer restart in the case of consumer failures, we want
> > to be able to manually
> > > > reset offsets in certain situations.
> > > > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
> > > > It looked like instantiating a SimpleConsumer just to reset offsets on
> > restart was a viable option, while continuing to use the High Level
> > Consumer for our normal operations. Not sure if there is a better way that
> > is compatible across 0.8.1 and 0.8.2.
> > > > -Suren
> > > >
> > > >
> > > >  On Thursday, February 19, 2015 10:25 AM, Joel Koshy <
> > jjkosh...@gmail.com> wrote:
> > > >
> > > >
> > > >  Not sure what you mean by using the SimpleConsumer on failure
> > > > recovery. Can you elaborate on this?
> > > >
> > > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > > > > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > > > > We are using the High Level Consumer generally but are thinking to
> > use the SimpleConsumer on failure recovery to set the offsets.
> > > > > Is that the recommended approach for this use case?
> > > > > Thanks.
> > > > > -Suren
> > > > >
> > > > >
> > > > >  On Thursday, February 19, 2015 9:40 AM, Joel Koshy <
> > jjkosh...@gmail.com> wrote:
> > > > >
> > > > >
> > > > >  Are you using it from Java or Scala? i.e., are you using the
> > > > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > > > >
> > > > > In 0.8.2 javaapi we explicitly set version 0 of the
> > > > > OffsetCommitRequest/OffsetFetchRequest which means it will
> > > > > commit/fetch to/from ZooKeeper only. If you use the scala API you can
> > > > > create an OffsetCommitRequest with version set to 1 (which will allow
> > > > > you to commit to Kafka).
> > > > >
> > > > > Since we are doing an 0.8.2.1 release we will make the above more
> > > > > consistent. i.e., you can create OffsetCommitRequests with version 1
> > > > > even from the javaapi. I will be updating the documentation on this
> > to
> > > > > make it clearer.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Joel
> > > > >
> > > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > > > > Joel,
> > > > > > Looking at SimpleConsumer in the 0.8.2 code, it is using
> > OffsetCommitRequest and sending that over to a broker.
> > > > > > Is the broker storing that in ZK?
> > > > > > -Suren
> > > > > >
> > > > > >
> > > > > >  On Tuesday, February 17, 2015 12:22 PM, Joel Koshy <
> > jjkosh...@gmail.com> wrote:
> > > > > >
> > > > > >
> > > > > >  Hi Chris,
> > > > > >
> > > > > > In 0.8.2, the simple consumer Java API supports committing/fetching
> > > > > > offsets that are stored in ZooKeeper. You don't need to issue any
> > > > > > ConsumerMetadataRequest for this. Unfortunately, the API currently
> > > > > > does not support fetching offsets that are stored in Kafka.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Joel
> > > > > >
> > > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott
> > wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > > > >
> > > > > > > I'm confused about why the SimpleConsumer has:
> > > > > > >
> > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > > > >
> > > > > > > a

Re: Simple Consumer and offsets

2015-02-19 Thread Evan Huus
On Thu, Feb 19, 2015 at 8:43 PM, Joel Koshy  wrote:

> If you are using v0 of OffsetCommit/FetchRequest then you can issue
> that to any broker. For version > 0 you will need to issue it to the
> coordinator. You can discover the coordinator by sending a
> ConsumerMetadataRequest to any broker.
>

The protocol spec [1] still says "Currently the supported version for all
APIs is 0". Based on your message above that is no longer true, so could
somebody familiar with the changes please update the spec appropriately?

Thanks,
Evan

[1]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol



> On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote:
> > Joel/All,
> > The SimpleConsumer constructor requires a specific host and port.
> >
> > Can this be any broker?
> > If it needs to be a specific broker, for 0.8.2, should this be the
> offset coordinator? For 0.8.1, does it matter?
> > -Suren
> >
> >
> >  On Thursday, February 19, 2015 10:43 AM, Joel Koshy <
> jjkosh...@gmail.com> wrote:
> >
> >
> >  I see - yes, you can use the SimpleConsumer for that. However, your
> > high-level consumers need to be shutdown while you do that (otherwise
> > they may auto-commit while you are resetting offsets).
> >
> > Thanks,
> >
> > Joel
> >
> > On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> > > We are using the High Level Consumer API to interact with Kafka for
> our normal use cases.
> > >
> > > However, on consumer restart in the case of consumer failures, we want
> to be able to manually
> > > reset offsets in certain situations.
> > > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
> > > It looked like instantiating a SimpleConsumer just to reset offsets on
> restart was a viable option, while continuing to use the High Level
> Consumer for our normal operations. Not sure if there is a better way that
> is compatible across 0.8.1 and 0.8.2.
> > > -Suren
> > >
> > >
> > >  On Thursday, February 19, 2015 10:25 AM, Joel Koshy <
> jjkosh...@gmail.com> wrote:
> > >
> > >
> > >  Not sure what you mean by using the SimpleConsumer on failure
> > > recovery. Can you elaborate on this?
> > >
> > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > > > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > > > We are using the High Level Consumer generally but are thinking to
> use the SimpleConsumer on failure recovery to set the offsets.
> > > > Is that the recommended approach for this use case?
> > > > Thanks.
> > > > -Suren
> > > >
> > > >
> > > >  On Thursday, February 19, 2015 9:40 AM, Joel Koshy <
> jjkosh...@gmail.com> wrote:
> > > >
> > > >
> > > >  Are you using it from Java or Scala? i.e., are you using the
> > > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > > >
> > > > In 0.8.2 javaapi we explicitly set version 0 of the
> > > > OffsetCommitRequest/OffsetFetchRequest which means it will
> > > > commit/fetch to/from ZooKeeper only. If you use the scala API you can
> > > > create an OffsetCommitRequest with version set to 1 (which will allow
> > > > you to commit to Kafka).
> > > >
> > > > Since we are doing an 0.8.2.1 release we will make the above more
> > > > consistent. i.e., you can create OffsetCommitRequests with version 1
> > > > even from the javaapi. I will be updating the documentation on this
> to
> > > > make it clearer.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > > > Joel,
> > > > > Looking at SimpleConsumer in the 0.8.2 code, it is using
> OffsetCommitRequest and sending that over to a broker.
> > > > > Is the broker storing that in ZK?
> > > > > -Suren
> > > > >
> > > > >
> > > > >  On Tuesday, February 17, 2015 12:22 PM, Joel Koshy <
> jjkosh...@gmail.com> wrote:
> > > > >
> > > > >
> > > > >  Hi Chris,
> > > > >
> > > > > In 0.8.2, the simple consumer Java API supports committing/fetching
> > > > > offsets that are stored in ZooKeeper. You don't need to issue any
> > > > > ConsumerMetadataRequest for this. Unfortunately, the API currently
> > > > > does not support fetching offsets that are stored in Kafka.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Joel
> > > > >
> > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott
> wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > > >
> > > > > > I'm confused about why the SimpleConsumer has:
> > > > > >
> > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > > >
> > > > > > and
> > > > > >
> > > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > > > > >
> > > > > > but no way that I can see to issue a ConsumerMetadataRequest,
> which is
> > > > > > what I think when restarting my consumers so that they can begin
> > > > > > working where they last left off (in the event that they were
> stopped
> > > > > > for a while then restarted some time later, and ne

Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
If you are using v0 of OffsetCommit/FetchRequest then you can issue
that to any broker. For version > 0 you will need to issue it to the
coordinator. You can discover the coordinator by sending a
ConsumerMetadataRequest to any broker.
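
For anyone following along, a rough sketch of that coordinator lookup (class
names from the 0.8.2 Scala core, along the lines of the "Committing and
fetching consumer offsets in Kafka" wiki page; exact constructor signatures may
differ slightly between releases, and the broker address, group and client ids
are placeholders):

import kafka.api.ConsumerMetadataRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.network.BlockingChannel;

public class CoordinatorLookupSketch {
    public static void main(String[] args) {
        // connect to any live broker and ask who coordinates offsets for this group
        BlockingChannel channel = new BlockingChannel("broker1", 9092,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout ms */);
        channel.connect();
        channel.send(new ConsumerMetadataRequest("my-group",
                ConsumerMetadataRequest.CurrentVersion(), 0 /* correlationId */, "my-client"));
        ConsumerMetadataResponse response =
                ConsumerMetadataResponse.readFrom(channel.receive().buffer());
        channel.disconnect();
        if (response.errorCode() == ErrorMapping.NoError()) {
            Broker coordinator = response.coordinator();
            // reconnect to coordinator.host():coordinator.port() and send the
            // version > 0 OffsetCommit/OffsetFetch requests there
        }
    }
}

For version 0 requests (ZooKeeper-backed offsets) this step is not needed; any
broker will do.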

On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote:
> Joel/All,
> The SimpleConsumer constructor requires a specific host and port.
> 
> Can this be any broker?
> If it needs to be a specific broker, for 0.8.2, should this be the offset 
> coordinator? For 0.8.1, does it matter?
> -Suren
>  
> 
>  On Thursday, February 19, 2015 10:43 AM, Joel Koshy 
>  wrote:
>
> 
>  I see - yes, you can use the SimpleConsumer for that. However, your
> high-level consumers need to be shutdown while you do that (otherwise
> they may auto-commit while you are resetting offsets).
> 
> Thanks,
> 
> Joel
> 
> On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> > We are using the High Level Consumer API to interact with Kafka for our 
> > normal use cases.
> > 
> > However, on consumer restart in the case of consumer failures, we want to 
> > be able to manually
> > reset offsets in certain situations.
> > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
> > It looked like instantiating a SimpleConsumer just to reset offsets on 
> > restart was a viable option, while continuing to use the High Level 
> > Consumer for our normal operations. Not sure if there is a better way that 
> > is compatible across 0.8.1 and 0.8.2.
> > -Suren
> >  
> > 
> >      On Thursday, February 19, 2015 10:25 AM, Joel Koshy 
> > wrote:
> >    
> > 
> >  Not sure what you mean by using the SimpleConsumer on failure
> > recovery. Can you elaborate on this?
> > 
> > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > > We are using the High Level Consumer generally but are thinking to use 
> > > the SimpleConsumer on failure recovery to set the offsets.
> > > Is that the recommended approach for this use case?
> > > Thanks.
> > > -Suren
> > >  
> > > 
> > >      On Thursday, February 19, 2015 9:40 AM, Joel Koshy 
> > > wrote:
> > >    
> > > 
> > >  Are you using it from Java or Scala? i.e., are you using the
> > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > > 
> > > In 0.8.2 javaapi we explicitly set version 0 of the
> > > OffsetCommitRequest/OffsetFetchRequest which means it will
> > > commit/fetch to/from ZooKeeper only. If you use the scala API you can
> > > create an OffsetCommitRequest with version set to 1 (which will allow
> > > you to commit to Kafka).
> > > 
> > > Since we are doing an 0.8.2.1 release we will make the above more
> > > consistent. i.e., you can create OffsetCommitRequests with version 1
> > > even from the javaapi. I will be updating the documentation on this to
> > > make it clearer.
> > > 
> > > Thanks,
> > > 
> > > Joel
> > > 
> > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > > Joel,
> > > > Looking at SimpleConsumer in the 0.8.2 code, it is using 
> > > > OffsetCommitRequest and sending that over to a broker.
> > > > Is the broker storing that in ZK?
> > > > -Suren
> > > >  
> > > > 
> > > >      On Tuesday, February 17, 2015 12:22 PM, Joel Koshy 
> > > > wrote:
> > > >    
> > > > 
> > > >  Hi Chris,
> > > > 
> > > > In 0.8.2, the simple consumer Java API supports committing/fetching
> > > > offsets that are stored in ZooKeeper. You don't need to issue any
> > > > ConsumerMetadataRequest for this. Unfortunately, the API currently
> > > > does not support fetching offsets that are stored in Kafka.
> > > > 
> > > > Thanks,
> > > > 
> > > > Joel
> > > > 
> > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > > > > Hi,
> > > > > 
> > > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > > 
> > > > > I'm confused about why the SimpleConsumer has:
> > > > > 
> > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > > 
> > > > > and
> > > > > 
> > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > > > > 
> > > > > but no way that I can see to issue a ConsumerMetadataRequest, which is
> > > > > what I think when restarting my consumers so that they can begin
> > > > > working where they last left off (in the event that they were stopped
> > > > > for a while then restarted some time later, and new messages had come
> > > > > in).
> > > > > 
> > > > > The fetchOffsets() works on time, usually it looks like you send it
> > > > > Earliest or Latest (beginning or end of what's currently in the
> > > > > stream).
> > > > > 
> > > > > I realize the documentation says this:
> > > > > 
> > > > > 
> > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require 
> > > > > > a significant amount of work not needed in the Consumer Groups:
> > > > > >
> > > > > >    1. You must keep track of the offsets in your application to 
> > > > > >know where you left off consuming.
> >

Re: Consuming a snapshot from log compacted topic

2015-02-19 Thread Joel Koshy
The log end offset (of a partition) changes when messages are appended
to the partition. (It is not correlated with the consumer's offset).
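
For what it's worth, a small sketch of reading the current log end offset with
the javaapi SimpleConsumer (the getOffsetsBefore(-1) approach discussed in the
quoted thread below; the broker, topic and client id are placeholders, and the
request should go to the leader of the partition):

import java.util.Collections;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LogEndOffsetSketch {
    public static void main(String[] args) {
        // point this at the leader broker of the partition being snapshotted
        SimpleConsumer consumer =
            new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "snapshot-client");
        TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            Collections.singletonMap(tp,
                new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "snapshot-client");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        long logEndOffset = response.offsets("my-topic", 0)[0];
        // consume until the message offsets reach logEndOffset - 1; the snapshot
        // is then complete for this partition
        consumer.close();
    }
}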


On Thu, Feb 19, 2015 at 08:58:10PM +, Will Funnell wrote:
> So at what point does the log end offset change? When you commit?
> 
> On 19 February 2015 at 18:47, Joel Koshy  wrote:
> 
> > > If I consumed up to the log end offset and log compaction happens in
> > > between, I would have missed some messages.
> >
> > Compaction actually only runs on the rolled over segments (not the
> > active - i.e., latest segment). The log-end-offset will be in the
> > latest segment which does not participate in compaction.
> >
> > > > The log end offset is just the end of the committed messages in the log
> > > > (the last thing the consumer has access to). It isn't the same as the
> > > > cleaner point but is always later than it so it would work just as
> > well.
> > >
> > > Isn't this just roughly the same value as using c.getOffsetsBefore()
> > with a
> > > partitionRequestTime of -1?
> > >
> > >
> > > Although its always later than the cleaner point, surely log compaction
> > is
> > > still an issue here.
> > >
> > > If I consumed up to the log end offset and log compaction happens in
> > > between, I would have missed some messages.
> > >
> > >
> > > My thinking was that if you knew the log cleaner point, you could:
> > >
> > > Make a note of the starting offset
> > > Consume till end of log
> > > Check my starting point is ahead of current cleaner point, otherwise
> > loop.
> > >
> > >
> > > I appreciate there is a chance I misunderstood your point.
> > >
> > > On 19 February 2015 at 18:02, Jay Kreps  wrote:
> > >
> > > > The log end offset is just the end of the committed messages in the log
> > > > (the last thing the consumer has access to). It isn't the same as the
> > > > cleaner point but is always later than it so it would work just as
> > well.
> > > >
> > > > -Jay
> > > >
> > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell 
> > > > wrote:
> > > >
> > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > > watermark) of the partition in the fetch response. However, this is
> > > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > were to consume a record you can check that it has offsets up to
> > the
> > > > > > log-end offset. If it does then you would know for sure that you
> > have
> > > > > > consumed everything for that partition
> > > > >
> > > > > To confirm then, the log-end-offset is the same as the cleaner point?
> > > > >
> > > > >
> > > > >
> > > > > On 19 February 2015 at 03:10, Jay Kreps  wrote:
> > > > >
> > > > > > Yeah I was thinking either along the lines Joel was suggesting or
> > else
> > > > > > adding a logEndOffset(TopicPartition) method or something like
> > that. As
> > > > > > Joel says the consumer actually has this information internally (we
> > > > > return
> > > > > > it with the fetch request) but doesn't expose it.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy 
> > > > wrote:
> > > > > >
> > > > > > > > > 2. Make the log end offset available more easily in the
> > consumer.
> > > > > > > >
> > > > > > > > Was thinking something would need to be added in
> > LogCleanerManager,
> > > > > in
> > > > > > > the
> > > > > > > > updateCheckpoints function. Where would be best to publish the
> > > > > > > information
> > > > > > > > to make it more easily available, or would you just expose the
> > > > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > > > Is it right you would also need to know which
> > > > > offset-cleaner-checkpoint
> > > > > > > > entry related to each active partition?
> > > > > > >
> > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it
> > is
> > > > > > > along the lines of: we expose the log-end-offset (actually the
> > high
> > > > > > > watermark) of the partition in the fetch response. However, this
> > is
> > > > > > > not exposed to the consumer (either in the new ConsumerRecord
> > class
> > > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > > were to consume a record you can check that it has offsets up to
> > the
> > > > > > > log-end offset. If it does then you would know for sure that you
> > have
> > > > > > > consumed everything for that partition.
> > > > > > >
> > > > > > > > Yes, was looking at this initially, but as we have 100-150
> > writes
> > > > per
> > > > > > > > second, it could be a while before there is a pause long
> > enough to
> > > > > > check
> > > > > > > it
> > > > > > > > has caught up. Even with the consumer timeout set to -1, it
> > takes
> > > > > some
> > > > > > > time
> > > > > > > > to query the max offset values, which is still lo

Re: New Consumer Offset management in 0.8.2

2015-02-19 Thread Joel Koshy
Yes it is supported in 0.8.2-beta. It is documented on the site - you
will need to set offsets.storage to kafka.
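
For example, in the consumer properties (names per the 0.8.2 configuration
section; dual commit is only needed while migrating an existing group off
ZooKeeper):

offsets.storage=kafka
# while migrating an existing group, commit to both Kafka and ZooKeeper,
# then turn this off once all consumers in the group have been upgraded
dual.commit.enabled=true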

On Thu, Feb 19, 2015 at 03:57:31PM -0500, Matthew Butt wrote:
> I'm having a hard time figuring out if the new Kafka-based offset
> management in the high-level Scala Consumer is implemented in the current
> version of 0.8.2-beta. If I implement a high-level consumer, will it use
> the new system, or will it still be storing in zookeeper? Do I need to wait
> for the Java consumer to take advantage of it?
> 
> -- 
> - Matt



Re: NetworkProcessorAvgIdlePercent

2015-02-19 Thread Zakee
Jun,

I am already using the latest release 0.8.2.1.

-Zakee

On Thu, Feb 19, 2015 at 2:46 PM, Jun Rao  wrote:

> Could you try the 0.8.2.1 release being voted on now? It fixes a CPU issue
> and should reduce the CPU load in network thread.
>
> Thanks,
>
> Jun
>
> On Thu, Feb 19, 2015 at 11:54 AM, Zakee  wrote:
>
> > Kafka documentation recommends <0.3 for above metric. I assume processor
> is
> > busier if this goes below 0.3 and obviously it being < 0.3 for long does
> > not seem to be a good sign.
> >
> > What should be our criteria to raise an alert? I thought it should be
> when
> > its value goes below 0.3. However, the value seems to be below 0.3 a lot
> of
> > the times, almost always if we take samples every five mins. What should
> be
> > the threshold to raise an alarm ?
> >
> > What would be the impact of having this below 0.3 or even zero like most
> of
> > the times?
> >
> >
> > -Zakee


Re: data corruption like behavior

2015-02-19 Thread Karts
[2015-02-05 14:21:09,708] ERROR [ReplicaFetcherThread-2-1], Error in fetch
Name: FetchRequest; Version: 0; CorrelationId: 147301; ClientId:
ReplicaFetcherThread-2-1; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [site.db.people,6] ->
PartitionFetchInfo(0,1048576),[site.db.main,4] ->
PartitionFetchInfo(0,1048576),[site.db.school,7] ->
PartitionFetchInfo(0,1048576),[site.db.people,2] ->
PartitionFetchInfo(0,1048576),[k3.hydra,6] ->
PartitionFetchInfo(3,1048576),[site.db.school,3] ->
PartitionFetchInfo(0,1048576),[site.db.main,0] ->
PartitionFetchInfo(0,1048576),[site.db.cmphotos,2] ->
PartitionFetchInfo(2245,1048576),[site.db.cmphotos,6] ->
PartitionFetchInfo(2220,1048576) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused

These were some of the errors from the server log. I didn't find any on the
producer side of things.

On Thu, Feb 19, 2015 at 4:30 PM, Jun Rao  wrote:

> Is there any error in the producer log? Is there any pattern in the
> messages being lost?
>
> Thanks,
>
> Jun
>
> On Thu, Feb 19, 2015 at 4:20 PM, Karts  wrote:
>
> > yes i did.
> >
> > On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao  wrote:
> >
> > > Did you consume the messages from the beginning of the log?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Feb 19, 2015 at 12:18 PM, Karts  wrote:
> > >
> > > > but they have always been up. I mean when i was testing, all the
> > > zookeepers
> > > > were up. and all the kafka nodes were up. its just that I changed the
> > > > number of zookeeper nodes in my first test iteration. second and
> third
> > > were
> > > > still the same. not sure why the topics were losing some messages.
> > > >
> > > > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao  wrote:
> > > >
> > > > > Zookeeper requires a majority of the nodes to be up for the service
> > to
> > > be
> > > > > available. Kafka relies on Zookeeper to be always available.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Feb 19, 2015 at 11:15 AM, Karts 
> wrote:
> > > > >
> > > > > > I have noticed some strange patterns when testing with the 0.8.1
> > > build
> > > > > and
> > > > > > the 0.8.2 builds, and are listed below.
> > > > > > 1. So I setup a brand new cluster [3 kafka nodes with 3
> > zookeepers],
> > > > > > created 2 topics via the API calls, everything went fine and was
> > > > > > successfully able to view my messages in my consumers. There were
> > no
> > > > > > messages lost. All is happy. Now, I change my setup to just have
> 1
> > > > > > zookeeper. and do my test again, i lose some messages. I have
> > checked
> > > > > that
> > > > > > all my configs are pointing to just 1 zookeeper and there was no
> > > > mention
> > > > > of
> > > > > > the other 2 offline zookeepers. any idea why ?
> > > > > > 2. I revert back my settings to the original config, all 3 nodes
> > are
> > > > > > online, no errors, send messages to same old topic, and i am
> still
> > > > > loosing
> > > > > > some messages. I deleted all the old topic files [to follow the
> > > > 'cleanup'
> > > > > > process], create a new topic, and i am successfully able to
> receive
> > > all
> > > > > > messages. no loss whatsoever.
> > > > > > 3. Now in this state, i upgrade to 0.8.2, and try sending
> messages
> > to
> > > > the
> > > > > > topic that was made after the above cleanup, and i am losing
> > messages
> > > > > > again.
> > > > > >
> > > > > > Am i making sense? I mean this is a very strange behavior, and if
> > > > anyone
> > > > > > can comment on this [please correct me if i have done something
> > > 'very'
> > > > > > wrong]..
> > > > > >
> > > > > > Thanks..
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: data corruption like behavior

2015-02-19 Thread Jun Rao
Is there any error in the producer log? Is there any pattern in the
messages being lost?

Thanks,

Jun

On Thu, Feb 19, 2015 at 4:20 PM, Karts  wrote:

> yes i did.
>
> On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao  wrote:
>
> > Did you consume the messages from the beginning of the log?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Feb 19, 2015 at 12:18 PM, Karts  wrote:
> >
> > > but they have always been up. I mean when i was testing, all the
> > zookeepers
> > > were up. and all the kafka nodes were up. its just that I changed the
> > > number of zookeeper nodes in my first test iteration. second and third
> > were
> > > still the same. not sure why the topics were losing some messages.
> > >
> > > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao  wrote:
> > >
> > > > Zookeeper requires a majority of the nodes to be up for the service
> to
> > be
> > > > available. Kafka relies on Zookeeper to be always available.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Feb 19, 2015 at 11:15 AM, Karts  wrote:
> > > >
> > > > > I have noticed some strange patterns when testing with the 0.8.1
> > build
> > > > and
> > > > > the 0.8.2 builds, and are listed below.
> > > > > 1. So I setup a brand new cluster [3 kafka nodes with 3
> zookeepers],
> > > > > created 2 topics via the API calls, everything went fine and was
> > > > > successfully able to view my messages in my consumers. There were
> no
> > > > > messages lost. All is happy. Now, I change my setup to just have 1
> > > > > zookeeper. and do my test again, i lose some messages. I have
> checked
> > > > that
> > > > > all my configs are pointing to just 1 zookeeper and there was no
> > > mention
> > > > of
> > > > > the other 2 offline zookeepers. any idea why ?
> > > > > 2. I revert back my settings to the original config, all 3 nodes
> are
> > > > > online, no errors, send messages to same old topic, and i am still
> > > > loosing
> > > > > some messages. I deleted all the old topic files [to follow the
> > > 'cleanup'
> > > > > process], create a new topic, and i am successfully able to receive
> > all
> > > > > messages. no loss whatsoever.
> > > > > 3. Now in this state, i upgrade to 0.8.2, and try sending messages
> to
> > > the
> > > > > topic that was made after the above cleanup, and i am losing
> messages
> > > > > again.
> > > > >
> > > > > Am i making sense? I mean this is a very strange behavior, and if
> > > anyone
> > > > > can comment on this [please correct me if i have done something
> > 'very'
> > > > > wrong]..
> > > > >
> > > > > Thanks..
> > > > >
> > > >
> > >
> >
>


Re: data corruption like behavior

2015-02-19 Thread Karts
Actually, I take that back. It reads from where the last offset left off.

On Thu, Feb 19, 2015 at 4:20 PM, Karts  wrote:

> yes i did.
>
> On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao  wrote:
>
>> Did you consume the messages from the beginning of the log?
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Feb 19, 2015 at 12:18 PM, Karts  wrote:
>>
>> > but they have always been up. I mean when i was testing, all the
>> zookeepers
>> > were up. and all the kafka nodes were up. its just that I changed the
>> > number of zookeeper nodes in my first test iteration. second and third
>> were
>> > still the same. not sure why the topics were losing some messages.
>> >
>> > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao  wrote:
>> >
>> > > Zookeeper requires a majority of the nodes to be up for the service
>> to be
>> > > available. Kafka relies on Zookeeper to be always available.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Thu, Feb 19, 2015 at 11:15 AM, Karts  wrote:
>> > >
>> > > > I have noticed some strange patterns when testing with the 0.8.1
>> build
>> > > and
>> > > > the 0.8.2 builds, and are listed below.
>> > > > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers],
>> > > > created 2 topics via the API calls, everything went fine and was
>> > > > successfully able to view my messages in my consumers. There were no
>> > > > messages lost. All is happy. Now, I change my setup to just have 1
>> > > > zookeeper. and do my test again, i lose some messages. I have
>> checked
>> > > that
>> > > > all my configs are pointing to just 1 zookeeper and there was no
>> > mention
>> > > of
>> > > > the other 2 offline zookeepers. any idea why ?
>> > > > 2. I revert back my settings to the original config, all 3 nodes are
>> > > > online, no errors, send messages to same old topic, and i am still
>> > > loosing
>> > > > some messages. I deleted all the old topic files [to follow the
>> > 'cleanup'
>> > > > process], create a new topic, and i am successfully able to receive
>> all
>> > > > messages. no loss whatsoever.
>> > > > 3. Now in this state, i upgrade to 0.8.2, and try sending messages
>> to
>> > the
>> > > > topic that was made after the above cleanup, and i am losing
>> messages
>> > > > again.
>> > > >
>> > > > Am i making sense? I mean this is a very strange behavior, and if
>> > anyone
>> > > > can comment on this [please correct me if i have done something
>> 'very'
>> > > > wrong]..
>> > > >
>> > > > Thanks..
>> > > >
>> > >
>> >
>>
>
>


Re: data corruption like behavior

2015-02-19 Thread Karts
yes i did.

On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao  wrote:

> Did you consume the messages from the beginning of the log?
>
> Thanks,
>
> Jun
>
> On Thu, Feb 19, 2015 at 12:18 PM, Karts  wrote:
>
> > but they have always been up. I mean when i was testing, all the
> zookeepers
> > were up. and all the kafka nodes were up. its just that I changed the
> > number of zookeeper nodes in my first test iteration. second and third
> were
> > still the same. not sure why the topics were losing some messages.
> >
> > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao  wrote:
> >
> > > Zookeeper requires a majority of the nodes to be up for the service to
> be
> > > available. Kafka relies on Zookeeper to be always available.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Feb 19, 2015 at 11:15 AM, Karts  wrote:
> > >
> > > > I have noticed some strange patterns when testing with the 0.8.1
> build
> > > and
> > > > the 0.8.2 builds, and are listed below.
> > > > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers],
> > > > created 2 topics via the API calls, everything went fine and was
> > > > successfully able to view my messages in my consumers. There were no
> > > > messages lost. All is happy. Now, I change my setup to just have 1
> > > > zookeeper. and do my test again, i lose some messages. I have checked
> > > that
> > > > all my configs are pointing to just 1 zookeeper and there was no
> > mention
> > > of
> > > > the other 2 offline zookeepers. any idea why ?
> > > > 2. I revert back my settings to the original config, all 3 nodes are
> > > > online, no errors, send messages to same old topic, and i am still
> > > loosing
> > > > some messages. I deleted all the old topic files [to follow the
> > 'cleanup'
> > > > process], create a new topic, and i am successfully able to receive
> all
> > > > messages. no loss whatsoever.
> > > > 3. Now in this state, i upgrade to 0.8.2, and try sending messages to
> > the
> > > > topic that was made after the above cleanup, and i am losing messages
> > > > again.
> > > >
> > > > Am i making sense? I mean this is a very strange behavior, and if
> > anyone
> > > > can comment on this [please correct me if i have done something
> 'very'
> > > > wrong]..
> > > >
> > > > Thanks..
> > > >
> > >
> >
>


Re: KafkaProducer.send contract

2015-02-19 Thread JAmes Atwill
Hey Jun,

That's what I've got right now: a semaphore before send() and a release in the
callback. Am I correct in understanding that there's no way to do any
batching with KafkaProducer itself (other than having a "bulk" message, which
would just be a single message containing multiple messages for a particular
Node)?
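
For reference, a minimal sketch of that pattern with the new producer (one
permit per node; the broker address, topic and node key are placeholders, and
the failure handling is only hinted at):

import java.util.Properties;
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class OnePerNodeSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder broker list
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

        final Semaphore inFlight = new Semaphore(1);  // one outstanding message per "node"
        byte[] key = "node-42".getBytes();            // placeholder node id used as the key
        byte[] value = "state-mutation".getBytes();

        inFlight.acquire();  // block until the previous send for this node has completed
        producer.send(new ProducerRecord<byte[], byte[]>("my-topic", key, value),
            new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // send failed: re-enqueue/retry before moving on,
                        // since missed messages are not acceptable here
                    }
                    inFlight.release();
                }
            });
        producer.close();  // blocks until outstanding sends have completed
    }
}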

  JAmes

On Thu, Feb 19, 2015 at 2:50 PM, Jun Rao  wrote:

> You can register a callback for each message sent. The callback will be
> called when the message is sent successfully or failed.
>
> Thanks,
>
> Jun
>
> On Tue, Feb 17, 2015 at 4:11 PM, JAmes Atwill 
> wrote:
>
> > Hi!
> >
> > I'm using the new KafkaProducer in 0.8.2.0.
> >
> > I have thousands of "Nodes" which receive messages. Each message
> > idempotently mutates the state of the Node, so while duplicate messages
> are
> > fine, missed messages are not.
> >
> > I'm writing these messages into a topic with dozens of partitions.
> >
> > Am I correct in believing that I'll have to manually manage having one
> > message "in flight" per "node" at a time? Or is there a mechanism to say
> > "This message and all messages after it for this partition were
> rejected"?
> > (or something similar)
> >
> > Thanks!
> >
> >   JAmes
> >
>


Re: KafkaProducer.send contract

2015-02-19 Thread Jun Rao
You can register a callback for each message sent. The callback will be
called when the message is sent successfully or failed.

Thanks,

Jun

On Tue, Feb 17, 2015 at 4:11 PM, JAmes Atwill 
wrote:

> Hi!
>
> I'm using the new KafkaProducer in 0.8.2.0.
>
> I have thousands of "Nodes" which receive messages. Each message
> idempotently mutates the state of the Node, so while duplicate messages are
> fine, missed messages are not.
>
> I'm writing these messages into a topic with dozens of partitions.
>
> Am I correct in believing that I'll have to manually manage having one
> message "in flight" per "node" at a time? Or is there a mechanism to say
> "This message and all messages after it for this partition were rejected"?
> (or something similar)
>
> Thanks!
>
>   JAmes
>


Re: data corruption like behavior

2015-02-19 Thread Jun Rao
Did you consume the messages from the beginning of the log?

Thanks,

Jun

On Thu, Feb 19, 2015 at 12:18 PM, Karts  wrote:

> but they have always been up. I mean when i was testing, all the zookeepers
> were up. and all the kafka nodes were up. its just that I changed the
> number of zookeeper nodes in my first test iteration. second and third were
> still the same. not sure why the topics were losing some messages.
>
> On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao  wrote:
>
> > Zookeeper requires a majority of the nodes to be up for the service to be
> > available. Kafka relies on Zookeeper to be always available.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Feb 19, 2015 at 11:15 AM, Karts  wrote:
> >
> > > I have noticed some strange patterns when testing with the 0.8.1 build
> > and
> > > the 0.8.2 builds, and are listed below.
> > > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers],
> > > created 2 topics via the API calls, everything went fine and was
> > > successfully able to view my messages in my consumers. There were no
> > > messages lost. All is happy. Now, I change my setup to just have 1
> > > zookeeper. and do my test again, i lose some messages. I have checked
> > that
> > > all my configs are pointing to just 1 zookeeper and there was no
> mention
> > of
> > > the other 2 offline zookeepers. any idea why ?
> > > 2. I revert back my settings to the original config, all 3 nodes are
> > > online, no errors, send messages to same old topic, and i am still
> > loosing
> > > some messages. I deleted all the old topic files [to follow the
> 'cleanup'
> > > process], create a new topic, and i am successfully able to receive all
> > > messages. no loss whatsoever.
> > > 3. Now in this state, i upgrade to 0.8.2, and try sending messages to
> the
> > > topic that was made after the above cleanup, and i am losing messages
> > > again.
> > >
> > > Am i making sense? I mean this is a very strange behavior, and if
> anyone
> > > can comment on this [please correct me if i have done something 'very'
> > > wrong]..
> > >
> > > Thanks..
> > >
> >
>


Re: NetworkProcessorAvgIdlePercent

2015-02-19 Thread Jun Rao
Could you try the 0.8.2.1 release being voted on now? It fixes a CPU issue
and should reduce the CPU load in network thread.

Thanks,

Jun

On Thu, Feb 19, 2015 at 11:54 AM, Zakee  wrote:

> Kafka documentation recommends <0.3 for above metric. I assume processor is
> busier if this goes below 0.3 and obviously it being < 0.3 for long does
> not seem to be a good sign.
>
> What should be our criteria to raise an alert? I thought it should be when
> its value goes below 0.3. However, the value seems to be below 0.3 a lot of
> the times, almost always if we take samples every five mins. What should be
> the threshold to raise an alarm ?
>
> What would be the impact of having this below 0.3 or even zero like most of
> the times?
>
>
> -Zakee


Re: big cpu jump on producer in face of broker outage

2015-02-19 Thread Steven Wu
Jun,

You are right. I tried the 0.8.2.0 producer with my test and confirmed that it
fixed the CPU issue.

Thanks,
Steven


On Thu, Feb 19, 2015 at 12:02 PM, Steven Wu  wrote:

> will try 0.8.2.1 on producer and report back result.
>
> On Thu, Feb 19, 2015 at 11:52 AM, Jun Rao  wrote:
>
>> This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you
>> try that version or 0.8.2.1 which is being voted now.
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu  wrote:
>>
>> > forgot to mention in case it matters
>> > producer: 0.8.2-beta
>> > broker: 0.8.1.1
>> >
>> > On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu 
>> wrote:
>> >
>> > > I think this is an issue caused by KAFKA-1788.
>> > >
>> > > I was trying to test producer resiliency to broker outage. In this
>> > > experiment, I shutdown all brokers and see how producer behavior.
>> > >
>> > > Here are the observations
>> > > 1) kafka producer can recover from kafka outage. i.e. send resumed
>> after
>> > > brokers came back
>> > > 2) producer instance saw big cpu jump during outage. 28% -> 52% in one
>> > > test.
>> > >
>> > > Note that I didn't observe cpu issue when new producer instance
>> started
>> > > with brokers outage. In this case, there are no messages accumulated
>> in
>> > the
>> > > buffer, because KafkaProducer constructor failed with DNS lookup for
>> > > route53 name. when brokers came up, my wrapper re-created
>> KafkaProducer
>> > > object and recover from outage with sending messages.
>> > >
>> > > Here is the cpu graph for a running producer instance where broker
>> outage
>> > > happened in the middle of test run. it shows cpu problem.
>> > >
>> > >
>> >
>> https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing
>> > >
>> > > Here is the cpu graph for a new producer instance where broker outage
>> > > happened before instance startup. cpu is good here.
>> > >
>> > >
>> >
>> https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing
>> > >
>> > > Note that producer is a 4-core m1.xlarge instance. x-axis is time,
>> y-axis
>> > > is cpu util.
>> > >
>> > > Thanks,
>> > > Steven
>> > >
>> >
>>
>
>


New Consumer Offset management in 0.8.2

2015-02-19 Thread Matthew Butt
I'm having a hard time figuring out if the new Kafka-based offset
management in the high-level Scala Consumer is implemented in the current
version of 0.8.2-beta. If I implement a high-level consumer, will it use
the new system, or will it still be storing in zookeeper? Do I need to wait
for the Java consumer to take advantage of it?

-- 
- Matt


Re: Consuming a snapshot from log compacted topic

2015-02-19 Thread Will Funnell
So at what point does the log end offset change? When you commit?

On 19 February 2015 at 18:47, Joel Koshy  wrote:

> > If I consumed up to the log end offset and log compaction happens in
> > between, I would have missed some messages.
>
> Compaction actually only runs on the rolled over segments (not the
> active - i.e., latest segment). The log-end-offset will be in the
> latest segment which does not participate in compaction.
>
> > > The log end offset is just the end of the committed messages in the log
> > > (the last thing the consumer has access to). It isn't the same as the
> > > cleaner point but is always later than it so it would work just as
> well.
> >
> > Isn't this just roughly the same value as using c.getOffsetsBefore()
> with a
> > partitionRequestTime of -1?
> >
> >
> > Although its always later than the cleaner point, surely log compaction
> is
> > still an issue here.
> >
> > If I consumed up to the log end offset and log compaction happens in
> > between, I would have missed some messages.
> >
> >
> > My thinking was that if you knew the log cleaner point, you could:
> >
> > Make a note of the starting offset
> > Consume till end of log
> > Check my starting point is ahead of current cleaner point, otherwise
> loop.
> >
> >
> > I appreciate there is a chance I misunderstood your point.
> >
> > On 19 February 2015 at 18:02, Jay Kreps  wrote:
> >
> > > The log end offset is just the end of the committed messages in the log
> > > (the last thing the consumer has access to). It isn't the same as the
> > > cleaner point but is always later than it so it would work just as
> well.
> > >
> > > -Jay
> > >
> > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell 
> > > wrote:
> > >
> > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > watermark) of the partition in the fetch response. However, this is
> > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > were to consume a record you can check that it has offsets up to
> the
> > > > > log-end offset. If it does then you would know for sure that you
> have
> > > > > consumed everything for that partition
> > > >
> > > > To confirm then, the log-end-offset is the same as the cleaner point?
> > > >
> > > >
> > > >
> > > > On 19 February 2015 at 03:10, Jay Kreps  wrote:
> > > >
> > > > > Yeah I was thinking either along the lines Joel was suggesting or
> else
> > > > > adding a logEndOffset(TopicPartition) method or something like
> that. As
> > > > > Joel says the consumer actually has this information internally (we
> > > > return
> > > > > it with the fetch request) but doesn't expose it.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy 
> > > wrote:
> > > > >
> > > > > > > > 2. Make the log end offset available more easily in the
> consumer.
> > > > > > >
> > > > > > > Was thinking something would need to be added in
> LogCleanerManager,
> > > > in
> > > > > > the
> > > > > > > updateCheckpoints function. Where would be best to publish the
> > > > > > information
> > > > > > > to make it more easily available, or would you just expose the
> > > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > > Is it right you would also need to know which
> > > > offset-cleaner-checkpoint
> > > > > > > entry related to each active partition?
> > > > > >
> > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it
> is
> > > > > > along the lines of: we expose the log-end-offset (actually the
> high
> > > > > > watermark) of the partition in the fetch response. However, this
> is
> > > > > > not exposed to the consumer (either in the new ConsumerRecord
> class
> > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > were to consume a record you can check that it has offsets up to
> the
> > > > > > log-end offset. If it does then you would know for sure that you
> have
> > > > > > consumed everything for that partition.
> > > > > >
> > > > > > > Yes, was looking at this initially, but as we have 100-150
> writes
> > > per
> > > > > > > second, it could be a while before there is a pause long
> enough to
> > > > > check
> > > > > > it
> > > > > > > has caught up. Even with the consumer timeout set to -1, it
> takes
> > > > some
> > > > > > time
> > > > > > > to query the max offset values, which is still long enough for
> more
> > > > > > > messages to arrive.
> > > > > >
> > > > > > Got it - thanks for clarifying.
> > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On 18 February 2015 at 23:16, Joel Koshy 
> > > > wrote:
> > > > > > >
> > > > > > > > > You are also correct and perceptive to notice that if you
> check
> > > > the
> > > > > > end
> > > > > > > > of
> > > > > > > > > the log then begin consuming and read up to that po

Re: data corruption like behavior

2015-02-19 Thread Karts
But they have always been up. I mean, when I was testing, all the zookeepers
were up and all the kafka nodes were up. It's just that I changed the
number of zookeeper nodes in my first test iteration; the second and third were
still the same. Not sure why the topics were losing some messages.

On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao  wrote:

> Zookeeper requires a majority of the nodes to be up for the service to be
> available. Kafka relies on Zookeeper to be always available.
>
> Thanks,
>
> Jun
>
> On Thu, Feb 19, 2015 at 11:15 AM, Karts  wrote:
>
> > I have noticed some strange patterns when testing with the 0.8.1 build
> and
> > the 0.8.2 builds, and are listed below.
> > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers],
> > created 2 topics via the API calls, everything went fine and was
> > successfully able to view my messages in my consumers. There were no
> > messages lost. All is happy. Now, I change my setup to just have 1
> > zookeeper. and do my test again, i lose some messages. I have checked
> that
> > all my configs are pointing to just 1 zookeeper and there was no mention
> of
> > the other 2 offline zookeepers. any idea why ?
> > 2. I revert back my settings to the original config, all 3 nodes are
> > online, no errors, send messages to same old topic, and i am still
> loosing
> > some messages. I deleted all the old topic files [to follow the 'cleanup'
> > process], create a new topic, and i am successfully able to receive all
> > messages. no loss whatsoever.
> > 3. Now in this state, i upgrade to 0.8.2, and try sending messages to the
> > topic that was made after the above cleanup, and i am losing messages
> > again.
> >
> > Am i making sense? I mean this is a very strange behavior, and if anyone
> > can comment on this [please correct me if i have done something 'very'
> > wrong]..
> >
> > Thanks..
> >
>


Re: Default MirrorMaker not copying over from source to target

2015-02-19 Thread Alex Melville
Tao,


I updated the mirrorconsumer.properties config file as you suggested, and
upped the MM's log level to DEBUG. I have the output of the DEBUG logger
in this pastebin; if you could take a minute to look for anything in its
contents that would indicate a problem, that would be extremely helpful.
Note that my servers' hostnames are of the form ad-010X or ba-0X, where X is
some integer between 1 and 4.

http://pastebin.com/rBsxx15A

When I run the mirrormaker and then spin up a console consumer to read from
the source cluster, I get 0 messages consumed.


Alex

On Sun, Feb 15, 2015 at 3:00 AM, tao xiao  wrote:

> Alex,
>
> Are you sure you have data continually being sent to the topic in source
> cluster after you bring up MM? By default auto.offset.reset=largest in MM
> consumer config which means MM only fetches the largest offset if the
> consumer group has no initial offset in zookeeper.
>
> You can have MM print more log by changing the log level in
> config/tools-log4j.properties
>
> On Sun, Feb 15, 2015 at 8:39 AM, Alex Melville 
> wrote:
>
> > Hi Kafka'ers,
> >
> >
> > I am trying to get the Mirrormaker working with two separate clusters,
> one
> > as the source and the other as the target. The topic I'm trying to copy
> > over exists on both the source and target clusters. Here are the relevant
> > entries in my consumer and producer properties files, which I'm
> specifying
> > the command I run to start the MM:
> >
> > *mirrorconsumer.properties:*
> > zookeeper.connect=ad-0104:2181
> > zookeeper.connection.timeout.ms=6000
> > group.id=test-consumer-group
> >
> >
> > *mirrorproducer.properties:*
> > metadata.broker.list=ba-02:9092,ba-03:9092
> > producer.type=sync
> > compression.codec=none
> > serializer.class=kafka.serializer.DefaultEncoder
> >
> >
> > Then I run the following command:
> > bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
> >  ../config/mirrorconsumer.properties --producer.config
> > ../config/mirrorproducer.properties --whitelist consolemm
> >
> > so consolemm is the topic I'm trying to copy over. I've created consolemm
> > and have used to console-consumer to verify that there are messages in
> the
> > topic.
> >
> > When I run this command... nothing happens. The process keeps running and
> > prints nothing to the Terminal. If I look in the output of the zookeeper
> on
> > the source cluster I get only the following:
> >
> > [2015-02-15 00:34:06,102] INFO Accepted socket connection from /
> > 10.7.162.75:42819 (org.apache.zookeeper.server.NIOServerCnxnFactory)
> > [2015-02-15 00:34:06,104] INFO Client attempting to establish new session
> > at /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer)
> > [2015-02-15 00:34:06,106] INFO Established session 0x14b668b0fbe0033 with
> > negotiated timeout 6000 for client /10.7.162.75:42819
> > (org.apache.zookeeper.server.ZooKeeperServer)
> >
> >
> > and when I look at the output of one of the brokers on the source
> cluster I
> > get:
> >
> > [2015-02-15 00:32:14,382] INFO Closing socket connection to /10.7.162.75
> .
> > (kafka.network.Processor)
> >
> > and there is no output on the zookeeper on the target cluster.
> >
> >
> >
> > Any advice on what is causing MM to not properly copy over data to the
> > target cluster would be extremely helpful.
> >
> > -Alex
> >
>
>
>
> --
> Regards,
> Tao
>


Re: What conditions can cause Leader: -1 ?

2015-02-19 Thread Jun Rao
Any error in the controller and state-change log?

Thanks,

Jun

On Thu, Feb 12, 2015 at 7:28 AM, Omid Aladini  wrote:

> Hi,
>
> I'm experimenting with the following scenario:
>
> - 3 brokers are running (0,1 and 2) -- Kafka version 0.8.2.0
> - Continuously: restart broker number 0 by triggering controlled shutdown.
> Sleep rand(10) seconds. repeat.
> - Continuously: create 'simple-test-topic' (RF=2), write and read messages,
> then delete the topic. repeat.
>
> After a while, broker 0 doesn't come back up any more due to "corrupt
> index" error (but that's not my question for the moment). Looking at the
> state of the topics:
>
> Topic: simple-test-topic    PartitionCount: 8    ReplicationFactor: 2    Configs:
> Topic: simple-test-topic    Partition: 0    Leader: -1    Replicas: 1,2    Isr: 1
> Topic: simple-test-topic    Partition: 1    Leader: -1    Replicas: 2,0    Isr: 2
> Topic: simple-test-topic    Partition: 2    Leader: -1    Replicas: 0,1    Isr: 1
> Topic: simple-test-topic    Partition: 3    Leader: -1    Replicas: 1,0    Isr: 1
> Topic: simple-test-topic    Partition: 4    Leader: -1    Replicas: 2,1    Isr: 1
> Topic: simple-test-topic    Partition: 5    Leader: -1    Replicas: 0,2    Isr: 2
> Topic: simple-test-topic    Partition: 6    Leader: -1    Replicas: 1,2    Isr: 1
> Topic: simple-test-topic    Partition: 7    Leader: -1    Replicas: 2,0    Isr: 2
> Topic: test    PartitionCount: 8    ReplicationFactor: 3    Configs:
> Topic: test    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 2,1
> Topic: test    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,1
> Topic: test    Partition: 2    Leader: 2    Replicas: 0,1,2    Isr: 2,1
> Topic: test    Partition: 3    Leader: 1    Replicas: 1,0,2    Isr: 2,1
> Topic: test    Partition: 4    Leader: 2    Replicas: 2,1,0    Isr: 2,1
> Topic: test    Partition: 5    Leader: 2    Replicas: 0,2,1    Isr: 2,1
> Topic: test    Partition: 6    Leader: 1    Replicas: 1,2,0    Isr: 2,1
> Topic: test    Partition: 7    Leader: 2    Replicas: 2,0,1    Isr: 2,1
>
> .. at which point:
>
> - All 'simple-test-topic' partitions are leaderless.
> - It's not possible to delete "simple-test-topic" any more.
> - Calling 'kafka-preferred-replica-election.sh' successfully starts
> election but doesn't have any effect.
>
> The other topic, named "test" (RF 3), is just sitting there and not
> actively participating in the test.
>
> Now I'm wondering:
>
> - Which of the steps above could have caused "simple-test-topic" partitions
> to become leaderless?
> - How to recover in such situation in cases where broker 0 can or cannot be
> recovered?
>
> Thanks,
> Omid
>


Re: big cpu jump on producer in face of broker outage

2015-02-19 Thread Steven Wu
Will try 0.8.2.1 on the producer and report back with the result.

On Thu, Feb 19, 2015 at 11:52 AM, Jun Rao  wrote:

> This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you
> try that version or 0.8.2.1 which is being voted now.
>
> Thanks,
>
> Jun
>
> On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu  wrote:
>
> > forgot to mention in case it matters
> > producer: 0.8.2-beta
> > broker: 0.8.1.1
> >
> > On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu 
> wrote:
> >
> > > I think this is an issue caused by KAFKA-1788.
> > >
> > > I was trying to test producer resiliency to broker outage. In this
> > > experiment, I shutdown all brokers and see how producer behavior.
> > >
> > > Here are the observations
> > > 1) kafka producer can recover from kafka outage. i.e. send resumed
> after
> > > brokers came back
> > > 2) producer instance saw big cpu jump during outage. 28% -> 52% in one
> > > test.
> > >
> > > Note that I didn't observe cpu issue when new producer instance started
> > > with brokers outage. In this case, there are no messages accumulated in
> > the
> > > buffer, because KafkaProducer constructor failed with DNS lookup for
> > > route53 name. when brokers came up, my wrapper re-created KafkaProducer
> > > object and recover from outage with sending messages.
> > >
> > > Here is the cpu graph for a running producer instance where broker
> outage
> > > happened in the middle of test run. it shows cpu problem.
> > >
> > >
> >
> https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing
> > >
> > > Here is the cpu graph for a new producer instance where broker outage
> > > happened before instance startup. cpu is good here.
> > >
> > >
> >
> https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing
> > >
> > > Note that producer is a 4-core m1.xlarge instance. x-axis is time,
> y-axis
> > > is cpu util.
> > >
> > > Thanks,
> > > Steven
> > >
> >
>


NetworkProcessorAvgIdlePercent

2015-02-19 Thread Zakee
The Kafka documentation suggests this metric should ideally stay above 0.3. I
assume the network processor is busier when it goes below 0.3, and obviously
staying below 0.3 for long does not seem to be a good sign.

What should our criteria be to raise an alert? I thought it should be when the
value goes below 0.3. However, the value seems to be below 0.3 a lot of the
time, almost always if we take samples every five minutes. What should the
threshold be to raise an alarm?

What would be the impact of having this below 0.3, or even at zero, most of
the time?


-Zakee
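
For anyone wiring this metric into an alerting system, a minimal sketch of
polling it over JMX follows. The MBean name shown
(kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent) and the
JMX port are assumptions that depend on the broker version and on JMX_PORT
being set; the javax.management calls themselves are standard.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class NetworkIdleCheck {
    public static void main(String[] args) throws Exception {
        // Assumed JMX endpoint; requires JMX_PORT (here 9999) to be set on the broker.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // Assumed MBean name (0.8.2-style naming); older releases use a quoted form.
            ObjectName name = new ObjectName(
                    "kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent");
            double idle = ((Number) mbsc.getAttribute(name, "Value")).doubleValue();
            // Treat a sustained value below 0.3 as "network processors are busy".
            System.out.println((idle < 0.3 ? "ALERT" : "OK")
                    + ": network processor avg idle = " + idle);
        } finally {
            connector.close();
        }
    }
}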


Re: Simple Consumer and offsets

2015-02-19 Thread Suren
Joel/All,
The SimpleConsumer constructor requires a specific host and port.

Can this be any broker?
If it needs to be a specific broker, for 0.8.2, should this be the offset 
coordinator? For 0.8.1, does it matter?
-Suren
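
For reference, the constructor in question is sketched below. The host, port,
timeout and buffer values are placeholders rather than recommendations, and
whether an arbitrary broker is sufficient is exactly the open question here.

import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetResetClient {
    public static void main(String[] args) {
        // Placeholder connection settings, for illustration only.
        SimpleConsumer consumer = new SimpleConsumer(
                "broker-host",         // host of one broker
                9092,                  // broker port
                100000,                // socket timeout in ms
                64 * 1024,             // receive buffer size in bytes
                "offset-reset-client"  // client id
        );
        consumer.close();
    }
}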
 

 On Thursday, February 19, 2015 10:43 AM, Joel Koshy  
wrote:
   

 I see - yes, you can use the SimpleConsumer for that. However, your
high-level consumers need to be shutdown while you do that (otherwise
they may auto-commit while you are resetting offsets).

Thanks,

Joel

On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> We are using the High Level Consumer API to interact with Kafka for our 
> normal use cases.
> 
> However, on consumer restart in the case of consumer failures, we want to be 
> able to manually
> reset offsets in certain situations.
> And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
> It looked like instantiating a SimpleConsumer just to reset offsets on 
> restart was a viable option, while continuing to use the High Level Consumer 
> for our normal operations. Not sure if there is a better way that is 
> compatible across 0.8.1 and 0.8.2.
> -Suren
>  
> 
>      On Thursday, February 19, 2015 10:25 AM, Joel Koshy 
> wrote:
>    
> 
>  Not sure what you mean by using the SimpleConsumer on failure
> recovery. Can you elaborate on this?
> 
> On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > We are using the High Level Consumer generally but are thinking to use the 
> > SimpleConsumer on failure recovery to set the offsets.
> > Is that the recommended approach for this use case?
> > Thanks.
> > -Suren
> >  
> > 
> >      On Thursday, February 19, 2015 9:40 AM, Joel Koshy 
> > wrote:
> >    
> > 
> >  Are you using it from Java or Scala? i.e., are you using the
> > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > 
> > In 0.8.2 javaapi we explicitly set version 0 of the
> > OffsetCommitRequest/OffsetFetchRequest which means it will
> > commit/fetch to/from ZooKeeper only. If you use the scala API you can
> > create an OffsetCommitRequest with version set to 1 (which will allow
> > you to commit to Kafka).
> > 
> > Since we are doing an 0.8.2.1 release we will make the above more
> > consistent. i.e., you can create OffsetCommitRequests with version 1
> > even from the javaapi. I will be updating the documentation on this to
> > make it clearer.
> > 
> > Thanks,
> > 
> > Joel
> > 
> > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > Joel,
> > > Looking at SimpleConsumer in the 0.8.2 code, it is using 
> > > OffsetCommitRequest and sending that over to a broker.
> > > Is the broker storing that in ZK?
> > > -Suren
> > >  
> > > 
> > >      On Tuesday, February 17, 2015 12:22 PM, Joel Koshy 
> > > wrote:
> > >    
> > > 
> > >  Hi Chris,
> > > 
> > > In 0.8.2, the simple consumer Java API supports committing/fetching
> > > offsets that are stored in ZooKeeper. You don't need to issue any
> > > ConsumerMetadataRequest for this. Unfortunately, the API currently
> > > does not support fetching offsets that are stored in Kafka.
> > > 
> > > Thanks,
> > > 
> > > Joel
> > > 
> > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > > > Hi,
> > > > 
> > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > 
> > > > I'm confused about why the SimpleConsumer has:
> > > > 
> > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > 
> > > > and
> > > > 
> > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > > > 
> > > > but no way that I can see to issue a ConsumerMetadataRequest, which is
> > > > what I think when restarting my consumers so that they can begin
> > > > working where they last left off (in the event that they were stopped
> > > > for a while then restarted some time later, and new messages had come
> > > > in).
> > > > 
> > > > The fetchOffsets() works on time, usually it looks like you send it
> > > > Earliest or Latest (beginning or end of what's currently in the
> > > > stream).
> > > > 
> > > > I realize the documentation says this:
> > > > 
> > > > 
> > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a 
> > > > > significant amount of work not needed in the Consumer Groups:
> > > > >
> > > > >    1. You must keep track of the offsets in your application to know 
> > > > >where you left off consuming.
> > > > >
> > > > > But that's not really quite true ... not as long as commitOffsets() 
> > > > > has been provided.  It seems the SimpleConsumer provides you with a 
> > > > > solution to only one half of the problem of offset management.
> > > > 
> > > > Using some zookeeper python scripts I wrote I can see that the
> > > > commitOffsets() is doing its job and writing to
> > > > 
> > > > 
> > > > /consumers/myGroupId/offsets/myTopic/0
> > > > 
> > > > 
> > > > That has this value:
> > > > 
> > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=

Re: big cpu jump on producer in face of broker outage

2015-02-19 Thread Jun Rao
This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you
try that version, or 0.8.2.1, which is being voted on now?

Thanks,

Jun

On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu  wrote:

> forgot to mention in case it matters
> producer: 0.8.2-beta
> broker: 0.8.1.1
>
> On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu  wrote:
>
> > I think this is an issue caused by KAFKA-1788.
> >
> > I was trying to test producer resiliency to broker outage. In this
> > experiment, I shutdown all brokers and see how producer behavior.
> >
> > Here are the observations
> > 1) kafka producer can recover from kafka outage. i.e. send resumed after
> > brokers came back
> > 2) producer instance saw big cpu jump during outage. 28% -> 52% in one
> > test.
> >
> > Note that I didn't observe cpu issue when new producer instance started
> > with brokers outage. In this case, there are no messages accumulated in
> the
> > buffer, because KafkaProducer constructor failed with DNS lookup for
> > route53 name. when brokers came up, my wrapper re-created KafkaProducer
> > object and recover from outage with sending messages.
> >
> > Here is the cpu graph for a running producer instance where broker outage
> > happened in the middle of test run. it shows cpu problem.
> >
> >
> https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing
> >
> > Here is the cpu graph for a new producer instance where broker outage
> > happened before instance startup. cpu is good here.
> >
> >
> https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing
> >
> > Note that producer is a 4-core m1.xlarge instance. x-axis is time, y-axis
> > is cpu util.
> >
> > Thanks,
> > Steven
> >
>


Re: data corruption like behavior

2015-02-19 Thread Jun Rao
Zookeeper requires a majority of the nodes to be up for the service to be
available. Kafka relies on Zookeeper to be always available.

Thanks,

Jun

On Thu, Feb 19, 2015 at 11:15 AM, Karts  wrote:

> I have noticed some strange patterns when testing with the 0.8.1 build and
> the 0.8.2 builds, and are listed below.
> 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers],
> created 2 topics via the API calls, everything went fine and was
> successfully able to view my messages in my consumers. There were no
> messages lost. All is happy. Now, I change my setup to just have 1
> zookeeper. and do my test again, i lose some messages. I have checked that
> all my configs are pointing to just 1 zookeeper and there was no mention of
> the other 2 offline zookeepers. any idea why ?
> 2. I revert back my settings to the original config, all 3 nodes are
> online, no errors, send messages to same old topic, and i am still loosing
> some messages. I deleted all the old topic files [to follow the 'cleanup'
> process], create a new topic, and i am successfully able to receive all
> messages. no loss whatsoever.
> 3. Now in this state, i upgrade to 0.8.2, and try sending messages to the
> topic that was made after the above cleanup, and i am losing messages
> again.
>
> Am i making sense? I mean this is a very strange behavior, and if anyone
> can comment on this [please correct me if i have done something 'very'
> wrong]..
>
> Thanks..
>


Re: [VOTE] 0.8.2.1 Candidate 1

2015-02-19 Thread Guozhang Wang
+1 binding.

Checked the md5, and quick start.

Some minor comments:

1. It would be better if the quickstart section included the build step after
the download and before starting the server.

2. There seems to be a bug in Gradle 1.1x with Java 8 causing the "gradle"
initialization to fail:

-

FAILURE: Build failed with an exception.

* Where:
Build file '/home/guwang/Workspace/temp/kafka/build.gradle' line: 199

* What went wrong:
A problem occurred evaluating root project 'kafka'.
> Could not create task of type 'ScalaDoc'.
--

Downgrading Java to 1.7 resolves this issue.

Guozhang

On Wed, Feb 18, 2015 at 7:56 PM, Connie Yang  wrote:

> +1
> On Feb 18, 2015 7:23 PM, "Matt Narrell"  wrote:
>
> > +1
> >
> > > On Feb 18, 2015, at 7:56 PM, Jun Rao  wrote:
> > >
> > > This is the first candidate for release of Apache Kafka 0.8.2.1. This
> > > only fixes one critical issue (KAFKA-1952) in 0.8.2.0.
> > >
> > > Release Notes for the 0.8.2.1 release
> > >
> >
> https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by Saturday, Feb 21, 7pm PT
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > and sha2 (SHA256) checksum.
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/
> > >
> > > * Maven artifacts to be voted upon prior to release:
> > > https://repository.apache.org/content/groups/staging/
> > >
> > > * scala-doc
> > > https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/
> > >
> > > * java-doc
> > > https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/
> > >
> > > * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6
> > >
> > > /***
> > >
> > > Thanks,
> > >
> > > Jun
> >
> >
>



-- 
-- Guozhang


data corruption like behavior

2015-02-19 Thread Karts
I have noticed some strange patterns when testing with the 0.8.1 build and
the 0.8.2 builds; they are listed below.
1. I set up a brand new cluster [3 Kafka nodes with 3 ZooKeepers], created 2
topics via the API calls, everything went fine and I was successfully able to
view my messages in my consumers. There were no messages lost. All is happy.
Now, I change my setup to have just 1 ZooKeeper and do my test again, and I
lose some messages. I have checked that all my configs are pointing to just 1
ZooKeeper and there was no mention of the other 2 offline ZooKeepers. Any idea
why?
2. I revert my settings back to the original config, all 3 nodes are online,
no errors, I send messages to the same old topic, and I am still losing some
messages. I deleted all the old topic files [to follow the 'cleanup' process],
created a new topic, and I am successfully able to receive all messages. No
loss whatsoever.
3. Now in this state, I upgrade to 0.8.2, try sending messages to the topic
that was made after the above cleanup, and I am losing messages again.

Am I making sense? This is very strange behavior, and if anyone can comment
on this [please correct me if I have done something 'very' wrong]..

Thanks..


Re: Broker w/ high memory due to index file sizes

2015-02-19 Thread Zakee
Well, are there any guidelines for sizing broker memory? We do have a large
load, with a max throughput of 200 MB/s. What do you suggest as the recommended
memory configuration for 5 brokers to handle such loads?

On Wed, Feb 18, 2015 at 7:13 PM, Jay Kreps  wrote:

> 40G is really huge, generally you would want more like 4G. Are you sure you
> need that? Not sure what you mean by lsof and index files being too large,
> but the index files are memory mapped so they should be able to grow
> arbitrarily large and their memory usage is not counted in the java heap
> (in fact by having such a large heap you are taking away OS memory from
> them).
>
> -Jay
>
> On Wed, Feb 18, 2015 at 4:13 PM, Zakee  wrote:
>
> > I am running a cluster of 5 brokers with 40G ms/mx for each. I found one
> of
> > the brokers is constantly using above ~90% of memory for jvm.heapUsage. I
> > checked from lsof output that the size of the index files for this broker
> > is too large.
> >
> > Not sure what is going on with this one broker in the cluster? Why would
> > the index file sizes be so hugely different on one broker? Any ideas?
> >
> >
> > Regards
> > Zakee


Re: Consuming a snapshot from log compacted topic

2015-02-19 Thread Joel Koshy
> If I consumed up to the log end offset and log compaction happens in
> between, I would have missed some messages.

Compaction actually only runs on the rolled over segments (not the
active - i.e., latest segment). The log-end-offset will be in the
latest segment which does not participate in compaction.

> > The log end offset is just the end of the committed messages in the log
> > (the last thing the consumer has access to). It isn't the same as the
> > cleaner point but is always later than it so it would work just as well.
> 
> Isn't this just roughly the same value as using c.getOffsetsBefore() with a
> partitionRequestTime of -1?
> 
> 
> Although its always later than the cleaner point, surely log compaction is
> still an issue here.
> 
> If I consumed up to the log end offset and log compaction happens in
> between, I would have missed some messages.
> 
> 
> My thinking was that if you knew the log cleaner point, you could:
> 
> Make a note of the starting offset
> Consume till end of log
> Check my starting point is ahead of current cleaner point, otherwise loop.
> 
> 
> I appreciate there is a chance I misunderstood your point.
> 
> On 19 February 2015 at 18:02, Jay Kreps  wrote:
> 
> > The log end offset is just the end of the committed messages in the log
> > (the last thing the consumer has access to). It isn't the same as the
> > cleaner point but is always later than it so it would work just as well.
> >
> > -Jay
> >
> > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell 
> > wrote:
> >
> > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > along the lines of: we expose the log-end-offset (actually the high
> > > > watermark) of the partition in the fetch response. However, this is
> > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > were to consume a record you can check that it has offsets up to the
> > > > log-end offset. If it does then you would know for sure that you have
> > > > consumed everything for that partition
> > >
> > > To confirm then, the log-end-offset is the same as the cleaner point?
> > >
> > >
> > >
> > > On 19 February 2015 at 03:10, Jay Kreps  wrote:
> > >
> > > > Yeah I was thinking either along the lines Joel was suggesting or else
> > > > adding a logEndOffset(TopicPartition) method or something like that. As
> > > > Joel says the consumer actually has this information internally (we
> > > return
> > > > it with the fetch request) but doesn't expose it.
> > > >
> > > > -Jay
> > > >
> > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy 
> > wrote:
> > > >
> > > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > >
> > > > > > Was thinking something would need to be added in LogCleanerManager,
> > > in
> > > > > the
> > > > > > updateCheckpoints function. Where would be best to publish the
> > > > > information
> > > > > > to make it more easily available, or would you just expose the
> > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > Is it right you would also need to know which
> > > offset-cleaner-checkpoint
> > > > > > entry related to each active partition?
> > > > >
> > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > watermark) of the partition in the fetch response. However, this is
> > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > were to consume a record you can check that it has offsets up to the
> > > > > log-end offset. If it does then you would know for sure that you have
> > > > > consumed everything for that partition.
> > > > >
> > > > > > Yes, was looking at this initially, but as we have 100-150 writes
> > per
> > > > > > second, it could be a while before there is a pause long enough to
> > > > check
> > > > > it
> > > > > > has caught up. Even with the consumer timeout set to -1, it takes
> > > some
> > > > > time
> > > > > > to query the max offset values, which is still long enough for more
> > > > > > messages to arrive.
> > > > >
> > > > > Got it - thanks for clarifying.
> > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 18 February 2015 at 23:16, Joel Koshy 
> > > wrote:
> > > > > >
> > > > > > > > You are also correct and perceptive to notice that if you check
> > > the
> > > > > end
> > > > > > > of
> > > > > > > > the log then begin consuming and read up to that point
> > compaction
> > > > may
> > > > > > > have
> > > > > > > > already kicked in (if the reading takes a while) and hence you
> > > > might
> > > > > have
> > > > > > > > an incomplete snapshot.
> > > > > > >
> > > > > > > Isn't it sufficient to just repeat the check at the end after
> > > reading
> > > > > > > the log and repeat until you are truly done? At least for th

Re: big cpu jump on producer in face of broker outage

2015-02-19 Thread Steven Wu
forgot to mention in case it matters
producer: 0.8.2-beta
broker: 0.8.1.1

On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu  wrote:

> I think this is an issue caused by KAFKA-1788.
>
> I was trying to test producer resiliency to broker outage. In this
> experiment, I shutdown all brokers and see how producer behavior.
>
> Here are the observations
> 1) kafka producer can recover from kafka outage. i.e. send resumed after
> brokers came back
> 2) producer instance saw big cpu jump during outage. 28% -> 52% in one
> test.
>
> Note that I didn't observe cpu issue when new producer instance started
> with brokers outage. In this case, there are no messages accumulated in the
> buffer, because KafkaProducer constructor failed with DNS lookup for
> route53 name. when brokers came up, my wrapper re-created KafkaProducer
> object and recover from outage with sending messages.
>
> Here is the cpu graph for a running producer instance where broker outage
> happened in the middle of test run. it shows cpu problem.
>
> https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing
>
> Here is the cpu graph for a new producer instance where broker outage
> happened before instance startup. cpu is good here.
>
> https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing
>
> Note that producer is a 4-core m1.xlarge instance. x-axis is time, y-axis
> is cpu util.
>
> Thanks,
> Steven
>


big cpu jump on producer in face of broker outage

2015-02-19 Thread Steven Wu
I think this is an issue caused by KAFKA-1788.

I was trying to test producer resiliency to a broker outage. In this
experiment, I shut down all brokers and observed the producer's behavior.

Here are the observations:
1) The Kafka producer can recover from a Kafka outage, i.e. sends resumed after
the brokers came back.
2) The producer instance saw a big CPU jump during the outage: 28% -> 52% in
one test.

Note that I didn't observe the CPU issue when a new producer instance started
during the broker outage. In that case, there were no messages accumulated in
the buffer, because the KafkaProducer constructor failed with a DNS lookup for
the Route 53 name. When the brokers came up, my wrapper re-created the
KafkaProducer object and recovered from the outage by sending messages.

Here is the CPU graph for a running producer instance where the broker outage
happened in the middle of the test run. It shows the CPU problem.
https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing

Here is the CPU graph for a new producer instance where the broker outage
happened before instance startup. CPU is good here.
https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing

Note that the producer is a 4-core m1.xlarge instance. The x-axis is time, the
y-axis is CPU utilization.

Thanks,
Steven


Re: Consuming a snapshot from log compacted topic

2015-02-19 Thread Will Funnell
> The log end offset is just the end of the committed messages in the log
> (the last thing the consumer has access to). It isn't the same as the
> cleaner point but is always later than it so it would work just as well.

Isn't this just roughly the same value as using c.getOffsetsBefore() with a
partitionRequestTime of -1?


Although it's always later than the cleaner point, surely log compaction is
still an issue here.

If I consumed up to the log end offset and log compaction happens in
between, I would have missed some messages.


My thinking was that if you knew the log cleaner point, you could:

Make a note of the starting offset
Consume till end of log
Check my starting point is ahead of current cleaner point, otherwise loop.


I appreciate there is a chance I misunderstood your point.
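
To make that loop concrete, here is a minimal sketch of snapshotting one
partition with the high-level consumer, except that, following the suggestion
discussed in this thread, it checks against the log-end offset captured at the
start rather than the cleaner point. The LogEndOffsetLookup and SnapshotWriter
interfaces are hypothetical placeholders (the lookup would issue an
OffsetRequest with time -1, i.e. the getOffsetsBefore() call mentioned above),
and it assumes consumer.timeout.ms is set and that the stream carries only the
one partition.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

public class PartitionSnapshot {

    /** Hypothetical: returns the partition's current log-end offset, e.g. via an
     *  OffsetRequest with time = -1 (latest). */
    public interface LogEndOffsetLookup {
        long fetch(String topic, int partition);
    }

    /** Hypothetical sink for the snapshot. */
    public interface SnapshotWriter {
        void write(byte[] key, byte[] value);
    }

    /** Consume one partition's stream until we have seen every offset that existed
     *  when the snapshot started, then stop. */
    public static void snapshot(KafkaStream<byte[], byte[]> stream,
                                String topic,
                                int partition,
                                LogEndOffsetLookup lookup,
                                SnapshotWriter writer) {
        // Last offset we need to see, captured before we start reading.
        long target = lookup.fetch(topic, partition) - 1;
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        try {
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> m = it.next();
                writer.write(m.key(), m.message());
                if (m.offset() >= target) {
                    break; // caught up to where the log ended when the snapshot began
                }
            }
        } catch (ConsumerTimeoutException e) {
            // consumer.timeout.ms elapsed with no new message; stop here, or re-check
            // the log-end offset and keep going if it has moved.
        }
    }
}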

On 19 February 2015 at 18:02, Jay Kreps  wrote:

> The log end offset is just the end of the committed messages in the log
> (the last thing the consumer has access to). It isn't the same as the
> cleaner point but is always later than it so it would work just as well.
>
> -Jay
>
> On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell 
> wrote:
>
> > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > along the lines of: we expose the log-end-offset (actually the high
> > > watermark) of the partition in the fetch response. However, this is
> > > not exposed to the consumer (either in the new ConsumerRecord class
> > > or the existing MessageAndMetadata class). If we did, then if you
> > > were to consume a record you can check that it has offsets up to the
> > > log-end offset. If it does then you would know for sure that you have
> > > consumed everything for that partition
> >
> > To confirm then, the log-end-offset is the same as the cleaner point?
> >
> >
> >
> > On 19 February 2015 at 03:10, Jay Kreps  wrote:
> >
> > > Yeah I was thinking either along the lines Joel was suggesting or else
> > > adding a logEndOffset(TopicPartition) method or something like that. As
> > > Joel says the consumer actually has this information internally (we
> > return
> > > it with the fetch request) but doesn't expose it.
> > >
> > > -Jay
> > >
> > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy 
> wrote:
> > >
> > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > >
> > > > > Was thinking something would need to be added in LogCleanerManager,
> > in
> > > > the
> > > > > updateCheckpoints function. Where would be best to publish the
> > > > information
> > > > > to make it more easily available, or would you just expose the
> > > > > offset-cleaner-checkpoint file as it is?
> > > > > Is it right you would also need to know which
> > offset-cleaner-checkpoint
> > > > > entry related to each active partition?
> > > >
> > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > along the lines of: we expose the log-end-offset (actually the high
> > > > watermark) of the partition in the fetch response. However, this is
> > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > were to consume a record you can check that it has offsets up to the
> > > > log-end offset. If it does then you would know for sure that you have
> > > > consumed everything for that partition.
> > > >
> > > > > Yes, was looking at this initially, but as we have 100-150 writes
> per
> > > > > second, it could be a while before there is a pause long enough to
> > > check
> > > > it
> > > > > has caught up. Even with the consumer timeout set to -1, it takes
> > some
> > > > time
> > > > > to query the max offset values, which is still long enough for more
> > > > > messages to arrive.
> > > >
> > > > Got it - thanks for clarifying.
> > > >
> > > > >
> > > > >
> > > > >
> > > > > On 18 February 2015 at 23:16, Joel Koshy 
> > wrote:
> > > > >
> > > > > > > You are also correct and perceptive to notice that if you check
> > the
> > > > end
> > > > > > of
> > > > > > > the log then begin consuming and read up to that point
> compaction
> > > may
> > > > > > have
> > > > > > > already kicked in (if the reading takes a while) and hence you
> > > might
> > > > have
> > > > > > > an incomplete snapshot.
> > > > > >
> > > > > > Isn't it sufficient to just repeat the check at the end after
> > reading
> > > > > > the log and repeat until you are truly done? At least for the
> > > purposes
> > > > > > of a snapshot?
> > > > > >
> > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > > If you catch up off a compacted topic and keep consuming then
> you
> > > > will
> > > > > > > become consistent with the log.
> > > > > > >
> > > > > > > I think what you are saying is that you want to create a
> snapshot
> > > > from
> > > > > > the
> > > > > > > Kafka topic but NOT do continual reads after that point. For
> > > example
> > > > you
> > > > > > > might be creating a backup of the dat

Broker ID disappears in Zookeeper

2015-02-19 Thread Sybrandy, Casey
Hello,

We're having the following issue with Kafka and/or ZooKeeper:
If a broker (id=1) is running and you start another broker with id=1, the new
broker will exit saying "A broker is already registered on the path
/brokers/ids/1". However, I noticed that when I query ZooKeeper afterwards,
/brokers/ids/1 has disappeared.
This behaviour doesn't make sense to us. The concern is that if we
accidentally start up multiple brokers with the same ID (automatic restarts),
then we may end up with multiple brokers with the same ID running at the same
time.

Thoughts?

Kafka: 0.8.2
Zookeeper: 3.4.5
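
For what it's worth, here is a minimal sketch of inspecting the registration
directly with the plain ZooKeeper client; the connect string and broker id are
placeholders. A non-zero ephemeralOwner in the result shows the znode is
ephemeral, i.e. tied to one broker's ZooKeeper session.

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class BrokerRegistrationCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder: use the same zookeeper.connect string the brokers use.
        ZooKeeper zk = new ZooKeeper("zk-host:2181", 6000, event -> { });
        try {
            // For a robust tool, wait for the SyncConnected event before issuing requests.
            List<String> ids = zk.getChildren("/brokers/ids", false);
            System.out.println("Registered broker ids: " + ids);

            String path = "/brokers/ids/1"; // the id in question
            Stat stat = zk.exists(path, false);
            if (stat == null) {
                System.out.println(path + " is not registered");
            } else {
                byte[] data = zk.getData(path, false, stat);
                // ephemeralOwner != 0 means the znode is ephemeral and will vanish
                // when the owning broker's ZooKeeper session ends.
                System.out.println("ephemeralOwner=" + stat.getEphemeralOwner()
                        + " data=" + new String(data, StandardCharsets.UTF_8));
            }
        } finally {
            zk.close();
        }
    }
}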


Re: Consuming a snapshot from log compacted topic

2015-02-19 Thread Jay Kreps
The log end offset is just the end of the committed messages in the log
(the last thing the consumer has access to). It isn't the same as the
cleaner point but is always later than it so it would work just as well.

-Jay
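
For reference, a minimal sketch of reading that offset from outside the
consumer, using the javaapi SimpleConsumer's getOffsetsBefore() with time -1
(the broker returns the latest offset available to consumers, i.e. the high
watermark Joel mentioned). The host, port, topic and client id are
placeholders, and the request should go to the leader of the partition.

import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LogEndOffsetLookupExample {
    public static void main(String[] args) {
        // Placeholders: point this at the leader of the topic/partition.
        SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "leo-lookup");
        try {
            TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
            // -1 (LatestTime) asks for the latest offset; 1 = max number of offsets to return.
            requestInfo.put(tp, new PartitionOffsetRequestInfo(
                    kafka.api.OffsetRequest.LatestTime(), 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "leo-lookup");
            OffsetResponse response = consumer.getOffsetsBefore(request);
            long logEndOffset = response.offsets("my-topic", 0)[0];
            System.out.println("log-end offset of my-topic/0: " + logEndOffset);
        } finally {
            consumer.close();
        }
    }
}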

On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell  wrote:

> > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > along the lines of: we expose the log-end-offset (actually the high
> > watermark) of the partition in the fetch response. However, this is
> > not exposed to the consumer (either in the new ConsumerRecord class
> > or the existing MessageAndMetadata class). If we did, then if you
> > were to consume a record you can check that it has offsets up to the
> > log-end offset. If it does then you would know for sure that you have
> > consumed everything for that partition
>
> To confirm then, the log-end-offset is the same as the cleaner point?
>
>
>
> On 19 February 2015 at 03:10, Jay Kreps  wrote:
>
> > Yeah I was thinking either along the lines Joel was suggesting or else
> > adding a logEndOffset(TopicPartition) method or something like that. As
> > Joel says the consumer actually has this information internally (we
> return
> > it with the fetch request) but doesn't expose it.
> >
> > -Jay
> >
> > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy  wrote:
> >
> > > > > 2. Make the log end offset available more easily in the consumer.
> > > >
> > > > Was thinking something would need to be added in LogCleanerManager,
> in
> > > the
> > > > updateCheckpoints function. Where would be best to publish the
> > > information
> > > > to make it more easily available, or would you just expose the
> > > > offset-cleaner-checkpoint file as it is?
> > > > Is it right you would also need to know which
> offset-cleaner-checkpoint
> > > > entry related to each active partition?
> > >
> > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > along the lines of: we expose the log-end-offset (actually the high
> > > watermark) of the partition in the fetch response. However, this is
> > > not exposed to the consumer (either in the new ConsumerRecord class
> > > or the existing MessageAndMetadata class). If we did, then if you
> > > were to consume a record you can check that it has offsets up to the
> > > log-end offset. If it does then you would know for sure that you have
> > > consumed everything for that partition.
> > >
> > > > Yes, was looking at this initially, but as we have 100-150 writes per
> > > > second, it could be a while before there is a pause long enough to
> > check
> > > it
> > > > has caught up. Even with the consumer timeout set to -1, it takes
> some
> > > time
> > > > to query the max offset values, which is still long enough for more
> > > > messages to arrive.
> > >
> > > Got it - thanks for clarifying.
> > >
> > > >
> > > >
> > > >
> > > > On 18 February 2015 at 23:16, Joel Koshy 
> wrote:
> > > >
> > > > > > You are also correct and perceptive to notice that if you check
> the
> > > end
> > > > > of
> > > > > > the log then begin consuming and read up to that point compaction
> > may
> > > > > have
> > > > > > already kicked in (if the reading takes a while) and hence you
> > might
> > > have
> > > > > > an incomplete snapshot.
> > > > >
> > > > > Isn't it sufficient to just repeat the check at the end after
> reading
> > > > > the log and repeat until you are truly done? At least for the
> > purposes
> > > > > of a snapshot?
> > > > >
> > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > If you catch up off a compacted topic and keep consuming then you
> > > will
> > > > > > become consistent with the log.
> > > > > >
> > > > > > I think what you are saying is that you want to create a snapshot
> > > from
> > > > > the
> > > > > > Kafka topic but NOT do continual reads after that point. For
> > example
> > > you
> > > > > > might be creating a backup of the data to a file.
> > > > > >
> > > > > > I agree that this isn't as easy as it could be. As you say the
> only
> > > > > > solution we have is that timeout which doesn't differentiate
> > between
> > > GC
> > > > > > stall in your process and no more messages left so you would need
> > to
> > > tune
> > > > > > the timeout. This is admittedly kind of a hack.
> > > > > >
> > > > > > You are also correct and perceptive to notice that if you check
> the
> > > end
> > > > > of
> > > > > > the log then begin consuming and read up to that point compaction
> > may
> > > > > have
> > > > > > already kicked in (if the reading takes a while) and hence you
> > might
> > > have
> > > > > > an incomplete snapshot.
> > > > > >
> > > > > > I think there are two features we could add that would make this
> > > easier:
> > > > > > 1. Make the cleaner point configurable on a per-topic basis. This
> > > feature
> > > > > > would allow you to control how long the full log is retained and
> > when
> > > > > > compaction can kick in. This would give a configurable SLA for
> the

Re: Consuming a snapshot from log compacted topic

2015-02-19 Thread Will Funnell
> I'm not sure if I misunderstood Jay's suggestion, but I think it is
> along the lines of: we expose the log-end-offset (actually the high
> watermark) of the partition in the fetch response. However, this is
> not exposed to the consumer (either in the new ConsumerRecord class
> or the existing MessageAndMetadata class). If we did, then if you
> were to consume a record you can check that it has offsets up to the
> log-end offset. If it does then you would know for sure that you have
> consumed everything for that partition

To confirm then, the log-end-offset is the same as the cleaner point?



On 19 February 2015 at 03:10, Jay Kreps  wrote:

> Yeah I was thinking either along the lines Joel was suggesting or else
> adding a logEndOffset(TopicPartition) method or something like that. As
> Joel says the consumer actually has this information internally (we return
> it with the fetch request) but doesn't expose it.
>
> -Jay
>
> On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy  wrote:
>
> > > > 2. Make the log end offset available more easily in the consumer.
> > >
> > > Was thinking something would need to be added in LogCleanerManager, in
> > the
> > > updateCheckpoints function. Where would be best to publish the
> > information
> > > to make it more easily available, or would you just expose the
> > > offset-cleaner-checkpoint file as it is?
> > > Is it right you would also need to know which offset-cleaner-checkpoint
> > > entry related to each active partition?
> >
> > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > along the lines of: we expose the log-end-offset (actually the high
> > watermark) of the partition in the fetch response. However, this is
> > not exposed to the consumer (either in the new ConsumerRecord class
> > or the existing MessageAndMetadata class). If we did, then if you
> > were to consume a record you can check that it has offsets up to the
> > log-end offset. If it does then you would know for sure that you have
> > consumed everything for that partition.
> >
> > > Yes, was looking at this initially, but as we have 100-150 writes per
> > > second, it could be a while before there is a pause long enough to
> check
> > it
> > > has caught up. Even with the consumer timeout set to -1, it takes some
> > time
> > > to query the max offset values, which is still long enough for more
> > > messages to arrive.
> >
> > Got it - thanks for clarifying.
> >
> > >
> > >
> > >
> > > On 18 February 2015 at 23:16, Joel Koshy  wrote:
> > >
> > > > > You are also correct and perceptive to notice that if you check the
> > end
> > > > of
> > > > > the log then begin consuming and read up to that point compaction
> may
> > > > have
> > > > > already kicked in (if the reading takes a while) and hence you
> might
> > have
> > > > > an incomplete snapshot.
> > > >
> > > > Isn't it sufficient to just repeat the check at the end after reading
> > > > the log and repeat until you are truly done? At least for the
> purposes
> > > > of a snapshot?
> > > >
> > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > If you catch up off a compacted topic and keep consuming then you
> > will
> > > > > become consistent with the log.
> > > > >
> > > > > I think what you are saying is that you want to create a snapshot
> > from
> > > > the
> > > > > Kafka topic but NOT do continual reads after that point. For
> example
> > you
> > > > > might be creating a backup of the data to a file.
> > > > >
> > > > > I agree that this isn't as easy as it could be. As you say the only
> > > > > solution we have is that timeout which doesn't differentiate
> between
> > GC
> > > > > stall in your process and no more messages left so you would need
> to
> > tune
> > > > > the timeout. This is admittedly kind of a hack.
> > > > >
> > > > > You are also correct and perceptive to notice that if you check the
> > end
> > > > of
> > > > > the log then begin consuming and read up to that point compaction
> may
> > > > have
> > > > > already kicked in (if the reading takes a while) and hence you
> might
> > have
> > > > > an incomplete snapshot.
> > > > >
> > > > > I think there are two features we could add that would make this
> > easier:
> > > > > 1. Make the cleaner point configurable on a per-topic basis. This
> > feature
> > > > > would allow you to control how long the full log is retained and
> when
> > > > > compaction can kick in. This would give a configurable SLA for the
> > reader
> > > > > process to catch up.
> > > > > 2. Make the log end offset available more easily in the consumer.
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> > w.f.funn...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > We are currently using Kafka 0.8.1.1 with log compaction in order
> > to
> > > > > > provide streams of messages to our clients.
> > > > > >
> > > > > > As well as constantly consuming the stream, one of our use cases
> > is 

Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
I see - yes, you can use the SimpleConsumer for that. However, your
high-level consumers need to be shutdown while you do that (otherwise
they may auto-commit while you are resetting offsets).

Thanks,

Joel
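
For completeness, a minimal sketch of the reset step itself, assuming the
group's offsets live in ZooKeeper (the 0.8.1 behavior, and where the javaapi's
version-0 commits end up). Instead of going through
SimpleConsumer.commitOffsets(), it writes the offset znode directly with the
plain ZooKeeper client; either route only makes sense while every consumer in
the group is shut down, as noted above. The connect string, group, topic,
partition and offset are placeholders.

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkOffsetReset {
    public static void main(String[] args) throws Exception {
        // Placeholders throughout; run this only while the consumer group is stopped.
        String group = "my-group";
        String topic = "my-topic";
        int partition = 0;
        long newOffset = 12345L;

        ZooKeeper zk = new ZooKeeper("zk-host:2181", 6000, event -> { });
        try {
            String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;
            byte[] data = Long.toString(newOffset).getBytes(StandardCharsets.UTF_8);
            if (zk.exists(path, false) == null) {
                // The parent nodes must already exist; they will if this group has
                // committed for this topic at least once before.
                zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
                zk.setData(path, data, -1); // -1 means ignore the znode version
            }
        } finally {
            zk.close();
        }
    }
}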

On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> We are using the High Level Consumer API to interact with Kafka for our 
> normal use cases.
> 
> However, on consumer restart in the case of consumer failures, we want to be 
> able to manually
> reset offsets in certain situations.
> And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
> It looked like instantiating a SimpleConsumer just to reset offsets on 
> restart was a viable option, while continuing to use the High Level Consumer 
> for our normal operations. Not sure if there is a better way that is 
> compatible across 0.8.1 and 0.8.2.
> -Suren
>  
> 
>  On Thursday, February 19, 2015 10:25 AM, Joel Koshy 
>  wrote:
>
> 
>  Not sure what you mean by using the SimpleConsumer on failure
> recovery. Can you elaborate on this?
> 
> On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > We are using the High Level Consumer generally but are thinking to use the 
> > SimpleConsumer on failure recovery to set the offsets.
> > Is that the recommended approach for this use case?
> > Thanks.
> > -Suren
> >  
> > 
> >      On Thursday, February 19, 2015 9:40 AM, Joel Koshy 
> > wrote:
> >    
> > 
> >  Are you using it from Java or Scala? i.e., are you using the
> > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > 
> > In 0.8.2 javaapi we explicitly set version 0 of the
> > OffsetCommitRequest/OffsetFetchRequest which means it will
> > commit/fetch to/from ZooKeeper only. If you use the scala API you can
> > create an OffsetCommitRequest with version set to 1 (which will allow
> > you to commit to Kafka).
> > 
> > Since we are doing an 0.8.2.1 release we will make the above more
> > consistent. i.e., you can create OffsetCommitRequests with version 1
> > even from the javaapi. I will be updating the documentation on this to
> > make it clearer.
> > 
> > Thanks,
> > 
> > Joel
> > 
> > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > Joel,
> > > Looking at SimpleConsumer in the 0.8.2 code, it is using 
> > > OffsetCommitRequest and sending that over to a broker.
> > > Is the broker storing that in ZK?
> > > -Suren
> > >  
> > > 
> > >      On Tuesday, February 17, 2015 12:22 PM, Joel Koshy 
> > > wrote:
> > >    
> > > 
> > >  Hi Chris,
> > > 
> > > In 0.8.2, the simple consumer Java API supports committing/fetching
> > > offsets that are stored in ZooKeeper. You don't need to issue any
> > > ConsumerMetadataRequest for this. Unfortunately, the API currently
> > > does not support fetching offsets that are stored in Kafka.
> > > 
> > > Thanks,
> > > 
> > > Joel
> > > 
> > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > > > Hi,
> > > > 
> > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > 
> > > > I'm confused about why the SimpleConsumer has:
> > > > 
> > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > 
> > > > and
> > > > 
> > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > > > 
> > > > but no way that I can see to issue a ConsumerMetadataRequest, which is
> > > > what I think when restarting my consumers so that they can begin
> > > > working where they last left off (in the event that they were stopped
> > > > for a while then restarted some time later, and new messages had come
> > > > in).
> > > > 
> > > > The fetchOffsets() works on time, usually it looks like you send it
> > > > Earliest or Latest (beginning or end of what's currently in the
> > > > stream).
> > > > 
> > > > I realize the documentation says this:
> > > > 
> > > > 
> > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a 
> > > > > significant amount of work not needed in the Consumer Groups:
> > > > >
> > > > >    1. You must keep track of the offsets in your application to know 
> > > > >where you left off consuming.
> > > > >
> > > > > But that's not really quite true ... not as long as commitOffsets() 
> > > > > has been provided.  It seems the SimpleConsumer provides you with a 
> > > > > solution to only one half of the problem of offset management.
> > > > 
> > > > Using some zookeeper python scripts I wrote I can see that the
> > > > commitOffsets() is doing its job and writing to
> > > > 
> > > > 
> > > > /consumers/myGroupId/offsets/myTopic/0
> > > > 
> > > > 
> > > > That has this value:
> > > > 
> > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, 
> > > > ctime=1423777630972,
> > > > > mtime=1424122117397, version=12568262, cversion=0, aversion=0,
> > > > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679))
> > > > 
> > > > 
> > > > Now the question is just how to retrieve that - do I really have to
> > > > have my c

Re: Simple Consumer and offsets

2015-02-19 Thread Suren
We are using the High Level Consumer API to interact with Kafka for our normal 
use cases.

However, on consumer restart in the case of consumer failures, we want to be 
able to manually
reset offsets in certain situations.
And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
It looked like instantiating a SimpleConsumer just to reset offsets on restart 
was a viable option, while continuing to use the High Level Consumer for our 
normal operations. Not sure if there is a better way that is compatible across 
0.8.1 and 0.8.2.
-Suren
 

 On Thursday, February 19, 2015 10:25 AM, Joel Koshy  
wrote:
   

 Not sure what you mean by using the SimpleConsumer on failure
recovery. Can you elaborate on this?

On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> Haven't used either one now. Sounds like 0.8.2.1 will help.
> We are using the High Level Consumer generally but are thinking to use the 
> SimpleConsumer on failure recovery to set the offsets.
> Is that the recommended approach for this use case?
> Thanks.
> -Suren
>  
> 

Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
Not sure what you mean by using the SimpleConsumer on failure
recovery. Can you elaborate on this?




Re: Simple Consumer and offsets

2015-02-19 Thread Suren
Haven't used either one so far. Sounds like 0.8.2.1 will help.
We are using the High Level Consumer generally but are thinking of using the
SimpleConsumer on failure recovery to set the offsets.
Is that the recommended approach for this use case?
Thanks.
-Suren
 


Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
Are you using it from Java or Scala? I.e., are you using the
javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer?

In the 0.8.2 javaapi we explicitly set version 0 of the
OffsetCommitRequest/OffsetFetchRequest, which means it will
commit/fetch to/from ZooKeeper only. If you use the Scala API you can
create an OffsetCommitRequest with the version set to 1 (which will allow
you to commit to Kafka).
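
For the record, a rough sketch of what that version-1 commit looks like from
the Scala API. The class and constructor parameter names below are from
memory of the 0.8.2 source and may differ slightly between releases, so
double-check them against your build:

    import kafka.api.OffsetCommitRequest
    import kafka.common.{OffsetAndMetadata, TopicAndPartition}
    import kafka.consumer.SimpleConsumer

    object CommitOffsetsSketch extends App {
      // Plain blocking consumer: host, port, socket timeout, buffer size, client id.
      val consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-tool")

      val offsets = Map(TopicAndPartition("myTopic", 0) -> OffsetAndMetadata(32757408L))

      // versionId = 0 asks the broker to store the offset in ZooKeeper;
      // versionId = 1 asks it to store the offset in Kafka instead.
      val request = OffsetCommitRequest(
        groupId = "myGroupId",
        requestInfo = offsets,
        versionId = 1)

      println(consumer.commitOffsets(request))
      consumer.close()
    }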

Since we are doing a 0.8.2.1 release we will make the above more
consistent, i.e., you will be able to create OffsetCommitRequests with
version 1 even from the javaapi. I will be updating the documentation on
this to make it clearer.

Thanks,

Joel




Re: Simple Consumer and offsets

2015-02-19 Thread Suren
Joel,
Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest 
and sending that over to a broker.
Is the broker storing that in ZK?
-Suren
 

 On Tuesday, February 17, 2015 12:22 PM, Joel Koshy wrote:

 Hi Chris,

In 0.8.2, the simple consumer Java API supports committing/fetching
offsets that are stored in ZooKeeper. You don't need to issue any
ConsumerMetadataRequest for this. Unfortunately, the API currently
does not support fetching offsets that are stored in Kafka.

Thanks,

Joel

On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> Hi,
> 
> I am still using 0.8.1.1 because of the CPU use concerns.
> 
> I'm confused about why the SimpleConsumer has:
> 
> OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> 
> and
> 
> OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> 
> but no way that I can see to issue a ConsumerMetadataRequest, which is
> what I think I need when restarting my consumers so that they can begin
> working where they last left off (in the event that they were stopped
> for a while and then restarted some time later, and new messages had come
> in).
> 
> The fetchOffsets() call works on time; usually it looks like you send it
> Earliest or Latest (the beginning or end of what's currently in the
> stream).
> 
> I realize the documentation says this:
> 
> 
> > *Downsides of using SimpleConsumer*
> > The SimpleConsumer does require a significant amount of work not needed in
> > the Consumer Groups:
> >
> >    1. You must keep track of the offsets in your application to know where
> >       you left off consuming.
> >
> > But that's not really quite true ... not as long as commitOffsets() has
> > been provided.  It seems the SimpleConsumer provides you with a solution to
> > only one half of the problem of offset management.
> 
> Using some ZooKeeper Python scripts I wrote, I can see that
> commitOffsets() is doing its job and writing to
> 
> 
> /consumers/myGroupId/offsets/myTopic/0
> 
> 
> That has this value:
> 
> ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972,
>  mtime=1424122117397, version=12568262, cversion=0, aversion=0,
>  ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679))
> 
> 
> Now the question is just how to retrieve that - do I really have to
> have my client connect to ZK directly?  If that's the case, future
> upgrades would break (e.g. 0.8.2 having its own storage for commit
> watermarks).
> 
> 
> What was the intent here, and what's the advice on how to proceed
> being that 0.8.2 is in an iffy state right now?
> 
> 
> --Chris
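
To make Joel's suggestion above concrete: the offset that commitOffsets()
wrote to /consumers/myGroupId/offsets/myTopic/0 can be read back through the
same SimpleConsumer, without the client ever touching ZooKeeper. A rough
sketch against the 0.8.x Scala API (class and field names are from memory, so
verify them against your build); version 0 of the fetch request tells the
broker to read the offset from ZooKeeper:

    import kafka.api.OffsetFetchRequest
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    object FetchOffsetsSketch extends App {
      val consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-tool")

      // versionId = 0 keeps the lookup in ZooKeeper, matching what commitOffsets()
      // wrote under /consumers/<group>/offsets/<topic>/<partition>.
      val request = OffsetFetchRequest(
        groupId = "myGroupId",
        requestInfo = Seq(TopicAndPartition("myTopic", 0)),
        versionId = 0)

      val response = consumer.fetchOffsets(request)
      response.requestInfo.foreach { case (tp, result) =>
        println(s"$tp -> offset=${result.offset} error=${result.error}")
      }
      consumer.close()
    }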



   

Re: Custom partitioner in kafka-0.8.2.0

2015-02-19 Thread sunil kalva
Thanks Mani for the quick response, sorry somehow I missed this javadoc :)

t
SunilKalva



Re: Custom partitioner in kafka-0.8.2.0

2015-02-19 Thread Manikumar Reddy
Hi,

In the new producer, we can specify the partition number as part of the
ProducerRecord.

From the javadocs:
*"If a valid partition number is specified that partition will be used when
sending the record. If no partition is specified but a key is present a
partition will be chosen using a hash of the key. If neither key nor
partition is present a partition will be assigned in a round-robin fashion.
"*

http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
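
For example, a minimal sketch with the 0.8.2 new producer (the broker address
and the StringSerializer settings here are just illustrative assumptions):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object ExplicitPartitionSketch extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "broker1:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)

      // (topic, partition, key, value): goes to partition 2 regardless of the key's hash.
      producer.send(new ProducerRecord[String, String]("myTopic", Integer.valueOf(2), "k", "v"))

      // (topic, key, value): no partition given, so the partition is chosen from the key's hash.
      producer.send(new ProducerRecord[String, String]("myTopic", "k", "v"))

      producer.close()
    }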


ManiKumar



Re: Custom partitioner in kafka-0.8.2.0

2015-02-19 Thread sunil kalva
> Hi
> I could not find a way to customize the "Partitioner" class in the new
> KafkaProducer class, is it intentional?
>
> tx
>
SunilKalva
>


Custom partitioner in kafka-0.8.2.0

2015-02-19 Thread sunil kalva
Hi
I could not find a way to customize the "Partitioner" class in the new
KafkaProducer class, is it intentional?

tx
SunilKalva
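
One way to get custom partitioning with the 0.8.2.0 new producer, since it
does not expose a pluggable partitioner hook, is to compute the partition on
the caller's side and pass it into the ProducerRecord. A small sketch with a
hypothetical helper (the routing rule is just an example):

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object ManualPartitioning {
      // Hypothetical helper: applies a caller-side partitioning rule before the
      // record is handed to the producer.
      def sendWithCustomRule(producer: KafkaProducer[String, String],
                             topic: String, key: String, value: String): Unit = {
        val numPartitions = producer.partitionsFor(topic).size()
        // Example rule: route by the first character of the key.
        val partition = math.abs(key.headOption.getOrElse(' ').toInt) % numPartitions
        producer.send(new ProducerRecord[String, String](topic, Integer.valueOf(partition), key, value))
      }
    }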