RE: Using OffsetRequest to get the Head and Tail of a partition in a single request.

2014-11-18 Thread Thunder Stumpges
I feared that was the case. Well they were certainly right when they said "this 
API is slightly funky"

Thanks for the quick confirmation.
Thunder


-----Original Message-----
From: Todd Palino [bonk...@gmail.com]
Received: Tuesday, 18 Nov 2014, 9:43PM
To: users@kafka.apache.org [users@kafka.apache.org]
Subject: Re: Using OffsetRequest to get the Head and Tail of a partition in a 
single request.

It is not possible, due to how the results for the offset request are stored 
within the broker and API (as a map). You will have to do 2 requests to get 
both offsets.

-Todd


> On Nov 18, 2014, at 8:52 PM, Thunder Stumpges  wrote:
>
> Hey all,
>
> We are working on a .net client, and I have a question about the 
> OffsetRequest api 
> (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI)
>
> It seems to indicate that you would always get the "log end offset" (tail 
> offset) regardless of the input request:
>
>"The response contains the starting offset of each segment for 
> the requested partition as well as the "log end offset" i.e. the offset of 
> the next message that would be appended to the given partition."
>
> However from inspecting the code, and experimenting with the API, that 
> doesn't seem to be the case. We cannot seem to fetch both the head (earliest 
> offset, based on Time specified as -2L ) and Tail which we would have 
> expected based on the comment above in the protocol docs.
>
> We can get either the earliest OR latest, but not both. We attempted to pass 
> two entries (the Array of [Partition,Time,MaxNumberOfOffsets] per the 
> protocol) for the same partition ID, one with a -1L and one with a -2L, 
> however we get only one result back, and from the code in KafkaApis.scala 
> handleOffsetRequest, it seems like there could never be multiple requests for 
> the same topic/partition.
>
> Does anyone know if this should be possible, or if there is a work-around for 
> this? Also, should something like this go to the dev group instead?
>
> Thanks
> Thunder
>


Re: ISR shrink to 0?

2014-11-18 Thread Jason Rosenberg
Ok,

Makes sense.  But if the node is not actually healthy (and underwent a hard
crash) it would likely not be able to avoid an 'unclean' restart. What
happens if unclean leader election is disabled, but there are no 'clean'
replicas available?

Jason

On Wed, Nov 19, 2014 at 12:40 AM, Jun Rao  wrote:

> Yes, we will preserve the last replica in ISR. This way, we know which
> replica has all committed messages and can wait for it to come back as the
> leader, if unclean leader election is disabled.
>
> Thanks,
>
> Jun
>
> On Mon, Nov 17, 2014 at 11:06 AM, Jason Rosenberg 
> wrote:
>
> > We have had 2 nodes in a 4 node cluster die this weekend, sadly.
> > Fortunately there was no critical data on these machines yet.
> >
> > The cluster is running 0.8.1.1, and using replication factor of 2 for 2
> > topics, each with 20 partitions.
> >
> > For sake of discussion, assume that nodes A and B are still up, and C
> and D
> > are now down.
> >
> > As expected, partitions that had one replica on a good host (A or B) and
> > one on a bad node (C or D), had their ISR shrink to just 1 node (A or B).
> >
> > Roughly 1/6 of the partitions had their 2 replicas on the 2 bad nodes, C
> > and D.  For these, I was expecting the ISR to show up as empty, and the
> > partition unavailable.
> >
> > However, that's not what I'm seeing.  When running TopicCommand
> --describe,
> > I see that the ISR still shows 1 replica, on node D (D was the second
> node
> > to go down).
> >
> > And, producers are still periodically trying to produce to node D (but
> > failing and retrying to one of the good nodes).
> >
> > So, it seems the cluster's meta data is still thinking that node D is up
> > and serving the partitions that were only replicated on C and D.
>  However,
> > for partitions that were on A and D, or B and D, D is not shown as being
> in
> > the ISR.
> >
> > Is this correct?  Should the cluster continue showing the last node to
> have
> > been alive for a partition as still in the ISR?
> >
> > Jason
> >
>


Re: How to recover from ConsumerRebalanceFailedException ?

2014-11-18 Thread Bhavesh Mistry
Hi Jun,

The ZK cluster is up and running.  What is the best way to recover
programmatically?  I would try an exponential-backoff recovery process, which
I am willing to implement.  So do you think monitoring the
"ZkClient-EventThread*" thread status will be enough to indicate that the
source thread is dead, and therefore to start the exponential reconnect
process?

Can you, at least for the new consumer API (0.9.0), provide a callback
method or notification that the consumer has died and the reason for it?


Thanks,
Bhavesh
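
A minimal sketch of that watchdog idea, using only the standard
java.lang.management APIs (the restart logic itself is left out):

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;

    public class ZkEventThreadWatchdog {
        private static final String PREFIX = "ZkClient-EventThread";

        // Returns true while a ZkClient event thread (the thread that runs the
        // high-level consumer's rebalance callbacks) is still alive in this JVM.
        public static boolean zkEventThreadAlive() {
            ThreadMXBean mx = ManagementFactory.getThreadMXBean();
            for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds())) {
                if (info != null && info.getThreadName().startsWith(PREFIX)) {
                    return true;
                }
            }
            return false;
        }
    }

Poll zkEventThreadAlive() periodically; when it turns false, shut down the
ConsumerConnector and recreate it with the exponential backoff proposed above.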



On Tue, Nov 18, 2014 at 9:34 PM, Jun Rao  wrote:

> Is your ZK service alive at that point? If not, you just need to monitor
> the ZK server properly.
>
> Thanks,
>
> Jun
>
> On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Kafka Team,
> >
> >
> > I get following exception due to ZK/Network issues intermittently.  How
> do
> > I recover from consumer thread dying *programmatically* and restart
> source
> > because we have alerts that due to this error we have partition OWNERSHIP
> > is *none* ?  Please let me know how to restart source and detect consumer
> > thread died and need to be restarted ?
> >
> >
> >
> > 17 Nov 2014 04:29:41,180 ERROR [
> > ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,
> > dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091]
> > (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event
> > ZkEvent[New session event sent to
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8
> > ]
> > kafka.common.ConsumerRebalanceFailedException:
> >
> >
> mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> > can't rebalance after 8 retries
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> > at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> >
> >
> >
> >
> >
> > ZK Connection Issues:
> >
> > java.net.SocketException: Transport endpoint is not connected
> > at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
> > at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
> > at
> > org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
> > at
> > org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
> >
> >
> >
> >
> > at
> > org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> > at
> > org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
> > at
> > kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
> > at
> scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> > at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> > KeeperErrorCode = NoNode for
> >
> >
> /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> > at
> > org.apache.zookeeper.KeeperException.create(KeeperException.java:102)

Re: Using OffsetRequest to get the Head and Tail of a partition in a single request.

2014-11-18 Thread Todd Palino
It is not possible, due to how the results for the offset request are stored 
within the broker and API (as a map). You will have to do 2 requests to get 
both offsets. 

-Todd
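
A minimal sketch of the two-request workaround, assuming the 0.8.x
kafka.javaapi.consumer.SimpleConsumer API (broker host, port, topic, and
client name below are hypothetical):

    import java.util.Collections;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class HeadAndTail {
        // Issues one OffsetRequest for the given time (-2L = earliest, -1L = latest).
        static long fetchOffset(SimpleConsumer consumer, String topic, int partition,
                                long time, String clientName) {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    Collections.singletonMap(tp, new PartitionOffsetRequestInfo(time, 1)),
                    kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);
            return response.offsets(topic, partition)[0];
        }

        public static void main(String[] args) {
            SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "headTail");
            long head = fetchOffset(consumer, "mytopic", 0, kafka.api.OffsetRequest.EarliestTime(), "headTail");
            long tail = fetchOffset(consumer, "mytopic", 0, kafka.api.OffsetRequest.LatestTime(), "headTail");
            System.out.println("head=" + head + " tail=" + tail);
            consumer.close();
        }
    }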


> On Nov 18, 2014, at 8:52 PM, Thunder Stumpges  wrote:
> 
> Hey all,
> 
> We are working on a .net client, and I have a question about the 
> OffsetRequest api 
> (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI)
> 
> It seems to indicate that you would always get the "log end offset" (tail 
> offset) regardless of the input request:
> 
>"The response contains the starting offset of each segment for 
> the requested partition as well as the "log end offset" i.e. the offset of 
> the next message that would be appended to the given partition."
> 
> However from inspecting the code, and experimenting with the API, that 
> doesn't seem to be the case. We cannot seem to fetch both the head (earliest 
> offset, based on Time specified as -2L ) and Tail which we would have 
> expected based on the comment above in the protocol docs.
> 
> We can get either the earliest OR latest, but not both. We attempted to pass 
> two entries (the Array of [Partition,Time,MaxNumberOfOffsets] per the 
> protocol) for the same partition ID, one with a -1L and one with a -2L, 
> however we get only one result back, and from the code in KafkaApis.scala 
> handleOffsetRequest, it seems like there could never be multiple requests for 
> the same topic/partition.
> 
> Does anyone know if this should be possible, or if there is a work-around for 
> this? Also, should something like this go to the dev group instead?
> 
> Thanks
> Thunder
> 


Re: ISR shrink to 0?

2014-11-18 Thread Jun Rao
Yes, we will preserve the last replica in ISR. This way, we know which
replica has all committed messages and can wait for it to come back as the
leader, if unclean leader election is disabled.

Thanks,

Jun
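
For context: the setting Jun refers to is the broker configuration below. It
is only configurable starting in 0.8.2 (KAFKA-1028); in 0.8.1.1 unclean
leader election is effectively always on. A sketch of the server.properties
entry:

    # server.properties (0.8.2+): keep a partition offline rather than electing
    # an out-of-sync replica as the new leader
    unclean.leader.election.enable=false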

On Mon, Nov 17, 2014 at 11:06 AM, Jason Rosenberg  wrote:

> We have had 2 nodes in a 4 node cluster die this weekend, sadly.
> Fortunately there was no critical data on these machines yet.
>
> The cluster is running 0.8.1.1, and using replication factor of 2 for 2
> topics, each with 20 partitions.
>
> For sake of discussion, assume that nodes A and B are still up, and C and D
> are now down.
>
> As expected, partitions that had one replica on a good host (A or B) and
> one on a bad node (C or D), had their ISR shrink to just 1 node (A or B).
>
> Roughly 1/6 of the partitions had their 2 replicas on the 2 bad nodes, C
> and D.  For these, I was expecting the ISR to show up as empty, and the
> partition unavailable.
>
> However, that's not what I'm seeing.  When running TopicCommand --describe,
> I see that the ISR still shows 1 replica, on node D (D was the second node
> to go down).
>
> And, producers are still periodically trying to produce to node D (but
> failing and retrying to one of the good nodes).
>
> So, it seems the cluster's meta data is still thinking that node D is up
> and serving the partitions that were only replicated on C and D.   However,
> for partitions that were on A and D, or B and D, D is not shown as being in
> the ISR.
>
> Is this correct?  Should the cluster continue showing the last node to have
> been alive for a partition as still in the ISR?
>
> Jason
>


Re: selecting java producer (0.8.2 or 0.8.1.1?)

2014-11-18 Thread Jun Rao
In the 0.8.2 final release, the java doc will be updated properly.
See KAFKA-1769.

Thanks,

Jun

On Tue, Nov 18, 2014 at 8:46 PM, Jason Rosenberg  wrote:

> So that java-doc link includes a new KafkaConsumer (but it seems in other
> threads that's not being referred to as ready for use until 0.9, is that
> right?). Is there a way to know which parts of that javadoc are
> considered beta-ready in 0.8.2 and which are not?
>
> Jason
>
> On Tue, Nov 18, 2014 at 11:03 PM, Joe Stein  wrote:
>
> > 0.8.2-beta java doc
> > https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
> >
> > /***
> > Joe Stein
> > Founder, Principal Consultant
> > Big Data Open Source Security LLC
> > http://www.stealth.ly
> > Twitter: @allthingshadoop
> > /
> > On Nov 18, 2014 10:33 PM, "Jason Rosenberg"  wrote:
> >
> > > Hi Jun,
> > >
> > > Is this the official java doc for the new producer (www.trieuvan.com)?
> > > I'm
> > > not seeing any links to it (or any documentation) on the apache kafka
> > site
> > > (am I overlooking it)?
> > >
> > > Should there be a link to it in the 0.8.2-beta documentation page?
> > >
> > > Jason
> > >
> > > On Tue, Nov 18, 2014 at 7:23 PM, Jun Rao  wrote:
> > >
> > > > The new producer in 0.8.2 is considered stable, although it's
> > relatively
> > > > new. Compared with the old producer, it has the following features.
> > > >
> > > > 1. Use non-blocking socket to send requests to the broker. So uses
> > fewer
> > > > threads and have better throughput.
> > > > 2. Bound the memory consumption.
> > > > 3. Support a callback when sending a request asynchronously.
> > > > 4. Returns the offset for each produced message.
> > > >
> > > > You can look at the example in the java doc.
> > > >
> > > >
> > >
> >
> http://www.trieuvan.com/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/producer/KafkaProducer.html
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Mon, Nov 17, 2014 at 4:46 AM, Shlomi Hazan 
> > wrote:
> > > >
> > > > > Hi,
> > > > > I need to make a choice and I can't get a full picture on the
> > > differences
> > > > > between the two.
> > > > > E.g.:
> > > > > Are both producers async capable to the same extent?
> > > > > Is the new producer stable for production?
> > > > > Is there some usage example for the new producer?
> > > > > What are the tradeoffs using one or another?
> > > > > 10x,
> > > > > Shlomi
> > > > >
> > > >
> > >
> >
>


Re: How to recover from ConsumerRebalanceFailedException ?

2014-11-18 Thread Jun Rao
Is your ZK service alive at that point? If not, you just need to monitor
the ZK server properly.

Thanks,

Jun
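
As a sketch of monitoring ZK from the application side, ZooKeeper's standard
"ruok" four-letter command can be polled over a plain socket (host and port
below are hypothetical):

    import java.net.Socket;

    public class ZkHealthCheck {
        // Sends ZooKeeper's "ruok" command; a healthy server replies "imok".
        public static boolean zkIsOk(String host, int port) {
            try (Socket socket = new Socket(host, port)) {
                socket.getOutputStream().write("ruok".getBytes("US-ASCII"));
                socket.shutdownOutput();
                byte[] buf = new byte[4];
                int n = socket.getInputStream().read(buf);
                return n == 4 && "imok".equals(new String(buf, 0, n, "US-ASCII"));
            } catch (Exception e) {
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(zkIsOk("zk1", 2181) ? "imok" : "not reachable");
        }
    }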

On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry 
wrote:

> Hi Kafka Team,
>
>
> I get following exception due to ZK/Network issues intermittently.  How do
> I recover from consumer thread dying *programmatically* and restart source
> because we have alerts that due to this error we have partition OWNERSHIP
> is *none* ?  Please let me know how to restart source and detect consumer
> thread died and need to be restarted ?
>
>
>
> 17 Nov 2014 04:29:41,180 ERROR [
> ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,
> dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091]
> (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event
> ZkEvent[New session event sent to
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8
> ]
> kafka.common.ConsumerRebalanceFailedException:
>
> mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> can't rebalance after 8 retries
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>
>
>
>
>
> ZK Connection Issues:
>
> java.net.SocketException: Transport endpoint is not connected
> at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
> at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
> at
> org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
> at
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
>
>
>
>
> at
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
> at
> kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for
>
> /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
> at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>


Re: Topic Creation

2014-11-18 Thread Jun Rao
Yes, that's the best option for now. The creation is async.

Thanks,

Jun
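
A minimal sketch of the create-then-poll approach under 0.8.1.1 (AdminUtils
and ZkUtils are Scala objects but callable from Java via their static
forwarders; the ZK connect string, topic name, and counts are hypothetical):

    import java.util.Properties;
    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;
    import org.I0Itec.zkclient.ZkClient;

    public class CreateTopicAndWait {
        public static void main(String[] args) throws InterruptedException {
            ZkClient zkClient = new ZkClient("zk1:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
            AdminUtils.createTopic(zkClient, "my-topic", 8, 2, new Properties());
            // Creation only registers the topic in ZK; poll until its path appears.
            // Partition leaders may still take a moment to be elected after this.
            while (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath("my-topic"))) {
                Thread.sleep(100);
            }
            zkClient.close();
        }
    }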

On Tue, Nov 18, 2014 at 8:55 AM, Bryan Baugher  wrote:

> Hi everyone,
>
> I'm looking for the API to create/manager topics. Is this class[1] the one
> I should be using? Also what if I'm wanting to create the topic
> synchronously?
>
> [1] -
>
> https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/admin/AdminUtils.scala#L149-L157
>
> Bryan
>


Re: Kafka consumer - want to print partition id along with message

2014-11-18 Thread Jun Rao
You need to define msg as MessageAndMetadata.

Thanks,

Jun
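
A sketch of the corrected loop, assuming m_stream is a
KafkaStream<byte[], byte[]> obtained from the high-level consumer with the
default decoders:

    import kafka.consumer.ConsumerIterator;
    import kafka.message.MessageAndMetadata;

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            System.out.println("Thread " + m_threadNumber
                    + " partition " + msg.partition()
                    + ": " + new String(msg.message())); // message() is a byte[]; decode it
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }

Calling toString() on message() prints the array's identity hash, which is
the "junk data" described below; wrapping the byte[] in new String(...)
decodes it instead.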

On Mon, Nov 17, 2014 at 10:43 PM, kade...@yahoo.com.INVALID <
kade...@yahoo.com.invalid> wrote:

> Hi,
>
> I am trying to learn more on Kafka consuming patterns if we have
> partitions on the topic and testing few scenarios. I would like to print
> the message when the consumer consumes along with the partition id. How to
> get this done ?
>
> We have simple consumer code (high level)
>
>   public void run() {
> ConsumerIterator it = m_stream.iterator();
> while (it.hasNext())
> System.out.println("Thread " + m_threadNumber + ":
> " + new String(it.next().message()));
> System.out.println("Shutting down Thread: " + m_threadNumber);
> }
>
> How to print the message as well as partition id ?
>
> Tried to assign it.next() to MessageandMetadata variable and printed
> partition id (which is declared as int). But while printing message() - it
> is declared as object in MessageandMetadata. Message is getting printed as
> junk data
>
> Tried this:
>
> MessageandMetadata msg;
> msg = it.next()
> System.out.println("Partition "+ msg.partition() + "Message data" +
> msg.message().tostring());
>
> Here msg.message() is not printing string value.
> Can anyone help on this.
>
> Kader .
>
> Sent from Yahoo Mail on Android
>
>


Re: Enforcing Network Bandwidth Quota with New Java Producer

2014-11-18 Thread Jun Rao
Either is fine. "outgoing-byte-rate" gives you the total and "byte-rate"
gives you per topic.

Thanks,

Jun
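
A sketch of reading those two metrics through the new producer's metrics()
map (assuming the generic producer API of the 0.8.2 final release):

    import java.util.Map;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class ByteRates {
        public static void printByteRates(Producer<byte[], byte[]> producer) {
            for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
                String name = e.getKey().name();
                if ("outgoing-byte-rate".equals(name) || "byte-rate".equals(name)) {
                    // tags() distinguishes the all-broker total from the per-topic rates
                    System.out.println(name + " " + e.getKey().tags() + " = " + e.getValue().value());
                }
            }
        }
    }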

On Mon, Nov 17, 2014 at 3:58 PM, Bhavesh Mistry 
wrote:

> Hi Jun,
>
> So if I set "metrics.sample.window.ms" to 1 minute, I will be able to get
> the compressed bytes count per minute.  Which one should I be using from the
> following?
>
> "outgoing-byte-rate", //The average number of outgoing
> bytes sent
> per second to all servers.
> "byte-rate", //Rate Per seconds
>
>
> Thanks,
>
> Bhavesh
>
> On Fri, Nov 14, 2014 at 3:39 PM, Jun Rao  wrote:
>
> > We have a metric that measures the per-topic bytes send rate (after
> > compression). You can get the values through the producer api.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Nov 14, 2014 at 10:34 AM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com
> > > wrote:
> >
> > > HI Kafka Team,
> > >
> > > We like to enforce a network bandwidth quota limit per minute on
> producer
> > > side.  How can I do this ?  I need some way to count compressed bytes
> on
> > > producer ?  I know there is callback does not give this ability ?  Let
> me
> > > know the best way.
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> >
>


Re: kafka application logdir

2014-11-18 Thread Jun Rao
Try the following.

KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/mylog4jdir" \
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

Thanks,

Jun
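
Note that log4j.configuration expects a URL to the properties file itself,
e.g. file:/mylog4jdir/log4j.properties. A sketch of such a file redirecting
the broker's application logs to a different mount (paths hypothetical):

    # /mylog4jdir/log4j.properties
    log4j.rootLogger=INFO, kafkaAppender
    log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.kafkaAppender.File=/mnt/kafka/kafka-app-logs/server.log
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n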

On Mon, Nov 17, 2014 at 11:11 AM, Jimmy John  wrote:

> How do I configure the application kafka log dir?
>
> Right now the default is /var/log/upstart/kafka.log . I want to point it to
> a different mount dir... e.g.
>
> /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
> --logdir /mnt/kafka/kafka-app-logs
>
> But the above gives me errors :
>
> USAGE: java [options] KafkaServer server.properties
>
>
> NOTE: these are application log dir , not the data directory.
>
>
> Any recommended way to point the application log to a different place?
>
> thx
>
> jim
>


Using OffsetRequest to get the Head and Tail of a partition in a single request.

2014-11-18 Thread Thunder Stumpges
Hey all,

We are working on a .net client, and I have a question about the OffsetRequest 
api 
(https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI)

It seems to indicate that you would always get the "log end offset" (tail 
offset) regardless of the input request:

"The response contains the starting offset of each segment for 
the requested partition as well as the "log end offset" i.e. the offset of the 
next message that would be appended to the given partition."

However from inspecting the code, and experimenting with the API, that doesn't 
seem to be the case. We cannot seem to fetch both the head (earliest offset, 
based on Time specified as -2L ) and Tail which we would have expected based on 
the comment above in the protocol docs.

We can get either the earliest OR latest, but not both. We attempted to pass 
two entries (the Array of [Partition,Time,MaxNumberOfOffsets] per the protocol) 
for the same partition ID, one with a -1L and one with a -2L, however we get 
only one result back, and from the code in KafkaApis.scala handleOffsetRequest, 
it seems like there could never be multiple requests for the same 
topic/partition.

Does anyone know if this should be possible, or if there is a work-around for 
this? Also, should something like this go to the dev group instead?

Thanks
Thunder



Re: selecting java producer (0.8.2 or 0.8.1.1?)

2014-11-18 Thread Jason Rosenberg
So that java-doc link includes a new KafkaConsumer (but it seems in other
threads that's not being referred to as ready for use until 0.9, is that
right?). Is there a way to know which parts of that javadoc are
considered beta-ready in 0.8.2 and which are not?

Jason

On Tue, Nov 18, 2014 at 11:03 PM, Joe Stein  wrote:

> 0.8.2-beta java doc
> https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
>
> /***
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop
> /
> On Nov 18, 2014 10:33 PM, "Jason Rosenberg"  wrote:
>
> > Hi Jun,
> >
> > Is this the official java doc for the new producer (www.trieuvan.com)?
> > I'm
> > not seeing any links to it (or any documentation) on the apache kafka
> site
> > (am I overlooking it)?
> >
> > Should there be a link to it in the 0.8.2-beta documentation page?
> >
> > Jason
> >
> > On Tue, Nov 18, 2014 at 7:23 PM, Jun Rao  wrote:
> >
> > > The new producer in 0.8.2 is considered stable, although it's
> relatively
> > > new. Compared with the old producer, it has the following features.
> > >
> > > 1. Use non-blocking socket to send requests to the broker. So uses
> fewer
> > > threads and have better throughput.
> > > 2. Bound the memory consumption.
> > > 3. Support a callback when sending a request asynchronously.
> > > 4. Returns the offset for each produced message.
> > >
> > > You can look at the example in the java doc.
> > >
> > >
> >
> http://www.trieuvan.com/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/producer/KafkaProducer.html
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Nov 17, 2014 at 4:46 AM, Shlomi Hazan 
> wrote:
> > >
> > > > Hi,
> > > > I need to make a choice and I can't get a full picture on the
> > differences
> > > > between the two.
> > > > E.g.:
> > > > Are both producers async capable to the same extent?
> > > > Is the new producer stable for production?
> > > > Is there some usage example for the new producer?
> > > > What are the tradeoffs using one or another?
> > > > 10x,
> > > > Shlomi
> > > >
> > >
> >
>


Re: benchmark kafka on 10GbE network

2014-11-18 Thread Jay Kreps
Yeah this will involve some experimentation.

The metrics are visible with jconsole or another jmx viewer.

It may also be worth looking at the cpu usage per-thread (e.g. start top
and press 't' I think).

Another simple test for broker vs client as the bottleneck is just to start
another producer or consumer and see if that improves throughput (if so it
is probably a client bottleneck).

-Jay
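
As an alternative to top for the per-thread CPU check, a sketch using the
JVM's own thread MXBean (run inside the client process):

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;

    public class ThreadCpuDump {
        public static void main(String[] args) {
            ThreadMXBean mx = ManagementFactory.getThreadMXBean();
            for (long id : mx.getAllThreadIds()) {
                ThreadInfo info = mx.getThreadInfo(id);
                if (info != null) {
                    // Cumulative CPU nanoseconds (-1 if unsupported); sample twice
                    // and take the difference to get a per-thread rate.
                    System.out.printf("%-50s %,d ns%n", info.getThreadName(), mx.getThreadCpuTime(id));
                }
            }
        }
    }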

On Tue, Nov 18, 2014 at 4:44 PM, Manu Zhang  wrote:

> Thanks Jay for the quick response.
>
> Yes, it's a single producer and consumer both configured with multiple
> threads but I'm not using the new producer.
> CPU is typically 50% utilized on client and merely used on broker. Disks
> aren't busy either as a lot of data are cached in memory.
> Would you please give a link for the producer metrics you are referring to
> ?
>
> Thanks,
> Manu
>
> On Wed, Nov 19, 2014 at 2:39 AM, Jay Kreps  wrote:
>
> > Hey Manu,
> >
> > I'm not aware of a benchmark on 10GbE. I'd love to see that though.
> Diving
> > into the results may help us find bottlenecks hidden by the slower
> network.
> >
> > Can you figure out where the bottleneck is in your test? I assume this
> is a
> > single producer and consumer instance and you are using the new producer
> as
> > in those benchmarks?
> >
> > This can be slightly tricky as it can be cpu or I/O on either the clients
> > or the brokers. You basically have to look at top, iostat, and the jmx
> > metrics for clues. The producer has good metrics that explain whether it
> is
> > spending most of its time waiting or sending data. Not sure if there is a
> > similar diagnostic for the consumer.
> >
> > -Jay
> >
> > On Tue, Nov 18, 2014 at 5:10 AM, Manu Zhang 
> > wrote:
> >
> > > Hi all,
> > >
> > > I have been trying out kafka benchmarks described in Jay's
> > > benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> > > <
> > >
> >
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machine
> > > >.
> > > I'm able to get similar results on a 4-node GbE network whose in-bytes
> > > could be saturated at 120MB/s. However, on a 4-node, 10GbE network, I
> can
> > > not get in-bytes higher than 150MB/s. *Has anyone benchmarked kafka on
> a
> > > 10GbE network ? Any rule of thumb on 10GbE network for configurations
> of
> > > broker, producer and consumer ? *
> > >
> > > My kafka version is 0.8.1.1 and I've created a topic with 8 partitions
> > with
> > > 1 replica distributed evenly among the 4 nodes. Message size is 100
> > bytes.
> > > I use all the default kafka settings.
> > > My cluster has 4 nodes, where each node has 32 cores, 128GB RAM and 3
> > disks
> > > for kafka.
> > >
> > > I've tried increasing message size to 1000 bytes which improved
> > producer's
> > > throughput but not consumer's.
> > >
> > >
> > > Thanks,
> > > Manu
> > >
> >
>


Re: selecting java producer (0.8.2 or 0.8.1.1?)

2014-11-18 Thread Joe Stein
0.8.2-beta java doc
https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
/
On Nov 18, 2014 10:33 PM, "Jason Rosenberg"  wrote:

> Hi Jun,
>
> Is this the official java doc for the new producer (www.trieuvan.com)?
> I'm
> not seeing any links to it (or any documentation) on the apache kafka site
> (am I overlooking it)?
>
> Should there be a link to it in the 0.8.2-beta documentation page?
>
> Jason
>
> On Tue, Nov 18, 2014 at 7:23 PM, Jun Rao  wrote:
>
> > The new producer in 0.8.2 is considered stable, although it's relatively
> > new. Compared with the old producer, it has the following features.
> >
> > 1. Use non-blocking socket to send requests to the broker. So uses fewer
> > threads and have better throughput.
> > 2. Bound the memory consumption.
> > 3. Support a callback when sending a request asynchronously.
> > 4. Returns the offset for each produced message.
> >
> > You can look at the example in the java doc.
> >
> >
> http://www.trieuvan.com/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/producer/KafkaProducer.html
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Nov 17, 2014 at 4:46 AM, Shlomi Hazan  wrote:
> >
> > > Hi,
> > > I need to make a choice and I can't get a full picture on the
> differences
> > > between the two.
> > > E.g.:
> > > Are both producers async capable to the same extent?
> > > Is the new producer stable for production?
> > > Is there some usage example for the new producer?
> > > What are the tradeoffs using one or another?
> > > 10x,
> > > Shlomi
> > >
> >
>


Re: selecting java producer (0.8.2 or 0.8.1.1?)

2014-11-18 Thread Jason Rosenberg
Hi Jun,

Is this the official java doc for the new producer (www.trieuvan.com)?  I'm
not seeing any links to it (or any documentation) on the apache kafka site
(am I overlooking it)?

Should there be a link to it in the 0.8.2-beta documentation page?

Jason

On Tue, Nov 18, 2014 at 7:23 PM, Jun Rao  wrote:

> The new producer in 0.8.2 is considered stable, although it's relatively
> new. Compared with the old producer, it has the following features.
>
> 1. Use non-blocking socket to send requests to the broker. So uses fewer
> threads and have better throughput.
> 2. Bound the memory consumption.
> 3. Support a callback when sending a request asynchronously.
> 4. Returns the offset for each produced message.
>
> You can look at the example in the java doc.
>
> http://www.trieuvan.com/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/producer/KafkaProducer.html
>
> Thanks,
>
> Jun
>
> On Mon, Nov 17, 2014 at 4:46 AM, Shlomi Hazan  wrote:
>
> > Hi,
> > I need to make a choice and I can't get a full picture on the differences
> > between the two.
> > E.g.:
> > Are both producers async capable to the same extent?
> > Is the new producer stable for production?
> > Is there some usage example for the new producer?
> > What are the tradeoffs using one or another?
> > 10x,
> > Shlomi
> >
>


Re: ISR shrink to 0?

2014-11-18 Thread Jason Rosenberg
Not sure what happened, but the issue went away once we revived the broker id
on a new host.

But it does seem host D's ISR leadership could not be cleared until another
member of the ISR came back. Somehow D was stale and remained stuck (and
clients therefore kept trying to connect to it)...

Jason

On Mon, Nov 17, 2014 at 2:06 PM, Jason Rosenberg  wrote:

> We have had 2 nodes in a 4 node cluster die this weekend, sadly.
> Fortunately there was no critical data on these machines yet.
>
> The cluster is running 0.8.1.1, and using replication factor of 2 for 2
> topics, each with 20 partitions.
>
> For sake of discussion, assume that nodes A and B are still up, and C and
> D are now down.
>
> As expected, partitions that had one replica on a good host (A or B) and
> one on a bad node (C or D), had their ISR shrink to just 1 node (A or B).
>
> Roughly 1/6 of the partitions had their 2 replicas on the 2 bad nodes, C
> and D.  For these, I was expecting the ISR to show up as empty, and the
> partition unavailable.
>
> However, that's not what I'm seeing.  When running TopicCommand
> --describe, I see that the ISR still shows 1 replica, on node D (D was the
> second node to go down).
>
> And, producers are still periodically trying to produce to node D (but
> failing and retrying to one of the good nodes).
>
> So, it seems the cluster's meta data is still thinking that node D is up
> and serving the partitions that were only replicated on C and D.   However,
> for partitions that were on A and D, or B and D, D is not shown as being in
> the ISR.
>
> Is this correct?  Should the cluster continue showing the last node to
> have been alive for a partition as still in the ISR?
>
> Jason
>
>
>


Re: How to recover from ConsumerRebalanceFailedException ?

2014-11-18 Thread Bhavesh Mistry
Hi Kafka team,

So we would just monitor the "ZkClient-EventThread*" threads via the
ThreadMXBean from ManagementFactory.getThreadMXBean(), and if that
ZkClient-EventThread* thread dies, then restart the sources.  Is there any
alternative approach or lifecycle method that an API consumer can attach to
the consumer lifecycle, so we get notified that it is dying and can take some
action?

Thanks,

Bhavesh

On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry 
wrote:

> Hi Kafka Team,
>
>
> I get following exception due to ZK/Network issues intermittently.  How do
> I recover from consumer thread dying *programmatically* and restart
> source because we have alerts that due to this error we have partition
> OWNERSHIP is *none* ?  Please let me know how to restart source and
> detect consumer thread died and need to be restarted ?
>
>
>
> 17 Nov 2014 04:29:41,180 ERROR [
> ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,
> dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091]
> (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event
> ZkEvent[New session event sent to
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8
> ]
> kafka.common.ConsumerRebalanceFailedException:
> mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> can't rebalance after 8 retries
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>
>
>
>
>
> ZK Connection Issues:
>
> java.net.SocketException: Transport endpoint is not connected
> at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
> at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
> at
> org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
> at
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
>
>
>
>
> at
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
> at
> kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for
> /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
> at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)

Re: benchmark kafka on 10GbE network

2014-11-18 Thread Manu Zhang
Thanks Jay for the quick response.

Yes, it's a single producer and consumer both configured with multiple
threads but I'm not using the new producer.
CPU is typically 50% utilized on client and merely used on broker. Disks
aren't busy either as a lot of data are cached in memory.
Would you please give a link to the producer metrics you are referring to?

Thanks,
Manu

On Wed, Nov 19, 2014 at 2:39 AM, Jay Kreps  wrote:

> Hey Manu,
>
> I'm not aware of a benchmark on 10GbE. I'd love to see that though. Diving
> into the results may help us find bottlenecks hidden by the slower network.
>
> Can you figure out where the bottleneck is in your test? I assume this is a
> single producer and consumer instance and you are using the new producer as
> in those benchmarks?
>
> This can be slightly tricky as it can be cpu or I/O on either the clients
> or the brokers. You basically have to look at top, iostat, and the jmx
> metrics for clues. The producer has good metrics that explain whether it is
> spending most of its time waiting or sending data. Not sure if there is a
> similar diagnostic for the consumer.
>
> -Jay
>
> On Tue, Nov 18, 2014 at 5:10 AM, Manu Zhang 
> wrote:
>
> > Hi all,
> >
> > I have been trying out kafka benchmarks described in Jay's
> > benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> > <
> >
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machine
> > >.
> > I'm able to get similar results on a 4-node GbE network whose in-bytes
> > could be saturated at 120MB/s. However, on a 4-node, 10GbE network, I can
> > not get in-bytes higher than 150MB/s. *Has anyone benchmarked kafka on a
> > 10GbE network ? Any rule of thumb on 10GbE network for configurations of
> > broker, producer and consumer ? *
> >
> > My kafka version is 0.8.1.1 and I've created a topic with 8 partitions
> with
> > 1 replica distributed evenly among the 4 nodes. Message size is 100
> bytes.
> > I use all the default kafka settings.
> > My cluster has 4 nodes, where each node has 32 cores, 128GB RAM and 3
> disks
> > for kafka.
> >
> > I've tried increasing message size to 1000 bytes which improved
> producer's
> > throughput but not consumer's.
> >
> >
> > Thanks,
> > Manu
> >
>


Re: selecting java producer (0.8.2 or 0.8.1.1?)

2014-11-18 Thread Jun Rao
The new producer in 0.8.2 is considered stable, although it's relatively
new. Compared with the old producer, it has the following features.

1. Uses non-blocking sockets to send requests to the broker, so it needs fewer
threads and gets better throughput.
2. Bounds the memory consumption.
3. Supports a callback when sending a request asynchronously.
4. Returns the offset for each produced message.

You can look at the example in the java doc.
http://www.trieuvan.com/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/producer/KafkaProducer.html

Thanks,

Jun
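
A condensed sketch in the spirit of that java doc example, including the
async callback that returns the offset (features 3 and 4); this assumes the
0.8.2 final API, where the serializer configs were added after the beta:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class NewProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            // send() is non-blocking; the callback fires once the broker responds
            producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.println("appended at offset " + metadata.offset());
                            }
                        }
                    });
            producer.close();
        }
    }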

On Mon, Nov 17, 2014 at 4:46 AM, Shlomi Hazan  wrote:

> Hi,
> I need to make a choice and I can't get a full picture on the differences
> between the two.
> E.g.:
> Are both producers async capable to the same extent?
> Is the new producer stable for production?
> Is there some usage example for the new producer?
> What are the tradeoffs using one or another?
> 10x,
> Shlomi
>


Re: Getting Simple consumer details using MBean

2014-11-18 Thread Jun Rao
If you connect to the jvm of the consumer, you will see the following mbean.

"kafka.consumer":type="FetchRequestAndResponseMetrics",name="SimpleConsumerShell-AllBrokersFetchRequestRateAndTimeMs"

If you want to monitor the consumption rate on the broker, you can look at
the byte out rate and the fetch-consumer request time mentioned in
http://kafka.apache.org/documentation.html#monitoring

Thanks,

Jun
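
A sketch of reading that mbean from inside the consumer JVM; the 0.8.x metric
names contain literal quotes, hence the escaping:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class SimpleConsumerMetrics {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName pattern = new ObjectName(
                    "\"kafka.consumer\":type=\"FetchRequestAndResponseMetrics\",*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                System.out.println(name + " Count=" + server.getAttribute(name, "Count"));
            }
        }
    }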

On Sun, Nov 16, 2014 at 10:19 PM, Madhukar Bharti 
wrote:

> Hi,
>
> Thank you for your reply, Otis. So simple consumer metrics can be fetched from
> 0.8.2? Is there no other way to get them from 0.8.1?
>
> @Jun:  The link you have shared doesn't have any beans to get simple
> consumer details.
>
> Kindly help me in this.
>
>
> Thanks and Regards
> Madhukar
>
> On Mon, Nov 17, 2014 at 12:20 AM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> > Hi Madhukar,
> >
> > Maybe you want to look at SPM , which has
> Kafka
> > monitoring.
> > But please note this:
> >
> >
> https://sematext.atlassian.net/wiki/display/PUBSPM/SPM+FAQ#SPMFAQ-WhyamInotseeingallKafkametricsifI'mrunninga0.8.xversionofKafkathatispre-0.8.2
> > ?
> >
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> > On Fri, Nov 14, 2014 at 5:52 PM, Jun Rao  wrote:
> >
> > > So, you want to monitor the mbeans on the broker side? Take a look at
> > > http://kafka.apache.org/documentation.html#monitoring
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Nov 13, 2014 at 10:58 PM, Madhukar Bharti <
> > > bhartimadhu...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun Rao,
> > > >
> > > > Sorry to disturb you. But in my Kafka setup it is not showing. I am
> > > > attaching screen shot taken from all brokers.
> > > >
> > > > In kafka.consumer it is listing only "ReplicaFetcherThread".
> > > >
> > > > As I said earlier I am using "2.10-0.8.1.1" version. Do i need to
> > > > configure any extra parameter for this? I am simply using the same
> > > > configuration as described in wiki page.
> > > >
> > > >
> > > >
> > > > Thanks and Regards,
> > > > Madhukar
> > > >
> > > >
> > > > On Fri, Nov 14, 2014 at 1:17 AM, Jun Rao  wrote:
> > > >
> > > >> I tried running kafka-simple-consumer-shell. I can see the following
> > > >> mbean.
> > > >>
> > > >>
> > > >>
> > >
> >
> "kafka.consumer":type="FetchRequestAndResponseMetrics",name="SimpleConsumerShell-AllBrokersFetchRequestRateAndTimeMs"
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >> On Wed, Nov 12, 2014 at 9:57 PM, Madhukar Bharti <
> > > >> bhartimadhu...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Jun Rao,
> > > >> >
> > > >> > Thanks for your quick reply.
> > > >> >
> > > >> > I am not able to see this  any bean named as "SimpleConsumer". Is
> > > there
> > > >> any
> > > >> > configuration related to this?
> > > >> >
> > > >> > How can I see this bean named listing in Jconsole window?
> > > >> >
> > > >> >
> > > >> > Thanks and Regards
> > > >> > Madhukar
> > > >> >
> > > >> > On Thu, Nov 13, 2014 at 6:06 AM, Jun Rao 
> wrote:
> > > >> >
> > > >> > > Those are for 0.7. In 0.8, you should see sth
> > > >> > > like FetchRequestRateAndTimeMs in SimpleConsumer.
> > > >> > >
> > > >> > > Thanks,
> > > >> > >
> > > >> > > Jun
> > > >> > >
> > > >> > > On Wed, Nov 12, 2014 at 5:14 AM, Madhukar Bharti <
> > > >> > bhartimadhu...@gmail.com
> > > >> > > >
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi,
> > > >> > > >
> > > >> > > > I want to get the simple consumer details using MBean as
> > described
> > > >> here
> > > >> > > > <
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
> > > >> > > > >.
> > > >> > > > But these bean names are not showing in JConsole as well as
> > while
> > > >> > trying
> > > >> > > to
> > > >> > > > read from JMX.
> > > >> > > >
> > > >> > > > Please help me to get simple consumer details.
> > > >> > > >
> > > >> > > > I am using Kafka 0.8.1.1 version.
> > > >> > > >
> > > >> > > >
> > > >> > > > Thanks and Regards,
> > > >> > > > Madhukar Bharti
> > > >> > > >
> > > >> > >
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > Thanks and Regards,
> > > >> > Madhukar Bharti
> > > >> > Mob: 7845755539
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks and Regards,
> > > > Madhukar Bharti
> > > > Mob: 7845755539
> > > >
> > >
> >
>
>
>
> --
> Thanks and Regards,
> Madhukar Bharti
> Mob: 7845755539
>


Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Joel Koshy
Yes it should be backwards compatible. So for e.g., you should be able
to use an 0.8.1 client with an 0.8.2 broker. In general, you should
not upgrade your clients until after the brokers have been upgraded.
However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
protocol change I'm aware of is the OffsetCommitRequest.  There is a
change in the OffsetCommitRequest format (KAFKA-1634) although you can
explicitly construct an OffsetCommitRequest with the earlier version.

Thanks,

Joel

On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> Hi Joel,
> 
> Thanks for all the clarifications!  Just another question on this: will
> 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
> Generally speaking, would there be any concerns with using the 0.8.2
> consumer with a 0.8.1 broker, for instance?
> 
> Marius
> 
> On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy  wrote:
> 
> > Inline..
> >
> > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> > > Hello everyone,
> > >
> > > I have a few questions about the current status and future of the Kafka
> > > consumers.
> > >
> > > We have been working on adding Kafka support in Spring XD [1], currently
> > > using the high level consumer via Spring Integration Kafka [2]. We are
> > > working on adding features such as:
> > > - the ability to control offsets/replay topics;
> > > - the ability to control partition allocation across multiple consumers;
> > >
> > > We are currently at version 0.8.1.1, so using the simple consumer is a
> > > pretty straightforward choice right now. However, in the light of the
> > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> > >
> > > 1) With respect to the consumer redesign for 0.9, what is the future of
> > the
> > > Simple Consumer and High Level Consumer? To my best understanding, the
> > > existing high level consumer API will be deprecated in favour of the new
> > > consumer API. What is the future of the Simple Consumer, in this case? it
> > > will continue to exist as a low-level API implementing the Kafka protocol
> > > [3] and providing the building blocks for the new consumer, or will it be
> > > deprecated as well?
> >
> > The new consumer will subsume both use-cases (simple and high-level).
> > You can still use the old SimpleConsumer if you wish - i.e., the wire
> > protocol for fetch and other requests will still be supported.
> >
> > >
> > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> > > implementation of it, but since this is a feature scheduled only for 0.9,
> > what
> > > is its status as well? Is it included only as a future reference and for
> > > stabilizing the API?
> >
> > It is a WIP so you cannot really use it.
> >
> > > 3) Obviously, offset management is a concern if using the simple
> > consumer,
> > > so - wondering about the Offset Management API as well. The Kafka
> > protocol
> > > document specifically indicates that it will be fully functional in 0.8.2
> > > [4] - however, a functional implementation is already available in
> > 0.8.1.1
> > > (accessible via the SimpleConsumer API but not documented in [5]). Again,
> > > trying to understand the extent of what 0.8.1.1 already supports
> > > (ostensibly, the offset manager support seems to have been added only in
> > > 0.8.2 - please correct me if I am wrong), and whether if it is
> > recommended
> > > for use in production in any form (with the caveats that accompany the
> > use
> > > of ZooKeeper).
> >
> > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
> > as the offsets storage mechanism (not zookeeper). High-level Java
> > consumers can choose to store offsets in ZooKeeper instead by setting
> > offsets.storage=zookeeper
> >
> > However, if you are using the simple consumer and wish to store
> > offsets in ZooKeeper you will need to commit to ZooKeeper directly.
> > You can use ZkUtils in the kafka.utils package for this.
> >
> > If you wish to move to Kafka-based offsets we will be adding a new
> > OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
> > This is currently not listed as a blocker for 0.8.2 but I think we
> > should include it. I will update that ticket.
> >
> > > 4) Trying to interpret the existing examples in [6] and the comments on
> > [7]
> > > - the version of the Offset Management API that exists in 0.8.1.1 is
> > using
> > > ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced
> > by
> > > Kafka, and phased out if possible. To my understanding, the switch
> > between
> > > the two will be controlled by the broker configuration (along with other
> > > parameters that control the performance of offset queues. Is that
> > correct?
> >
> > The switch is a client-side configuration. That wiki is not
> > up-to-date. The most current documentation is available as a patch in
> > https://issues.apache.org/jira/browse/KAFKA-1729
> >
> > > 5) Also, wondering about the timeline o

Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Marius Bogoevici
Hi Joel,

Thanks for all the clarifications!  Just another question on this: will
0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
Generally speaking, would there be any concerns with using the 0.8.2
consumer with a 0.8.1 broker, for instance?

Marius

On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy  wrote:

> Inline..
>
> On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> > Hello everyone,
> >
> > I have a few questions about the current status and future of the Kafka
> > consumers.
> >
> > We have been working on adding Kafka support in Spring XD [1], currently
> > using the high level consumer via Spring Integration Kafka [2]. We are
> > working on adding features such as:
> > - the ability to control offsets/replay topics;
> > - the ability to control partition allocation across multiple consumers;
> >
> > We are currently at version 0.8.1.1, so using the simple consumer is a
> > pretty straightforward choice right now. However, in the light of the
> > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> >
> > 1) With respect to the consumer redesign for 0.9, what is the future of
> the
> > Simple Consumer and High Level Consumer? To my best understanding, the
> > existing high level consumer API will be deprecated in favour of the new
> > consumer API. What is the future of the Simple Consumer, in this case? it
> > will continue to exist as a low-level API implementing the Kafka protocol
> > [3] and providing the building blocks for the new consumer, or will it be
> > deprecated as well?
>
> The new consumer will subsume both use-cases (simple and high-level).
> You can still use the old SimpleConsumer if you wish - i.e., the wire
> protocol for fetch and other requests will still be supported.
>
> >
> > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> > implementation of it, but since this is a feature scheduled only for 0.9,
> what
> > is its status as well? Is it included only as a future reference and for
> > stabilizing the API?
>
> It is a WIP so you cannot really use it.
>
> > 3) Obviously, offset management is a concern if using the simple
> consumer,
> > so - wondering about the Offset Management API as well. The Kafka
> protocol
> > document specifically indicates that it will be fully functional in 0.8.2
> > [4] - however, a functional implementation is already available in
> 0.8.1.1
> > (accessible via the SimpleConsumer API but not documented in [5]). Again,
> > trying to understand the extent of what 0.8.1.1 already supports
> > (ostensibly, the offset manager support seems to have been added only in
> > 0.8.2 - please correct me if I am wrong), and whether it is
> recommended
> > for use in production in any form (with the caveats that accompany the
> use
> > of ZooKeeper).
>
> In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
> as the offsets storage mechanism (not zookeeper). High-level Java
> consumers can choose to store offsets in ZooKeeper instead by setting
> offsets.storage=zookeeper
>
> However, if you are using the simple consumer and wish to store
> offsets in ZooKeeper you will need to commit to ZooKeeper directly.
> You can use ZkUtils in the kafka.utils package for this.
>
> If you wish to move to Kafka-based offsets we will be adding a new
> OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
> This is currently not listed as a blocker for 0.8.2 but I think we
> should include it. I will update that ticket.
>
> > 4) Trying to interpret the existing examples in [6] and the comments on
> [7]
> > - the version of the Offset Management API that exists in 0.8.1.1 is
> using
> > ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced
> by
> > Kafka, and phased out if possible. To my understanding, the switch
> between
> > the two will be controlled by the broker configuration (along with other
> > parameters that control the performance of offset queues. Is that
> correct?
>
> The switch is a client-side configuration. That wiki is not
> up-to-date. The most current documentation is available as a patch in
> https://issues.apache.org/jira/browse/KAFKA-1729
>
> > 5) Also, wondering about the timeline of 0.8.2 - according to the
> roadmaps
> > it should be released relatively shortly. Is that correct?
>
> Yes - once the blockers are ironed out.
>
> >
> > Thanks,
> > Marius
> >
> > [1] http://projects.spring.io/spring-xd/
> > [2] https://github.com/spring-projects/spring-integration-kafka
> > [3]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > [4]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> > [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> > [6]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> > [7] https://issues.apache.org/jira/browse/KAFKA

Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Joel Koshy
Inline..

On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> Hello everyone,
> 
> I have a few questions about the current status and future of the Kafka
> consumers.
> 
> We have been working on adding Kafka support in Spring XD [1], currently
> using the high level consumer via Spring Integration Kafka [2]. We are
> working on adding features such as:
> - the ability to control offsets/replay topics;
> - the ability to control partition allocation across multiple consumers;
> 
> We are currently at version 0.8.1.1, so using the simple consumer is a
> pretty straightforward choice right now. However, in the light of the
> upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> 
> 1) With respect to the consumer redesign for 0.9, what is the future of the
> Simple Consumer and High Level Consumer? To my best understanding, the
> existing high level consumer API will be deprecated in favour of the new
> consumer API. What is the future of the Simple Consumer, in this case? Will
> it continue to exist as a low-level API implementing the Kafka protocol
> [3] and providing the building blocks for the new consumer, or will it be
> deprecated as well?

The new consumer will subsume both use-cases (simple and high-level).
You can still use the old SimpleConsumer if you wish - i.e., the wire
protocol for fetch and other requests will still be supported.

> 
> 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> implementation of it, but since this is a feature scheduled only for 0.9,
> what is its status? Is it included only as a future reference and for
> stabilizing the API?

It is a WIP so you cannot really use it.

> 3) Obviously, offset management is a concern if using the simple consumer,
> so - wondering about the Offset Management API as well. The Kafka protocol
> document specifically indicates that it will be fully functional in 0.8.2
> [4] - however, a functional implementation is already available in 0.8.1.1
> (accessible via the SimpleConsumer API but not documented in [5]). Again,
> trying to understand the extent of what 0.8.1.1 already supports
> (ostensibly, the offset manager support seems to have been added only in
> 0.8.2 - please correct me if I am wrong), and whether it is recommended
> for use in production in any form (with the caveats that accompany the use
> of ZooKeeper).

In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
as the offsets storage mechanism (not zookeeper). High-level Java
consumers can choose to store offsets in ZooKeeper instead by setting
offsets.storage=zookeeper
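
For example, a sketch of the relevant high-level consumer configuration
(property names per the 0.8.2 consumer configs; connection values here are
made up):

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "my-group");
    // Where committed offsets live: "zookeeper" or "kafka".
    props.put("offsets.storage", "zookeeper");
    // Set to "true" while migrating to Kafka-based offsets to
    // commit to both backends.
    props.put("dual.commit.enabled", "false");
    ConsumerConfig config = new ConsumerConfig(props);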

However, if you are using the simple consumer and wish to store
offsets in ZooKeeper you will need to commit to ZooKeeper directly.
You can use ZkUtils in the kafka.utils package for this.
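
For instance, a minimal sketch of such a direct commit against the 0.8.1.1
API (group/topic/partition values are made up; the Scala singletons are
reached from Java via their generated MODULE$ fields):

    import kafka.utils.ZKGroupTopicDirs;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils$;
    import org.I0Itec.zkclient.ZkClient;

    public class ZkOffsetCommit {
        public static void main(String[] args) {
            ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000,
                    ZKStringSerializer$.MODULE$);
            ZKGroupTopicDirs dirs = new ZKGroupTopicDirs("my-group", "my-topic");
            String path = dirs.consumerOffsetDir() + "/" + 0;  // partition 0
            // Writes /consumers/my-group/offsets/my-topic/0, creating the
            // node if it does not exist yet.
            ZkUtils$.MODULE$.updatePersistentPath(zkClient, path, Long.toString(42L));
            zkClient.close();
        }
    }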

If you wish to move to Kafka-based offsets we will be adding a new
OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
This is currently not listed as a blocker for 0.8.2 but I think we
should include it. I will update that ticket.

> 4) Trying to interpret the existing examples in [6] and the comments on [7]
> - the version of the Offset Management API that exists in 0.8.1.1 is using
> ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced by
> Kafka, and phased out if possible. To my understanding, the switch between
> the two will be controlled by the broker configuration (along with other
> parameters that control the performance of offset queues). Is that correct?

The switch is a client-side configuration. That wiki is not
up-to-date. The most current documentation is available as a patch in
https://issues.apache.org/jira/browse/KAFKA-1729

> 5) Also, wondering about the timeline of 0.8.2 - according to the roadmaps
> it should be released relatively shortly. Is that correct?

Yes - once the blockers are ironed out.

> 
> Thanks,
> Marius
> 
> [1] http://projects.spring.io/spring-xd/
> [2] https://github.com/spring-projects/spring-integration-kafka
> [3]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> [4]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> [6]
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> [7] https://issues.apache.org/jira/browse/KAFKA-1729



Re: partition auto-rebalance

2014-11-18 Thread Joel Koshy
The imbalance is measured with respect to preferred leaders, i.e., for every
partition, the first replica in the assigned replica list (as shown in
the output of kafka-topics.sh) is called the preferred replica. On
each broker, the auto-balancer counts the number of partitions led by
that broker for which the preferred leader is on another broker. If that
count exceeds the threshold, it does a preferred replica leader election.

In your case, can you run the kafka-topics.sh script and see if the
preferred replicas are evenly distributed? Also, which version of
Kafka are you using?
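
For example (substituting your own ZooKeeper connect string and topic):

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic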

Thanks,

Joel

On Tue, Nov 18, 2014 at 11:48:35AM -0500, Wes Chow wrote:
> I'm trying to understand the config options for auto-rebalancing.
> This is what we have in /etc/kafka/server.properties for all the
> nodes:
> 
> auto.leader.rebalance.enable=true
> leader.imbalance.per.broker.percentage=10
> leader.imbalance.check.interval.seconds=300
> 
> We have 10 nodes for this topic which has 512 partitions. They were
> evenly balanced before I started my experiment. I shut down two of
> the nodes, and the number of leaders per node is now:
> 
>  75 10
>  68 3
>  57 4
>  67 5
>  57 6
>  68 7
>  63 8
>  57 9
> 
> Where the first column is # of leaders, and the second column is
> node #. You can see that nodes 1 and 2 have no leaders, since
> they're down. It's been about half an hour since I did this and the
> balancing hasn't changed.
> 
> The documentation on the config option is very ambiguous. My
> interpretation is that it says if any particular node has 10% more
> leaders then auto-rebalance kicks in. If that means 10% more than
> the average, then node #10 has 75 partitions, and the average is
> 64, so that's a 17% difference.
> 
> So I think I'm misunderstanding either what auto-rebalance is
> supposed to do or the condition that's supposed to trigger it. Any
> clues?
> 
> Thanks,
> Wes
> 



Re: partition auto-rebalance

2014-11-18 Thread Guozhang Wang
Hello Wes,

The document here is a bit misleading indeed:

http://kafka.apache.org/documentation.html#brokerconfigs

In Kafka a partition has a replica list {A,B,C..}, and the first replica
is the preferred leader of the partition. When that is not the case - for
example, A is down so B becomes the leader - the replica list will still be
{A,B,C..}, but A's status will be "offline replica" and B will be the new
leader; later on, even when A resumes, it will still be a follower, and
hence this is a case of "imbalance".
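
For illustration, kafka-topics.sh --describe output for such a partition
might look like this (broker ids made up):

    Topic: mytopic  Partition: 0  Leader: 2  Replicas: 1,2,3  Isr: 2,3

Broker 1 is the preferred leader (first in the replica list) but broker 2
currently leads, so this partition counts as imbalanced.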

The "leader.imbalance.per.broker.percentage" kicks in when this percentage
of this imbalance cases are higher than the threshold. In your case those
imbalances cases will be more than 10%, but since those two brokers are not
back the rebalance logic, although triggered, will not be able to do
anything (you may check the controller logs for entries like ""Starting
preferred replica leader election for ..." to verify).

When you bring those two brokers back online, I think the auto leader
rebalance will execute and move the leaders back to those brokers.

Guozhang


On Tue, Nov 18, 2014 at 8:48 AM, Wes Chow  wrote:

> I'm trying to understand the config options for auto-rebalancing. This is
> what we have in /etc/kafka/server.properties for all the nodes:
>
> auto.leader.rebalance.enable=true
> leader.imbalance.per.broker.percentage=10
> leader.imbalance.check.interval.seconds=300
>
> We have 10 nodes for this topic which has 512 partitions. They were evenly
> balanced before I started my experiment. I shut down two of the nodes, and
> the number of leaders per node is now:
>
>  75 10
>  68 3
>  57 4
>  67 5
>  57 6
>  68 7
>  63 8
>  57 9
>
> Where the first column is # of leaders, and the second column is node #.
> You can see that nodes 1 and 2 have no leaders, since they're down. It's
> been about half an hour since I did this and the balancing hasn't changed.
>
> The documentation on the config option is very ambiguous. My
> interpretation is that it says if any particular node has 10% more leaders
> then auto-rebalance kicks in. If that means 10% more than the average, then
> node #10 has 75 partitioners, and the average is 64, so that's a 17%
> difference.
>
> So I think I'm misunderstanding either what auto-rebalance is supposed to
> do or the condition that's supposed to trigger it. Any clues?
>
> Thanks,
> Wes
>
>


-- 
-- Guozhang


Re: benchmark kafka on 10GbE network

2014-11-18 Thread Jay Kreps
Hey Manu,

I'm not aware of a benchmark on 10GbE. I'd love to see that though. Diving
into the results may help us find bottlenecks hidden by the slower network.

Can you figure out where the bottleneck is in your test? I assume this is a
single producer and consumer instance and you are using the new producer as
in those benchmarks?

This can be slightly tricky as it can be CPU or I/O on either the clients
or the brokers. You basically have to look at top, iostat, and the JMX
metrics for clues. The producer has good metrics that explain whether it is
spending most of its time waiting or sending data. Not sure if there is a
similar diagnostic for the consumer.

-Jay

On Tue, Nov 18, 2014 at 5:10 AM, Manu Zhang  wrote:

> Hi all,
>
> I have been trying out kafka benchmarks described in Jay's
> benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> <
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> >.
> I'm able to get similar results on a 4-node GbE network whose in-bytes
> could be saturated at 120MB/s. However, on a 4-node, 10GbE network, I
> cannot get in-bytes higher than 150MB/s. Has anyone benchmarked kafka on a
> 10GbE network? Any rule of thumb on 10GbE networks for configurations of
> broker, producer and consumer?
>
> My kafka version is 0.8.1.1 and I've created a topic with 8 partitions with
> 1 replica distributed evenly among the 4 nodes. Message size is 100 bytes.
> I use all the default kafka settings.
> My cluster has 4 nodes, where each node has 32 cores, 128GB RAM and 3 disks
> for kafka.
>
> I've tried increasing message size to 1000 bytes which improved producer's
> throughput but not consumer's.
>
>
> Thanks,
> Manu
>


Topic Creation

2014-11-18 Thread Bryan Baugher
Hi everyone,

I'm looking for the API to create/manage topics. Is this class[1] the one
I should be using? Also, what if I want to create the topic
synchronously?

[1] -
https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/admin/AdminUtils.scala#L149-L157

Bryan
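
For reference, a minimal sketch against the 0.8.1.1 admin API (connection
settings and topic parameters are made up). createTopic only registers the
topic in ZooKeeper, so a crude way to make it "synchronous" is to poll until
the registration is visible; full readiness still depends on the brokers
electing leaders:

    import java.util.Properties;
    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import org.I0Itec.zkclient.ZkClient;

    public class CreateTopicExample {
        public static void main(String[] args) throws InterruptedException {
            ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000,
                    ZKStringSerializer$.MODULE$);
            // 8 partitions, replication factor 2; topic-level overrides
            // could go in the Properties.
            AdminUtils.createTopic(zkClient, "my-topic", 8, 2, new Properties());
            // Crude wait: poll until the topic is registered in ZooKeeper.
            while (!AdminUtils.topicExists(zkClient, "my-topic")) {
                Thread.sleep(100);
            }
            zkClient.close();
        }
    }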


partition auto-rebalance

2014-11-18 Thread Wes Chow
I'm trying to understand the config options for auto-rebalancing. This 
is what we have in /etc/kafka/server.properties for all the nodes:


auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=10
leader.imbalance.check.interval.seconds=300

We have 10 nodes for this topic which has 512 partitions. They were 
evenly balanced before I started my experiment. I shut down two of the 
nodes, and the number of leaders per node is now:


 75 10
 68 3
 57 4
 67 5
 57 6
 68 7
 63 8
 57 9

Where the first column is # of leaders, and the second column is node #. 
You can see that nodes 1 and 2 have no leaders, since they're down. It's 
been about half an hour since I did this and the balancing hasn't changed.


The documentation on the config option is very ambiguous. My 
interpretation is that it says if any particular node has 10% more 
leaders then auto-rebalance kicks in. If that means 10% more than the 
average, then node #10 has 75 partitions, and the average is 64, so 
that's a 17% difference.


So I think I'm misunderstanding either what auto-rebalance is supposed 
to do or the condition that's supposed to trigger it. Any clues?


Thanks,
Wes



Kafka consumer - want to print partition id along with message

2014-11-18 Thread kade...@yahoo.com.INVALID
Hi, 

I am trying to learn more about Kafka consuming patterns when we have
partitions on the topic, and I am testing a few scenarios. I would like to
print the partition id along with each message the consumer consumes. How
can this be done?

We have simple consumer code (high level):

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": "
                + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }

How do I print the message as well as the partition id?

I tried to assign it.next() to a MessageAndMetadata variable and printed the
partition id (which is declared as int). But when printing message() - which
is declared as a generic type in MessageAndMetadata - the message is getting
printed as junk data.

Tried this: 

MessageAndMetadata<byte[], byte[]> msg;
msg = it.next();
System.out.println("Partition " + msg.partition() + " Message data " +
msg.message().toString());

Here msg.message() is not printing the string value.
Can anyone help with this?

Kader.

Sent from Yahoo Mail on Android
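
A sketch of the usual fix (assuming the stream was created with the default
byte[] decoders, as in the snippet above): message() returns a byte[], and
calling toString() on an array prints the object reference, which is why the
output looks like junk. Decode it with new String(...) instead:

    import kafka.consumer.ConsumerIterator;
    import kafka.message.MessageAndMetadata;

    // Replaces the loop in the run() method shown above.
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> msg = it.next();
        System.out.println("Thread " + m_threadNumber
                + " Partition " + msg.partition()
                + ": " + new String(msg.message()));
    }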



benchmark kafka on 10GbE network

2014-11-18 Thread Manu Zhang
Hi all,

I have been trying out kafka benchmarks described in Jay's
benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
<https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>.
I'm able to get similar results on a 4-node GbE network whose in-bytes
could be saturated at 120MB/s. However, on a 4-node, 10GbE network, I
cannot get in-bytes higher than 150MB/s. Has anyone benchmarked kafka on a
10GbE network? Any rule of thumb on 10GbE networks for configurations of
broker, producer and consumer?
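
(For perspective: GbE tops out around 125 MB/s on the wire, so 120 MB/s is
effectively saturation, while 10GbE allows roughly 1250 MB/s, so 150 MB/s is
only about 12% of the link.)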

My kafka version is 0.8.1.1 and I've created a topic with 8 partitions with
1 replica distributed evenly among the 4 nodes. Message size is 100 bytes.
I use all the default kafka settings.
My cluster has 4 nodes, where each node has 32 cores, 128GB RAM and 3 disks
for kafka.

I've tried increasing message size to 1000 bytes which improved producer's
throughput but not consumer's.


Thanks,
Manu


Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Marius Bogoevici
Hello everyone,

I have a few questions about the current status and future of the Kafka
consumers.

We have been working on adding Kafka support in Spring XD [1], currently
using the high level consumer via Spring Integration Kafka [2]. We are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple consumers;

We are currently at version 0.8.1.1, so using the simple consumer is a
pretty straightforward choice right now. However, in the light of the
upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:

1) With respect to the consumer redesign for 0.9, what is the future of the
Simple Consumer and High Level Consumer? To my best understanding, the
existing high level consumer API will be deprecated in favour of the new
consumer API. What is the future of the Simple Consumer, in this case? Will
it continue to exist as a low-level API implementing the Kafka protocol
[3] and providing the building blocks for the new consumer, or will it be
deprecated as well?

2) Regarding the new consumer: the v0.8.2 codebase contains an early
implementation of it, but since this is a feature scheduled only for 0.9,
what is its status? Is it included only as a future reference and for
stabilizing the API?

3) Obviously, offset management is a concern if using the simple consumer,
so - wondering about the Offset Management API as well. The Kafka protocol
document specifically indicates that it will be fully functional in 0.8.2
[4] - however, a functional implementation is already available in 0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]). Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added only in
0.8.2 - please correct me if I am wrong), and whether it is recommended
for use in production in any form (with the caveats that accompany the use
of ZooKeeper).

4) Trying to interpret the existing examples in [6] and the comments on [7]
- the version of the Offset Management API that exists in 0.8.1.1 is using
ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced by
Kafka, and phased out if possible. To my understanding, the switch between
the two will be controlled by the broker configuration (along with other
parameters that control the performance of offset queues). Is that correct?

5) Also, wondering about the timeline of 0.8.2 - according to the roadmaps
it should be released relatively shortly. Is that correct?

Thanks,
Marius

[1] http://projects.spring.io/spring-xd/
[2] https://github.com/spring-projects/spring-integration-kafka
[3]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
[4]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
[5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
[6]
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
[7] https://issues.apache.org/jira/browse/KAFKA-1729


Re: Kafka broker cannot start after running out of disk space

2014-11-18 Thread Yury Ruchin
Thanks Guozhang for the pointer to the mapped NIO. The issue in my case was
related to the disk still being out of space (I thought I did free up some,
but I actually didn't). Curiously, I ran out of space on two occasions. In
one case the error message was clear "No space left on device", and in
another case that was the cryptic InternalError I mentioned previously.

2014-11-17 20:24 GMT+03:00 Guozhang Wang :

> This is interesting as I have not seen it before. Searched a bit on the web
> and this seems promising?
>
>
> http://stackoverflow.com/questions/2949371/java-map-nio-nfs-issue-causing-a-vm-fault-a-fault-occurred-in-a-recent-uns
>
> Guozhang
>
> On Fri, Nov 14, 2014 at 5:38 AM, Yury Ruchin 
> wrote:
>
> > Hello,
> >
> > I've run into an issue with Kafka 0.8.1.1 broker. The broker stopped
> > working after the disk it was writing to ran out of space. I freed up
> some
> > space and tried to restart the broker. It started some recovery
> procedure,
> > but after some short time in the logs I see the following strange error
> > message:
> >
> > FATAL kafka.server.KafkaServerStartable  - Fatal error during
> > KafkaServerStable startup. Prepare to shutdown
> > java.lang.InternalError: a fault occurred in a recent unsafe memory
> > access operation in compiled Java code
> >         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:39)
> >         at java.nio.ByteBuffer.allocate(ByteBuffer.java:312)
> >         at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
> >         at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> >         at kafka.log.LogSegment.recover(LogSegment.scala:165)
> >         at kafka.log.Log.recoverLog(Log.scala:179)
> >         at kafka.log.Log.loadSegments(Log.scala:155)
> >         at kafka.log.Log.<init>(Log.scala:64)
> >         at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
> >         at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
> >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
> >         at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
> >         at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >         at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> >         at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> >         at kafka.log.LogManager.<init>(LogManager.scala:57)
> >         at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> >         at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
> >         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> >         at kafka.Kafka$.main(Kafka.scala:46)
> >         at kafka.Kafka.main(Kafka.scala)
> >
> > and then everything starts over. I've been waiting for a while, but the
> > broker keeps restarting. How can I bring it back to life?
> >
> > Thanks!
> >
>
>
>
> --
> -- Guozhang
>