Re: Broker brought down and under replicated partitions

2014-10-16 Thread Jean-Pascal Billaud
The only thing that I find very weird is the fact that brokers that are
dead are still part of the ISR set for hours... and are basically not
removed. Note this is not constantly the case, most of the dead brokers are
properly removed and it is really just in a few cases. I am not sure why
this would happen. Is there a known issue in the 0.8.0 version that was
fixed later on? What can I do to diagnose/fix the situation?

Thanks,

On Wed, Oct 15, 2014 at 9:58 AM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 So I am using 0.8.0. I think I found the issue actually. It turns out that
 some partitions only had a single replica and the leaders of those
 partitions would basically refuse new writes. As soon as I reassigned
 replicas to those partitions things kicked off again. Not sure if that's
 expected... but that seemed to make the problem go away.

 Thanks,


 On Wed, Oct 15, 2014 at 6:46 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

 Which version of Kafka are you using? The current stable one is 0.8.1.1

 On Tue, Oct 14, 2014 at 5:51 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

  Hey Neha,
 
  so I removed another broker about 30 min ago and since then basically the
  Producer is dying with:
 
  Event queue is full of unsent messages, could not send event:
  KeyedMessage(my_topic,[B@1b71b7a6,[B@35fdd1e7)
  kafka.common.QueueFullException: Event queue is full of unsent messages,
  could not send event: KeyedMessage(my_topic,[B@1b71b7a6,[B@35fdd1e7)
  at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Unknown Source)
  ~[kafka_2.10-0.8.0.jar:0.8.0]
  at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Unknown Source)
  ~[kafka_2.10-0.8.0.jar:0.8.0]
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  ~[scala-library-2.10.3.jar:na]
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  ~[scala-library-2.10.3.jar:na]
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  ~[scala-library-2.10.3.jar:na]
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  ~[scala-library-2.10.3.jar:na]
  at kafka.producer.Producer.asyncSend(Unknown Source)
  ~[kafka_2.10-0.8.0.jar:0.8.0]
  at kafka.producer.Producer.send(Unknown Source)
  ~[kafka_2.10-0.8.0.jar:0.8.0]
  at kafka.javaapi.producer.Producer.send(Unknown Source)
  ~[kafka_2.10-0.8.0.jar:0.8.0]
 
  It seems like it cannot recover for some reason. The new leaders were
  elected, it seems, so it should have picked up the new metadata
  information about the partitions. Is this something known from 0.8.0? What
  should I be looking for to debug/fix this?
 
  Thanks,
 
  On Tue, Oct 14, 2014 at 2:22 PM, Neha Narkhede neha.narkh...@gmail.com
 
  wrote:
 
   Regarding (1), I am assuming that it is expected that brokers going
 down
   will be brought back up soon. At which point, they will pick up from
 the
   current leader and get back into the ISR. Am I right?
  
   The broker will be added back to the ISR once it is restarted, but it
  never
   goes out of the replica list until the admin explicitly moves it using
  the
   reassign partitions tool.
  
   Regarding (2), I finally kicked off a reassign_partitions admin task
  adding
   broker 7 to the replicas list for partition 0 which finally fixed the
  under
   replicated issue:
   Is this therefore expected that the user will fix up the under
  replication
   situation?
  
   Yes. Currently, partition reassignment is purely an admin initiated
 task.
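
    For reference, a reassignment like the one described above can be driven
    with the stock tool (flags as in 0.8.1.x; the topic, partition, target
    brokers and ZooKeeper address in this sketch are placeholders):

    $ cat reassign.json
    {"version":1,"partitions":[{"topic":"X","partition":0,"replicas":[5,3,7]}]}
    $ bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
        --reassignment-json-file reassign.json --execute
    $ bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
        --reassignment-json-file reassign.json --verify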
  
   Another thing I'd like to clarify is that for another topic Y, broker
 5
  was
   never removed from the ISR array. Note that Y is an unused topic so I
 am
   guessing that technically broker 5 is not out of sync... though it is
  still
   dead. Is this the expected behavior?
  
   Not really. After replica.lag.time.max.ms (which defaults to 10
  seconds),
   the leader should remove the dead broker out of the ISR.
  
   Thanks,
   Neha
  
   On Tue, Oct 14, 2014 at 9:27 AM, Jean-Pascal Billaud 
 j...@tellapart.com
   wrote:
  
hey folks,
   
I have been testing a kafka cluster of 10 nodes on AWS using version
2.8.0-0.8.0
and see some behavior on failover that I want to make sure I
  understand.
   
Initially, I have a topic X with 30 partitions and a replication
 factor
   of
3. Looking at the partition 0:
partition: 0 - leader: 5 preferred leader: 5 brokers: [5, 3, 4] in-sync: [5, 3, 4]
   
When killing broker 5, the controller immediately grabs the next replica in
the ISR and assigns it as leader:
partition: 0 - leader: 3 preferred leader: 5 brokers: [5, 3, 4] in-sync: [3, 4]
   
There are couple of things at this point I would like to clarify:
   
(1) Why is broker 5 still in the brokers array for partition 0? Note
  this
broker array comes from a get of the zookeeper path
   /brokers/topics/[topic]
as documented.
(2) Partition 0 is now under replicated and the controller does not
  seem
   to
do 

Re: Kafka - NotLeaderForPartitionException / LeaderNotAvailableException

2014-10-16 Thread Abraham Jacob
Will definitely take a thread dump! So far it's been running fine.

-Jacob

On Wed, Oct 15, 2014 at 8:40 PM, Jun Rao jun...@gmail.com wrote:

 If you see the hanging again, it would be great if you can take a thread
 dump so that we know where it is hanging.

 Thanks,

 Jun

 On Tue, Oct 14, 2014 at 10:35 PM, Abraham Jacob abe.jac...@gmail.com
 wrote:

  Hi Jun,
 
  Thanks for responding...
 
  I am using Kafka 2.9.2-0.8.1.1
 
   I looked through the controller logs on a couple of nodes and did not find
   any exceptions or errors.
 
  However in the state change log I see a bunch of the following
 exceptions -
 
  [2014-10-13 14:39:12,475] TRACE Controller 3 epoch 116 started leader
  election for partition [wordcount,1] (state.change.logger)
  [2014-10-13 14:39:12,479] ERROR Controller 3 epoch 116 initiated state
  change for partition [wordcount,1] from OfflinePartition to
 OnlinePartition
  failed (state.change.logger)
  kafka.common.NoReplicaOnlineException: No replica for partition
  [wordcount,1] is alive. Live brokers are: [Set()], Assigned replicas are:
  [List(8, 7, 1)]
  at
 
 
 kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
  at
 
 
 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
  at
 
 
 kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
  at
 
 
 kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
  at
 
 
 kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
  at
 
 
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
  at
 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
  at
 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
  at
  scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
  at
 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
  at
 scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
  at
 
 
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
  at
 
 
 kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
  at
 
 
 kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
  at
 
 
 kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:312)
  at
 
 
 kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:162)
  at
 
 kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
  at
 
 
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:123)
  at
 
 
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:118)
  at
 
 
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:118)
  at kafka.utils.Utils$.inLock(Utils.scala:538)
  at
 
 
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:118)
  at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
  at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 
 
   Anyways, this morning after sending out the email, I set out to restart all
   the brokers. I found that 3 brokers were in a hung state. I tried to use
   the bin/kafka-server-stop.sh script (which does nothing but send a SIGINT
   signal), but the java process running Kafka would not terminate. I then
   issued a 'kill -SIGTERM x' for the java process running Kafka, yet the
   process still would not terminate. This happened only on 3 nodes (each node
   runs only 1 broker). On the other nodes kafka-server-stop.sh successfully
   brought down the java process running Kafka.

   For the three brokers that were not responding to either SIGINT or SIGTERM,
   I issued a SIGKILL instead, and this did bring down the process.
 
  I then restarted brokers on all nodes. After that I again ran the
 describe
  topic script.
 
   bin/kafka-topics.sh --describe --zookeeper tr-pan-hclstr-08.amers1b.ciscloud:2181/kafka/kafka-clstr-01 --topic wordcount
 
 
   Topic:wordcount  PartitionCount:8  ReplicationFactor:3  Configs:
       Topic: wordcount  Partition: 0  Leader: 7  Replicas: 7,6,8  Isr: 6,7,8
       Topic: wordcount

getOffsetsBefore(...) => kafka.common.UnknownException

2014-10-16 Thread Magnus Vojbacke
Hi,

I’m trying to make a request for offset information from my broker, and I get a 
kafka.common.UnknownException as the result.

I’m trying to use the Simple Consumer API



val topicAndPartition = new TopicAndPartition("topic3", 0)
val requestInfo = new java.util.HashMap[TopicAndPartition, 
PartitionOffsetRequestInfo]()
requestInfo.put(topicAndPartition, new 
PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))

val request = new kafka.javaapi.OffsetRequest(requestInfo, 
kafka.api.OffsetRequest.CurrentVersion, clientName)

import kafka.javaapi._
// conn: kafka.javaapi.consumer.SimpleConsumer
val response: OffsetResponse = conn.getOffsetsBefore(request)

println("got response [" + response + "]")



Output:
got response [OffsetResponse(0,Map([test3,1] -> error: kafka.common.UnknownException offsets: 0))]


I really can’t figure out why I’m getting this response. As far as I know, 
“topic3” with partition “0” exists on the broker, and I can use 
bin/kafka-console-consumer.sh to consume from it without any problems.


Is there any idea of what could cause this exception?

As it is right now, I’m not even sure if the request gets to the broker. Is 
there any way of activating more verbose logs on the broker?

I think I’m using a trunk build (2.10-0.8.3-SNAPSHOT)


BR
/Magnus



Re: Consistency and Availability on Node Failures

2014-10-16 Thread Gwen Shapira
Just note that this is not  a universal solution. Many use-cases care
about which partition you end up writing to since partitions are used
to... well, partition logical entities such as customers and users.



On Wed, Oct 15, 2014 at 9:03 PM, Jun Rao jun...@gmail.com wrote:
 Kyle,

 What you wanted is not supported out of box. You can achieve this using the
 new java producer. The new java producer allows you to pick an arbitrary
 partition when sending a message. If you receive NotEnoughReplicasException
 when sending a message, you can resend it to another partition.
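
 A rough sketch of that pattern against the new producer (as it exists on
 trunk right now, where it works directly with byte[] keys and values; the
 broker list, topic and partition numbers below are made up):

   import java.util.Properties;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;

   public class RerouteOnFailureSketch {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
           props.put("acks", "-1");
           KafkaProducer producer = new KafkaProducer(props);

           byte[] payload = "some message".getBytes();
           int numPartitions = 200;   // matches the scenario above
           int partition = 17;        // the partition the message would normally go to
           try {
               // Pin the record to an explicit partition (null key) and wait for the ack.
               producer.send(new ProducerRecord("my_topic", partition, null, payload)).get();
           } catch (ExecutionException e) {
               // e.g. NotEnoughReplicasException: retry on a different partition.
               int fallback = (partition + 1) % numPartitions;
               producer.send(new ProducerRecord("my_topic", fallback, null, payload)).get();
           }
           producer.close();
       }
   }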

 Thanks,

 Jun

 On Tue, Oct 14, 2014 at 1:51 PM, Kyle Banker kyleban...@gmail.com wrote:

 Consider a 12-node Kafka cluster with a 200-partition topic having a
 replication factor of 3. Let's assume, in addition, that we're running
 Kafka v0.8.2, we've disabled unclean leader election, acks is -1, and
 min.isr is 2.
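
 For concreteness, under 0.8.2 those settings map roughly to the following
 config names (the values are the ones assumed in this scenario):

   min.insync.replicas=2                  (topic-level override or broker default)
   unclean.leader.election.enable=false   (broker config)
   acks=-1                                (new producer config; "all" is equivalent)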

 Now suppose we lose 2 nodes. In this case, there's a good chance that 2/3
 replicas of one or more partitions will be unavailable. This means that
 messages assigned to those partitions will not be writable. If we're
 writing a large number of messages, I would expect that all producers would
 eventually halt. It is somewhat surprising that, if we rely on a basic
 durability setting, the cluster would likely be unavailable even after
 losing only 2 / 12 nodes.

 It might be useful in this scenario for the producer to be able to detect
 which partitions are no longer available and reroute messages that would
 have hashed to the unavailable partitions (as defined by our acks and
 min.isr settings). This way, the cluster as a whole would remain available
 for writes at the cost of a slightly higher load on the remaining machines.

 Is this limitation accurately described? Is the proposed producer
 functionality worth pursuing?



Re: Cross-Data-Center Mirroring, and Guaranteed Minimum Time Period on Data

2014-10-16 Thread Gwen Shapira
I assume the messages themselves contain the timestamp?

If you use Flume, you can configure a Kafka source to pull data from
Kafka, use an interceptor to pull the date out of your message and
place it in the event header and then the HDFS sink can write to a
partition based on the timestamp.

Gwen

On Wed, Oct 15, 2014 at 8:47 PM, Jun Rao jun...@gmail.com wrote:
 One way you can do that is to continually load data from Kafka to Hadoop.
 During load, you put data into different HDFS directories based on the
 timestamp. The Hadoop admin can decide when to open up those directories
 for read based on whether data from all data centers have arrived.

 Thanks,

 Jun

 On Tue, Oct 14, 2014 at 11:54 PM, Alex Melville amelvi...@g.hmc.edu wrote:

 Hi Apache Community,


 My company has the following use case. We have multiple geographically
 disparate data centers each with their own Kafka cluster, and we want to
 aggregate all of these center's data to one central Kafka cluster located
 in a data center distinct from the rest using MirrorMaker. Once in the
 central cluster, most of this data will be fed into Hadoop for analytics
 purposes. However, with how we have Hadoop working right now, it must wait
 until it has received data from all of the other data centers for a
 specific time period before it has the green light to load that data into
 HDFS and process it. For example, say we have 3 remote (as in not central)
 data centers, and DC1 has pushed to the central data center all of its data
 up to 4:00 PM, DC2 has pushed everything up to 3:30 PM, and DC3 is lagging
 behind and only pushed data up to the 2:00PM time period. Then Hadoop
 processes all data tagged with modification times before 2:00PM, and it
 must wait until DC3 catches up by pushing 2:15, 2:30, etc. data to the
 central cluster before it can process the 3:00 PM data.

 So our question is: What is the best way to handle this time-period-ordered
 requirement on our data using a distributed messaging log like Kafka? We
 originally started using Kafka to move away from a batch-oriented backend
 data pipeline transport system in favor of a more streaming-focused system,
 but we still need to keep track of the latest common time period of data
 streaming in from the remote clusters.


 Cheers,

 Alex M.



Re: Cross-Data-Center Mirroring, and Guaranteed Minimum Time Period on Data

2014-10-16 Thread Andrew Otto
Check out Camus.  It was built to do parallel loads from Kafka into time 
bucketed directories in HDFS.



On Oct 16, 2014, at 9:32 AM, Gwen Shapira gshap...@cloudera.com wrote:

 I assume the messages themselves contain the timestamp?
 
 If you use Flume, you can configure a Kafka source to pull data from
 Kafka, use an interceptor to pull the date out of your message and
 place it in the event header and then the HDFS sink can write to a
 partition based on the timestamp.
 
 Gwen
 
 On Wed, Oct 15, 2014 at 8:47 PM, Jun Rao jun...@gmail.com wrote:
 One way you can do that is to continually load data from Kafka to Hadoop.
 During load, you put data into different HDFS directories based on the
 timestamp. The Hadoop admin can decide when to open up those directories
 for read based on whether data from all data centers have arrived.
 
 Thanks,
 
 Jun
 
 On Tue, Oct 14, 2014 at 11:54 PM, Alex Melville amelvi...@g.hmc.edu wrote:
 
 Hi Apache Community,
 
 
 My company has the following use case. We have multiple geographically
 disparate data centers each with their own Kafka cluster, and we want to
 aggregate all of these center's data to one central Kafka cluster located
 in a data center distinct from the rest using MirrorMaker. Once in the
 central cluster, most of this data will be fed into Hadoop for analytics
 purposes. However, with how we have Hadoop working right now, it must wait
 until it has received data from all of the other data centers for a
 specific time period before it has the green light to load that data into
 HDFS and process it. For example, say we have 3 remote (as in not central)
 data centers, and DC1 has pushed to the central data center all of its data
 up to 4:00 PM, DC2 has pushed everything up to 3:30 PM, and DC3 is lagging
 behind and only pushed data up to the 2:00PM time period. Then Hadoop
 processes all data tagged with modification times before 2:00PM, and it
 must wait until DC3 catches up by pushing 2:15, 2:30, etc. data to the
 central cluster before it can process the 3:00 PM data.
 
 So our question is: What is the best way to handle this time-period-ordered
 requirement on our data using a distributed messaging log like Kafka? We
 originally started using Kafka to move away from a batch-oriented backend
 data pipeline transport system in favor of a more streaming-focused system,
 but we still need to keep track of the latest common time period of data
 streaming in from the remote clusters.
 
 
 Cheers,
 
 Alex M.
 



Re: Consistency and Availability on Node Failures

2014-10-16 Thread Kyle Banker
I didn't realize that anyone used partitions to logically divide a topic.
When would that be preferable to simply having a separate topic? Isn't this
a minority case?

On Thu, Oct 16, 2014 at 7:28 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Just note that this is not  a universal solution. Many use-cases care
 about which partition you end up writing to since partitions are used
 to... well, partition logical entities such as customers and users.



 On Wed, Oct 15, 2014 at 9:03 PM, Jun Rao jun...@gmail.com wrote:
  Kyle,
 
  What you wanted is not supported out of box. You can achieve this using
 the
  new java producer. The new java producer allows you to pick an arbitrary
  partition when sending a message. If you receive
 NotEnoughReplicasException
  when sending a message, you can resend it to another partition.
 
  Thanks,
 
  Jun
 
  On Tue, Oct 14, 2014 at 1:51 PM, Kyle Banker kyleban...@gmail.com
 wrote:
 
  Consider a 12-node Kafka cluster with a 200-partition topic having a
  replication factor of 3. Let's assume, in addition, that we're running
  Kafka v0.8.2, we've disabled unclean leader election, acks is -1, and
  min.isr is 2.
 
  Now suppose we lose 2 nodes. In this case, there's a good chance that
 2/3
  replicas of one or more partitions will be unavailable. This means that
  messages assigned to those partitions will not be writable. If we're
  writing a large number of messages, I would expect that all producers
 would
  eventually halt. It is somewhat surprising that, if we rely on a basic
  durability setting, the cluster would likely be unavailable even after
  losing only 2 / 12 nodes.
 
  It might be useful in this scenario for the producer to be able to
 detect
  which partitions are no longer available and reroute messages that would
  have hashed to the unavailable partitions (as defined by our acks and
  min.isr settings). This way, the cluster as a whole would remain
 available
  for writes at the cost of a slightly higher load on the remaining
 machines.
 
  Is this limitation accurately described? Is the proposed producer
  functionality worth pursuing?
 



Re: 0.8.x => 0.8.2 upgrade - live & seamless?

2014-10-16 Thread Neha Narkhede
Yes, you should be able to upgrade seamlessly.

On Wed, Oct 15, 2014 at 10:07 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi,

 Some of our SPM users who are eager to monitor their Kafka 0.8.x clusters
 with SPM are asking us whether the upgrade to 0.8.2 from 0.8.1 will be
 seamless.  I believe this will be the case, but wanted to double-check on
 that...

 Thanks,
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
  Solr & Elasticsearch Support * http://sematext.com/



Re: Kafka/Zookeeper deployment Questions

2014-10-16 Thread Neha Narkhede
In other words, if I change the number of partitions, can I restart the
brokers one at a time so that I can continue processing data?

Changing the # of partitions is an online operation and doesn't require
restarting the brokers. However, any other configuration (with the
exception of a few operations) that requires a broker restart can be done
in a rolling manner.
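
For example, partitions can be added to a live topic with the stock tool;
the ZooKeeper address and topic in this sketch are placeholders:

  bin/kafka-topics.sh --zookeeper zk1:2181 --alter --topic my_topic --partitions 16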

On Wed, Oct 15, 2014 at 7:16 PM, Sybrandy, Casey 
casey.sybra...@six3systems.com wrote:

 Hello,

  We're looking into deploying Kafka and Zookeeper into an environment where
  we want things to be as easy as possible to stand up and administer.  To do
  this, we're looking into using Consul, or similar, and Confd to try to make
  this as automatic as possible.  I was wondering if anyone had any experience
  in this area.  My major concern with reconfiguring Kafka is, in my
  experience, making sure we don't end up losing messages.

 Also, can kafka and zookeeper be reconfigured in a rolling manner?  In
 other words, if I change the number of partitions, can I restart the
 brokers one at a time so that I can continue processing data?

 Thanks.


Re: Broker brought down and under replicated partitions

2014-10-16 Thread Neha Narkhede
Is there a known issue in the 0.8.0 version that was
fixed later on? What can I do to diagnose/fix the situation?

Yes, quite a few bugs related to this have been fixed since 0.8.0. I'd
suggest upgrading to 0.8.1.1

On Wed, Oct 15, 2014 at 11:09 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 The only thing that I find very weird is the fact that brokers that are
 dead are still part of the ISR set for hours... and are basically not
 removed. Note this is not constantly the case, most of the dead brokers are
 properly removed and it is really just in a few cases. I am not sure why
 this would happen. Is there a known issue in the 0.8.0 version that was
 fixed later on? What can I do to diagnose/fix the situation?

 Thanks,

 On Wed, Oct 15, 2014 at 9:58 AM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

  So I am using 0.8.0. I think I found the issue actually. It turns out
 that
  some partitions only had a single replica and the leaders of those
  partitions would basically refuse new writes. As soon as I reassigned
  replicas to those partitions things kicked off again. Not sure if that's
  expected... but that seemed to make the problem go away.
 
  Thanks,
 
 
  On Wed, Oct 15, 2014 at 6:46 AM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
  Which version of Kafka are you using? The current stable one is 0.8.1.1
 
  On Tue, Oct 14, 2014 at 5:51 PM, Jean-Pascal Billaud j...@tellapart.com
  wrote:
 
   Hey Neha,
  
   so I removed another broker about 30 min ago and since then basically the
   Producer is dying with:
  
   Event queue is full of unsent messages, could not send event:
   KeyedMessage(my_topic,[B@1b71b7a6,[B@35fdd1e7)
   kafka.common.QueueFullException: Event queue is full of unsent
 messages,
   could not send event: KeyedMessage(my_topic,[B@1b71b7a6,[B@35fdd1e7)
   at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
   at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   ~[scala-library-2.10.3.jar:na]
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   ~[scala-library-2.10.3.jar:na]
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
   ~[scala-library-2.10.3.jar:na]
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
   ~[scala-library-2.10.3.jar:na]
   at kafka.producer.Producer.asyncSend(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
   at kafka.producer.Producer.send(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
   at kafka.javaapi.producer.Producer.send(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
  
    It seems like it cannot recover for some reason. The new leaders were
    elected, it seems, so it should have picked up the new metadata
    information about the partitions. Is this something known from 0.8.0? What
    should I be looking for to debug/fix this?
  
   Thanks,
  
   On Tue, Oct 14, 2014 at 2:22 PM, Neha Narkhede 
 neha.narkh...@gmail.com
  
   wrote:
  
Regarding (1), I am assuming that it is expected that brokers going
  down
will be brought back up soon. At which point, they will pick up from
  the
current leader and get back into the ISR. Am I right?
   
The broker will be added back to the ISR once it is restarted, but
 it
   never
goes out of the replica list until the admin explicitly moves it
 using
   the
reassign partitions tool.
   
Regarding (2), I finally kicked off a reassign_partitions admin task
   adding
broker 7 to the replicas list for partition 0 which finally fixed
 the
   under
replicated issue:
Is this therefore expected that the user will fix up the under
   replication
situation?
   
Yes. Currently, partition reassignment is purely an admin initiated
  task.
   
Another thing I'd like to clarify is that for another topic Y,
 broker
  5
   was
never removed from the ISR array. Note that Y is an unused topic so
 I
  am
guessing that technically broker 5 is not out of sync... though it
 is
   still
dead. Is this the expected behavior?
   
Not really. After replica.lag.time.max.ms (which defaults to 10
   seconds),
the leader should remove the dead broker out of the ISR.
   
Thanks,
Neha
   
On Tue, Oct 14, 2014 at 9:27 AM, Jean-Pascal Billaud 
  j...@tellapart.com
wrote:
   
 hey folks,

 I have been testing a kafka cluster of 10 nodes on AWS using
 version
 2.8.0-0.8.0
 and see some behavior on failover that I want to make sure I
   understand.

 Initially, I have a topic X with 30 partitions and a replication
  factor
of
 3. Looking at the partition 0:
 partition: 0 - leader: 5 preferred leader: 5 brokers: [5, 3, 4]
   in-sync:
 [5, 3, 4]

  When killing broker 5, the controller immediately grabs the next replica in
  the ISR and assigns it as leader:
 

[Kafka-users] Producer not distributing across all partitions

2014-10-16 Thread Mungeol Heo
Hi,

I have a question about the 'topic.metadata.refresh.interval.ms' configuration.
As I understand it, its default value is 10 minutes.
Does that mean the producer will change the partition every 10 minutes?
What I am experiencing is that the producer does not change to another
partition every 10 minutes.
Sometimes it never changes during the run, which takes about 25 minutes.
I also changed the value to 1 minute for testing.
It looked like it was working well the first time.
However, the same problem happens starting from the second test.
Sometimes it takes more than 10 minutes to change the partition even
if I set the value to 1 minute.
Am I missing something?
Any help would be great.

Thanks.

- Mungeol


Monitoring connection with kafka client

2014-10-16 Thread Alex Objelean
Hi,

I'm trying to monitor the kafka connection on the consumer side. In other
words, if the broker cluster is unavailable (or zookeeper dies), I would like
to know about that problem as soon as possible.
Unfortunately, I didn't find anything useful to achieve that when using
kafka library.
Are there any suggestions about how to fix this issue?

Thanks,
Alex


read N items from topic

2014-10-16 Thread Josh J
hi,

How do I read N items from a topic? I also would like to do this for a
consumer group, so that each consumer can specify an N number of tuples to
read, and each consumer reads distinct tuples.

Thanks,
Josh


Re: java api code and javadoc

2014-10-16 Thread 4mayank
Thanks Joseph. I built the javadoc but it's incomplete.
Where can I find the code itself for classes like KafkaStream,
MessageAndOffset, ConsumerConnector etc.?

On Wed, Oct 15, 2014 at 11:10 AM, Joseph Lawson jlaw...@roomkey.com wrote:

 You probably have to build your own right now.  Check out
 https://github.com/apache/kafka#building-javadocs-and-scaladocs
 
 From: 4mayank 4may...@gmail.com
 Sent: Wednesday, October 15, 2014 11:38 AM
 To: users@kafka.apache.org
 Subject: java api code and javadoc

 Hi

 I downloaded kafka 0.8.1.1 src and went through some documentation and
 wikis, but could not find any documentation (javadoc or other) on the java
 API - info on classes like SimpleConsumer, MessageAndOffset etc. Nor could
 I locate the source code (.java). I see only scala files.

 Can anyone provide info on where I can find doc to get list of attributes,
 methods, signatures etc?

 Thanks.
 -Mayank.



Re: Consistency and Availability on Node Failures

2014-10-16 Thread gshapira
It may be a minority,  I can't tell yet. But in some apps we need to know that 
a consumer, who is assigned a single partition, will get all data about a 
subset of users.
This is way more flexible than multiple topics since we still have the benefits 
of partition reassignment,  load balancing between consumers, fault protection, 
 etc.

—
Sent from Mailbox

On Thu, Oct 16, 2014 at 9:52 AM, Kyle Banker kyleban...@gmail.com wrote:

 I didn't realize that anyone used partitions to logically divide a topic.
 When would that be preferable to simply having a separate topic? Isn't this
 a minority case?
 On Thu, Oct 16, 2014 at 7:28 AM, Gwen Shapira gshap...@cloudera.com wrote:
 Just note that this is not  a universal solution. Many use-cases care
 about which partition you end up writing to since partitions are used
 to... well, partition logical entities such as customers and users.



 On Wed, Oct 15, 2014 at 9:03 PM, Jun Rao jun...@gmail.com wrote:
  Kyle,
 
  What you wanted is not supported out of box. You can achieve this using
 the
  new java producer. The new java producer allows you to pick an arbitrary
  partition when sending a message. If you receive
 NotEnoughReplicasException
  when sending a message, you can resend it to another partition.
 
  Thanks,
 
  Jun
 
  On Tue, Oct 14, 2014 at 1:51 PM, Kyle Banker kyleban...@gmail.com
 wrote:
 
  Consider a 12-node Kafka cluster with a 200-partition topic having a
  replication factor of 3. Let's assume, in addition, that we're running
  Kafka v0.8.2, we've disabled unclean leader election, acks is -1, and
  min.isr is 2.
 
  Now suppose we lose 2 nodes. In this case, there's a good chance that
 2/3
  replicas of one or more partitions will be unavailable. This means that
  messages assigned to those partitions will not be writable. If we're
  writing a large number of messages, I would expect that all producers
 would
  eventually halt. It is somewhat surprising that, if we rely on a basic
  durability setting, the cluster would likely be unavailable even after
  losing only 2 / 12 nodes.
 
  It might be useful in this scenario for the producer to be able to
 detect
  which partitions are no longer available and reroute messages that would
  have hashed to the unavailable partitions (as defined by our acks and
  min.isr settings). This way, the cluster as a whole would remain
 available
  for writes at the cost of a slightly higher load on the remaining
 machines.
 
  Is this limitation accurately described? Is the proposed producer
  functionality worth pursuing?
 


Re: read N items from topic

2014-10-16 Thread gshapira
Using the high level consumer, each consumer in the group can call iter.next () 
in a loop until they get the number of messages you need.

—
Sent from Mailbox

On Thu, Oct 16, 2014 at 10:18 AM, Josh J joshjd...@gmail.com wrote:

 hi,
 How do I read N items from a topic? I also would like to do this for a
 consumer group, so that each consumer can specify an N number of tuples to
 read, and each consumer reads distinct tuples.
 Thanks,
 Josh

Re: java api code and javadoc

2014-10-16 Thread Ewen Cheslack-Postava
KafkaStream and MessageAndOffset are Scala classes, so you'll find them
under the scaladocs. The ConsumerConnector interface should show up in
the javadocs with good documentation coverage. Some classes like
MessageAndOffset are so simple (just compositions of other data) that
they aren't going to have any docs associated with them.

On Thu, Oct 16, 2014, at 08:03 AM, 4mayank wrote:
  Thanks Joseph. I built the javadoc but it's incomplete.
  Where can I find the code itself for classes like KafkaStream,
  MessageAndOffset, ConsumerConnector etc.?
 
 On Wed, Oct 15, 2014 at 11:10 AM, Joseph Lawson jlaw...@roomkey.com
 wrote:
 
  You probably have to build your own right now.  Check out
  https://github.com/apache/kafka#building-javadocs-and-scaladocs
  
  From: 4mayank 4may...@gmail.com
  Sent: Wednesday, October 15, 2014 11:38 AM
  To: users@kafka.apache.org
  Subject: java api code and javadoc
 
  Hi
 
  I downloaded kafka 0.8.1.1 src and went through some documentation and
  wikis, but could not find any documentation (javadoc or other) on the java
  API - info on classes like SimpleConsumer, MessageAndOffset etc. Nor could
  I locate the source code (.java). I see only scala files.
 
  Can anyone provide info on where I can find doc to get list of attributes,
  methods, signatures etc?
 
  Thanks.
  -Mayank.
 


Re: read N items from topic

2014-10-16 Thread Neha Narkhede
Josh,

The consumer's API doesn't allow you to specify N messages, but you can
invoke iter.next() as Gwen suggested and count the messages. Note that the
iterator can block if you have fewer than N messages, so you will have to
design carefully around it. The new consumer's API provides a non-blocking
poll() API, so this sort of use case is better handled there. In any case,
getting messages based on a count is something that has to happen on the
consumer side, since the server sends the bytes using the sendfile API, which
doesn't allow it to inspect the bytes.
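
A minimal sketch of the iterate-and-count approach with the high-level Java
consumer (the ZooKeeper address, group id and topic are placeholders):

  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;
  import kafka.consumer.Consumer;
  import kafka.consumer.ConsumerConfig;
  import kafka.consumer.ConsumerIterator;
  import kafka.consumer.KafkaStream;
  import kafka.javaapi.consumer.ConsumerConnector;

  public class ReadNMessagesSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("zookeeper.connect", "zk1:2181");   // placeholder ZK address
          props.put("group.id", "my-group");            // placeholder group id
          props.put("auto.offset.reset", "smallest");
          ConsumerConnector connector =
              Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

          Map<String, List<KafkaStream<byte[], byte[]>>> streams =
              connector.createMessageStreams(Collections.singletonMap("my_topic", 1));
          ConsumerIterator<byte[], byte[]> it = streams.get("my_topic").get(0).iterator();

          int n = 100;                                  // the N messages this consumer wants
          for (int i = 0; i < n && it.hasNext(); i++) { // hasNext() blocks if fewer exist
              byte[] message = it.next().message();
              // process the message here
          }
          connector.shutdown();
      }
  }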

Thanks,
Neha

On Thu, Oct 16, 2014 at 8:37 AM, gshap...@cloudera.com wrote:

 Using the high level consumer, each consumer in the group can call
 iter.next () in a loop until they get the number of messages you need.

 —
 Sent from Mailbox

 On Thu, Oct 16, 2014 at 10:18 AM, Josh J joshjd...@gmail.com wrote:

  hi,
  How do I read N items from a topic? I also would like to do this for a
  consumer group, so that each consumer can specify an N number of tuples
 to
  read, and each consumer reads distinct tuples.
  Thanks,
  Josh



Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Neha Narkhede
Another JIRA that will be nice to include as part of 0.8.2-beta is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
naming. Looking for people's thoughts on 2 things here -

1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned
above) in 0.8.2-beta? If so, it will be great to know now so it will allow
us to move forward with the beta release quickly.

Thanks,
Neha

On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 Hi,

 We have accumulated an impressive list of pretty major features in 0.8.2 -
 Delete topic
 Automated leader rebalancing
 Controlled shutdown
 Offset management
 Parallel recovery
 min.isr and
 clean leader election

 In the past, what has worked for major feature releases is a beta release
 prior to a final release. I'm proposing we do the same for 0.8.2. The only
 blockers for 0.8.2-beta, that I know of are -

 https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and
 requires some thinking about the new dependency. Since it is not fully
 ready and there are things to think about, I suggest we take it out, think
 it end to end and then include it in 0.8.3.)
 https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
 Guozhang Wang)
 https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
 waiting on a review by Joe Stein)

 It seems that 1634 and 1671 can get wrapped up in a week. Do people think
 we should cut 0.8.2-beta by next week?

 Thanks,
 Neha



Re: [Kafka-users] Producer not distributing across all partitions

2014-10-16 Thread Neha Narkhede
A topic.metadata.refresh.interval.ms of 10 mins means that the producer
will take 10 mins to detect new partitions. So newly added or reassigned
partitions might not get data for 10 mins. In general, if you're still at
prototyping stages, I'd recommend using the new producer available on kafka
trunk (org.apache.kafka.clients.producer.KafkaProducer). It has better
performance and APIs.
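
If you do stay on the old producer for now, the interval in question is just a
producer property; in this sketch the broker list and topic are placeholders:

  import java.util.Properties;
  import kafka.javaapi.producer.Producer;
  import kafka.producer.KeyedMessage;
  import kafka.producer.ProducerConfig;

  public class MetadataRefreshSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("metadata.broker.list", "broker1:9092");        // placeholder broker
          props.put("topic.metadata.refresh.interval.ms", "60000"); // 1 minute
          Producer<byte[], byte[]> producer =
              new Producer<byte[], byte[]>(new ProducerConfig(props));
          // With a null key, the old producer sticks to one partition and only
          // reconsiders its choice when the topic metadata is refreshed.
          producer.send(new KeyedMessage<byte[], byte[]>("my_topic", "payload".getBytes()));
          producer.close();
      }
  }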

On Thu, Oct 16, 2014 at 3:07 AM, Mungeol Heo mungeol@gmail.com wrote:

 Hi,

  I have a question about the 'topic.metadata.refresh.interval.ms'
  configuration.
  As I understand it, its default value is 10 minutes.
  Does that mean the producer will change the partition every 10 minutes?
  What I am experiencing is that the producer does not change to another
  partition every 10 minutes.
  Sometimes it never changes during the run, which takes about 25 minutes.
  I also changed the value to 1 minute for testing.
  It looked like it was working well the first time.
  However, the same problem happens starting from the second test.
  Sometimes it takes more than 10 minutes to change the partition even
  if I set the value to 1 minute.
  Am I missing something?
  Any help would be great.

 Thanks.

 - Mungeol



Re: Monitoring connection with kafka client

2014-10-16 Thread Neha Narkhede
If you want to know if the Kafka and zookeeper cluster is healthy or not,
you'd want to monitor the cluster directly. Here are pointers for
monitoring the Kafka brokers -
http://kafka.apache.org/documentation.html#monitoring
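
The broker metrics are exposed over JMX, e.g. start the broker with a JMX port
and point jconsole or your monitoring agent at it (the port number is arbitrary):

  JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties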

Thanks,
Neha

On Thu, Oct 16, 2014 at 3:09 AM, Alex Objelean alex.objel...@gmail.com
wrote:

 Hi,

 I'm trying to monitor the kafka connection on the consumer side. In other
  words, if the broker cluster is unavailable (or zookeeper dies), I would like
 to know about that problem as soon as possible.
 Unfortunately, I didn't find anything useful to achieve that when using
 kafka library.
 Are there any suggestions about how to fix this issue?

 Thanks,
 Alex



Re: getOffsetsBefore(...) => kafka.common.UnknownException

2014-10-16 Thread Neha Narkhede
Do you see any errors on the broker?
Are you sure that the consumer's fetch size is set higher than the
largest message in your topic? It should be higher than message.max.bytes
on the broker (which defaults to 1MB).

On Thu, Oct 16, 2014 at 3:56 AM, Magnus Vojbacke 
magnus.vojba...@digitalroute.com wrote:

 Hi,

 I’m trying to make a request for offset information from my broker, and I
 get a kafka.common.UnknownException as the result.

 I’m trying to use the Simple Consumer API



  val topicAndPartition = new TopicAndPartition("topic3", 0)
 val requestInfo = new java.util.HashMap[TopicAndPartition,
 PartitionOffsetRequestInfo]()
 requestInfo.put(topicAndPartition, new
 PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))

 val request = new kafka.javaapi.OffsetRequest(requestInfo,
 kafka.api.OffsetRequest.CurrentVersion, clientName)

 import kafka.javaapi._
 // conn: kafka.javaapi.consumer.SimpleConsumer
 val response: OffsetResponse = conn.getOffsetsBefore(request)

  println("got response [" + response + "]")



 Output:
  got response [OffsetResponse(0,Map([test3,1] -> error: kafka.common.UnknownException offsets: 0))]


 I really can’t figure out why I’m getting this response. As far as I know,
 “topic3” with partition “0” exists on the broker, and I can use
 bin/kafka-console-consumer.sh to consume from it without any problems.


 Is there any idea of what could cause this exception?

 As it is right now, I’m not even sure if the request gets to the broker.
 Is there any way of activating more verbose logs on the broker?

 I think I’m using a trunk build (2.10-0.8.3-SNAPSHOT)


 BR
 /Magnus




Re: Consistency and Availability on Node Failures

2014-10-16 Thread cac...@gmail.com
Knowing that the partitioning is consistent for a given key means that
(apart from other benefits) a given consumer only deals with a partition of
the keyspace. So if you are in a system with tens of millions of users, each
consumer only has to store state on a small number of them; with
inconsistent partitioning, each consumer would have to be able to handle all
of the users. This could just be storing a bit of data for each user or
something much more complicated. You may not care which consumer a given
user ends up on, just that they don't end up on more than one for long
periods of time.
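
A tiny sketch of why that works: the producer's default partitioner is
(roughly) a hash of the key modulo the partition count, so every message for a
given user id lands on the same partition (the ids and partition count below
are made up):

  public class KeyedPartitioningSketch {
      public static void main(String[] args) {
          int numPartitions = 30;     // made-up partition count
          String[] keys = {"user-42", "user-42", "user-7"};
          for (String key : keys) {
              // Roughly what the default hash partitioner does with a non-null key.
              int partition = (key.hashCode() & 0x7fffffff) % numPartitions;
              System.out.println(key + " -> partition " + partition);
          }
      }
  }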

Christian

On Thu, Oct 16, 2014 at 8:20 AM, gshap...@cloudera.com wrote:

 It may be a minority,  I can't tell yet. But in some apps we need to know
 that a consumer, who is assigned a single partition, will get all data
 about a subset of users.
 This is way more flexible than multiple topics since we still have the
 benefits of partition reassignment,  load balancing between consumers,
 fault protection,  etc.

 —
 Sent from Mailbox

 On Thu, Oct 16, 2014 at 9:52 AM, Kyle Banker kyleban...@gmail.com wrote:

  I didn't realize that anyone used partitions to logically divide a topic.
  When would that be preferable to simply having a separate topic? Isn't
 this
  a minority case?
  On Thu, Oct 16, 2014 at 7:28 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
  Just note that this is not  a universal solution. Many use-cases care
  about which partition you end up writing to since partitions are used
  to... well, partition logical entities such as customers and users.
 
 
 
  On Wed, Oct 15, 2014 at 9:03 PM, Jun Rao jun...@gmail.com wrote:
   Kyle,
  
   What you wanted is not supported out of box. You can achieve this
 using
  the
   new java producer. The new java producer allows you to pick an
 arbitrary
   partition when sending a message. If you receive
  NotEnoughReplicasException
   when sending a message, you can resend it to another partition.
  
   Thanks,
  
   Jun
  
   On Tue, Oct 14, 2014 at 1:51 PM, Kyle Banker kyleban...@gmail.com
  wrote:
  
   Consider a 12-node Kafka cluster with a 200-parition topic having a
   replication factor of 3. Let's assume, in addition, that we're
 running
   Kafka v0.8.2, we've disabled unclean leader election, acks is -1, and
   min.isr is 2.
  
   Now suppose we lose 2 nodes. In this case, there's a good chance that
  2/3
   replicas of one or more partitions will be unavailable. This means
 that
   messages assigned to those partitions will not be writable. If we're
   writing a large number of messages, I would expect that all producers
  would
   eventually halt. It is somewhat surprising that, if we rely on a
 basic
   durability setting, the cluster would likely be unavailable even
 after
   losing only 2 / 12 nodes.
  
   It might be useful in this scenario for the producer to be able to
  detect
   which partitions are no longer available and reroute messages that
 would
   have hashed to the unavailable partitions (as defined by our acks and
   min.isr settings). This way, the cluster as a whole would remain
  available
   for writes at the cost of a slightly higher load on the remaining
  machines.
  
   Is this limitation accurately described? Is the proposed producer
   functionality worth pursuing?
  
 



Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Joe Stein
+1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.

I agree to the tickets you brought up to have in 0.8.2-beta and also
https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
/
On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Another JIRA that will be nice to include as part of 0.8.2-beta is
 https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
 naming. Looking for people's thoughts on 2 things here -

 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
 final 4-5 weeks later?
 2. Do people want to include any JIRAs (other than the ones mentioned
 above) in 0.8.2-beta? If so, it will be great to know now so it will allow
 us to move forward with the beta release quickly.

 Thanks,
 Neha

 On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Hi,
 
  We have accumulated an impressive list of pretty major features in 0.8.2
 -
  Delete topic
  Automated leader rebalancing
  Controlled shutdown
  Offset management
  Parallel recovery
  min.isr and
  clean leader election
 
  In the past, what has worked for major feature releases is a beta release
  prior to a final release. I'm proposing we do the same for 0.8.2. The
 only
  blockers for 0.8.2-beta, that I know of are -
 
  https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and
  requires some thinking about the new dependency. Since it is not fully
  ready and there are things to think about, I suggest we take it out,
 think
  it end to end and then include it in 0.8.3.)
  https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
  Guozhang Wang)
  https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
  waiting on a review by Joe Stein)
 
  It seems that 1634 and 1671 can get wrapped up in a week. Do people think
  we should cut 0.8.2-beta by next week?
 
  Thanks,
  Neha
 



Re: getOffsetsBefore(...) => kafka.common.UnknownException

2014-10-16 Thread Jun Rao
The OffsetRequest can only be answered by the leader of the partition. Did
you connect the SimpleConsumer to the leader broker? If not, you need to
use TopicMetadataRequest to find out the leader broker first.
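
A sketch of that lookup with the javaapi classes (the broker address is a
placeholder; the topic matches the one above):

  import java.util.Collections;
  import kafka.javaapi.PartitionMetadata;
  import kafka.javaapi.TopicMetadata;
  import kafka.javaapi.TopicMetadataRequest;
  import kafka.javaapi.TopicMetadataResponse;
  import kafka.javaapi.consumer.SimpleConsumer;

  public class LeaderLookupSketch {
      public static void main(String[] args) {
          // Ask any broker for the topic metadata first.
          SimpleConsumer metadataConsumer =
              new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "leaderLookup");
          TopicMetadataResponse resp = metadataConsumer.send(
              new TopicMetadataRequest(Collections.singletonList("topic3")));
          metadataConsumer.close();

          for (TopicMetadata topicMeta : resp.topicsMetadata()) {
              for (PartitionMetadata partMeta : topicMeta.partitionsMetadata()) {
                  if (partMeta.partitionId() == 0 && partMeta.leader() != null) {
                      // Open a new SimpleConsumer against this host/port and send
                      // the OffsetRequest there.
                      System.out.println("leader for partition 0: "
                          + partMeta.leader().host() + ":" + partMeta.leader().port());
                  }
              }
          }
      }
  }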

Thanks,

Jun

On Thu, Oct 16, 2014 at 3:56 AM, Magnus Vojbacke 
magnus.vojba...@digitalroute.com wrote:

 Hi,

 I’m trying to make a request for offset information from my broker, and I
 get a kafka.common.UnknownException as the result.

 I’m trying to use the Simple Consumer API



  val topicAndPartition = new TopicAndPartition("topic3", 0)
 val requestInfo = new java.util.HashMap[TopicAndPartition,
 PartitionOffsetRequestInfo]()
 requestInfo.put(topicAndPartition, new
 PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))

 val request = new kafka.javaapi.OffsetRequest(requestInfo,
 kafka.api.OffsetRequest.CurrentVersion, clientName)

 import kafka.javaapi._
 // conn: kafka.javaapi.consumer.SimpleConsumer
 val response: OffsetResponse = conn.getOffsetsBefore(request)

  println("got response [" + response + "]")



 Output:
  got response [OffsetResponse(0,Map([test3,1] -> error: kafka.common.UnknownException offsets: 0))]


 I really can’t figure out why I’m getting this response. As far as I know,
 “topic3” with partition “0” exists on the broker, and I can use
 bin/kafka-console-consumer.sh to consume from it without any problems.


 Is there any idea of what could cause this exception?

 As it is right now, I’m not even sure if the request gets to the broker.
 Is there any way of activating more verbose logs on the broker?

 I think I’m using a trunk build (2.10-0.8.3-SNAPSHOT)


 BR
 /Magnus




Re: ConsumerOffsetChecker shows none partitions assigned

2014-10-16 Thread Jun Rao
Which version of ZK are you using? Also, see
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog
?

Thanks,

Jun

On Thu, Oct 16, 2014 at 3:29 PM, Hari Gorak hari.go...@rediffmail.com
wrote:

 Project: Kafka

 Issue Type: Bug

 Components: consumer

 Affects Versions: 0.8.0

 Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 2.40GHz/1.2e+02GB


  bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker shows some
  partitions having no consumers after a rebalance triggered by a new
  consumer joining/disconnecting from the group. The lag keeps piling up until
  the partitions are assigned to a consumer again, usually after another
  rebalance trigger. Is this a known issue and, if so, what is the plan for
  fixing it? Are there any measures to ensure that we don't run into this
  situation?

 Thanks
 Hari