Re: Broker brought down and under replicated partitions
The only thing that I find very weird is the fact that brokers that are dead are still part of the ISR set for hours... and are basically not removed. Note this is not always the case; most of the dead brokers are properly removed, and it is really just a few cases. I am not sure why this would happen. Is there a known issue in the 0.8.0 version that was fixed later on? What can I do to diagnose/fix the situation? Thanks,

On Wed, Oct 15, 2014 at 9:58 AM, Jean-Pascal Billaud j...@tellapart.com wrote:

So I am using 0.8.0. I think I found the issue, actually. It turns out that some partitions only had a single replica, and the leaders of those partitions would basically refuse new writes. As soon as I reassigned replicas to those partitions, things kicked off again. Not sure if that's expected... but it seemed to make the problem go away. Thanks,

On Wed, Oct 15, 2014 at 6:46 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

Which version of Kafka are you using? The current stable one is 0.8.1.1.

On Tue, Oct 14, 2014 at 5:51 PM, Jean-Pascal Billaud j...@tellapart.com wrote:

Hey Neha, so I removed another broker about 30 minutes ago, and since then the producer has been dying with:

    Event queue is full of unsent messages, could not send event: KeyedMessage(my_topic,[B@1b71b7a6,[B@35fdd1e7)
    kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(my_topic,[B@1b71b7a6,[B@35fdd1e7)
        at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.3.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.3.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.10.3.jar:na]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.10.3.jar:na]
        at kafka.producer.Producer.asyncSend(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.producer.Producer.send(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.javaapi.producer.Producer.send(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

It seems like it cannot recover for some reason. It looks like the new leaders were elected, so the producer should have picked up the new metadata about the partitions. Is this a known issue in 0.8.0? What should I be looking for to debug/fix this? Thanks,

On Tue, Oct 14, 2014 at 2:22 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

> Regarding (1), I am assuming that it is expected that brokers going down will be brought back up soon. At which point, they will pick up from the current leader and get back into the ISR. Am I right?

The broker will be added back to the ISR once it is restarted, but it never leaves the replica list until the admin explicitly moves it using the reassign-partitions tool.

> Regarding (2), I finally kicked off a reassign_partitions admin task adding broker 7 to the replicas list for partition 0, which finally fixed the under-replication issue. Is it therefore expected that the user will fix up the under-replication situation?

Yes. Currently, partition reassignment is purely an admin-initiated task.

> Another thing I'd like to clarify is that for another topic Y, broker 5 was never removed from the ISR array. Note that Y is an unused topic, so I am guessing that technically broker 5 is not out of sync... though it is still dead. Is this the expected behavior?

Not really. After replica.lag.time.max.ms (which defaults to 10 seconds), the leader should remove the dead broker from the ISR.

Thanks, Neha

On Tue, Oct 14, 2014 at 9:27 AM, Jean-Pascal Billaud j...@tellapart.com wrote:

Hey folks, I have been testing a Kafka cluster of 10 nodes on AWS using version 2.8.0-0.8.0 and see some behavior on failover that I want to make sure I understand. Initially, I have a topic X with 30 partitions and a replication factor of 3. Looking at partition 0:

    partition: 0 - leader: 5  preferred leader: 5  brokers: [5, 3, 4]  in-sync: [5, 3, 4]

After killing broker 5, the controller immediately grabs the next replica in the ISR and assigns it as the leader:

    partition: 0 - leader: 3  preferred leader: 5  brokers: [5, 3, 4]  in-sync: [3, 4]

There are a couple of things at this point I would like to clarify: (1) Why is broker 5 still in the brokers array for partition 0? Note this brokers array comes from a get on the ZooKeeper path /brokers/topics/[topic] as documented. (2) Partition 0 is now under-replicated, and the controller does not seem to do anything about it.
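For reference, the reassignment described above (e.g. adding broker 7 to partition 0's replica list) is done with the partition-reassignment tool. A minimal sketch, with hypothetical topic and ZooKeeper values, using the 0.8.1.x flags (the 0.8.0 tool's options differ slightly):

    cat > expand-replicas.json <<'EOF'
    {"version": 1,
     "partitions": [{"topic": "my_topic", "partition": 0, "replicas": [5, 3, 4, 7]}]}
    EOF

    bin/kafka-reassign-partitions.sh --zookeeper zk:2181 \
        --reassignment-json-file expand-replicas.json --execute
    bin/kafka-reassign-partitions.sh --zookeeper zk:2181 \
        --reassignment-json-file expand-replicas.json --verify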
Re: Kafka - NotLeaderForPartitionException / LeaderNotAvailableException
Will definitely take a thread dump! So far it's been running fine. -Jacob

On Wed, Oct 15, 2014 at 8:40 PM, Jun Rao jun...@gmail.com wrote:

If you see the hanging again, it would be great if you can take a thread dump so that we know where it is hanging. Thanks, Jun

On Tue, Oct 14, 2014 at 10:35 PM, Abraham Jacob abe.jac...@gmail.com wrote:

Hi Jun, Thanks for responding... I am using Kafka 2.9.2-0.8.1.1. I looked through the controller logs on a couple of nodes and did not find any exceptions or errors. However, in the state change log I see a bunch of the following exceptions:

    [2014-10-13 14:39:12,475] TRACE Controller 3 epoch 116 started leader election for partition [wordcount,1] (state.change.logger)
    [2014-10-13 14:39:12,479] ERROR Controller 3 epoch 116 initiated state change for partition [wordcount,1] from OfflinePartition to OnlinePartition failed (state.change.logger)
    kafka.common.NoReplicaOnlineException: No replica for partition [wordcount,1] is alive. Live brokers are: [Set()], Assigned replicas are: [List(8, 7, 1)]
        at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
        at scala.collection.Iterator$class.foreach(Iterator.scala:772)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
        at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
        at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
        at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:312)
        at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:162)
        at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:123)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:118)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:118)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:118)
        at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

Anyways, this morning after sending out the email, I set out to restart all the brokers and found that 3 of them were in a hung state. I tried to use the bin/kafka-server-stop.sh script (which does nothing but send a SIGINT), but the java process running Kafka would not terminate. I then issued 'kill -SIGTERM x' for the java process running Kafka, yet the process still would not terminate. This happened on only 3 nodes (each node runs only 1 broker). For the other nodes, kafka-server-stop.sh successfully brought down the java process running Kafka. For the three brokers that were not responding to either SIGINT or SIGTERM, I issued a SIGKILL instead, and that did bring down the process. I then restarted the brokers on all nodes. After that I again ran the describe-topic script:

    bin/kafka-topics.sh --describe --zookeeper tr-pan-hclstr-08.amers1b.ciscloud:2181/kafka/kafka-clstr-01 --topic wordcount

    Topic:wordcount  PartitionCount:8  ReplicationFactor:3  Configs:
        Topic: wordcount  Partition: 0  Leader: 7  Replicas: 7,6,8  Isr: 6,7,8
        Topic: wordcount
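For the thread dump Jun asks for, jstack (bundled with the JDK) or SIGQUIT will produce one; a minimal sketch, with the broker's process id in a hypothetical KAFKA_PID variable:

    # dump all thread stacks to a file (-F forces a dump on a hung JVM)
    jstack $KAFKA_PID > kafka-threads.txt

    # alternatively, SIGQUIT makes the JVM print the dump to its own stdout,
    # which typically ends up in the broker's console log
    kill -3 $KAFKA_PID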
getOffsetsBefore(...) => kafka.common.UnknownException
Hi, I'm trying to make a request for offset information from my broker, and I get a kafka.common.UnknownException as the result. I'm trying to use the Simple Consumer API:

    val topicAndPartition = new TopicAndPartition("topic3", 0)
    val requestInfo = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))
    val request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)

    import kafka.javaapi._
    // conn: kafka.javaapi.consumer.SimpleConsumer
    val response: OffsetResponse = conn.getOffsetsBefore(request)
    println("got response [" + response + "]")

Output:

    got response [OffsetResponse(0,Map([test3,1] -> error: kafka.common.UnknownException offsets: 0))]

I really can't figure out why I'm getting this response. As far as I know, "topic3" with partition "0" exists on the broker, and I can use bin/kafka-console-consumer.sh to consume from it without any problems. Is there any idea of what could cause this exception? As it is right now, I'm not even sure the request gets to the broker. Is there any way of activating more verbose logs on the broker? I think I'm using a trunk build (2.10-0.8.3-SNAPSHOT) BR /Magnus
Re: Consistency and Availability on Node Failures
Just note that this is not a universal solution. Many use cases care about which partition you end up writing to, since partitions are used to... well, partition logical entities such as customers and users.

On Wed, Oct 15, 2014 at 9:03 PM, Jun Rao jun...@gmail.com wrote:

Kyle, What you want is not supported out of the box. You can achieve this using the new java producer, which allows you to pick an arbitrary partition when sending a message. If you receive a NotEnoughReplicasException when sending a message, you can resend it to another partition. Thanks, Jun

On Tue, Oct 14, 2014 at 1:51 PM, Kyle Banker kyleban...@gmail.com wrote:

Consider a 12-node Kafka cluster with a 200-partition topic having a replication factor of 3. Let's assume, in addition, that we're running Kafka v0.8.2, we've disabled unclean leader election, acks is -1, and min.isr is 2. Now suppose we lose 2 nodes. In this case, there's a good chance that 2/3 replicas of one or more partitions will be unavailable, which means that messages assigned to those partitions will not be writable. If we're writing a large number of messages, I would expect all producers to eventually halt. It is somewhat surprising that, relying on a basic durability setting, the cluster would likely be unavailable after losing only 2 of 12 nodes. It might be useful in this scenario for the producer to be able to detect which partitions are no longer available (as defined by our acks and min.isr settings) and reroute messages that would have hashed to the unavailable partitions. This way, the cluster as a whole would remain available for writes at the cost of slightly higher load on the remaining machines. Is this limitation accurately described? Is the proposed producer functionality worth pursuing?
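A rough sketch of Jun's suggestion against the new (0.8.2) java producer, written here in Scala. The broker address, topic handling, and the try-the-next-partition policy are illustrative assumptions, not a definitive implementation; note that rerouting gives up per-key partition affinity, which is exactly the caveat Gwen raises above.

    import java.util.Properties
    import java.util.concurrent.ExecutionException
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.errors.NotEnoughReplicasException

    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // hypothetical broker
    props.put("acks", "all")                       // wait for the full ISR
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

    // Try the preferred partition first; on NotEnoughReplicas, reroute to the next one.
    def sendRerouting(topic: String, preferred: Int, numPartitions: Int,
                      key: Array[Byte], value: Array[Byte]): Unit = {
      for (attempt <- 0 until numPartitions) {
        val p = (preferred + attempt) % numPartitions
        try {
          // block for the ack so a replication error surfaces here
          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, p, key, value)).get()
          return
        } catch {
          case e: ExecutionException if e.getCause.isInstanceOf[NotEnoughReplicasException] =>
            () // partition p is below min.isr; fall through and try the next one
        }
      }
      throw new RuntimeException("all partitions are below min.isr")
    }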
Re: Cross-Data-Center Mirroring, and Guaranteed Minimum Time Period on Data
I assume the messages themselves contain the timestamp? If you use Flume, you can configure a Kafka source to pull data from Kafka, use an interceptor to pull the date out of your message and place it in the event header, and then the HDFS sink can write to a partition based on the timestamp. Gwen

On Wed, Oct 15, 2014 at 8:47 PM, Jun Rao jun...@gmail.com wrote:

One way you can do this is to continually load data from Kafka into Hadoop. During the load, you put data into different HDFS directories based on the timestamp. The Hadoop admin can decide when to open up those directories for reading based on whether data from all data centers has arrived. Thanks, Jun

On Tue, Oct 14, 2014 at 11:54 PM, Alex Melville amelvi...@g.hmc.edu wrote:

Hi Apache Community, My company has the following use case. We have multiple geographically disparate data centers, each with its own Kafka cluster, and we want to use MirrorMaker to aggregate all of these centers' data into one central Kafka cluster located in a data center distinct from the rest. Once in the central cluster, most of this data will be fed into Hadoop for analytics. However, with how we have Hadoop working right now, it must wait until it has received data from all of the other data centers for a specific time period before it has the green light to load that data into HDFS and process it. For example, say we have 3 remote (as in, not central) data centers, and DC1 has pushed to the central data center all of its data up to 4:00 PM, DC2 has pushed everything up to 3:30 PM, and DC3 is lagging behind, having only pushed data up to the 2:00 PM time period. Then Hadoop processes all data tagged with modification times before 2:00 PM, and it must wait until DC3 catches up by pushing 2:15, 2:30, etc. data to the central cluster before it can process the 3:00 PM data. So our question is: what is the best way to handle this time-period-ordered requirement on our data using a distributed messaging log like Kafka? We originally started using Kafka to move away from a batch-oriented backend data pipeline in favor of a more streaming-focused system, but we still need to keep track of the latest common time period of the data streaming in from the remote clusters. Cheers, Alex M.
Re: Cross-Data-Center Mirroring, and Guaranteed Minimum Time Period on Data
Check out Camus. It was built to do parallel loads from Kafka into time-bucketed directories in HDFS.

On Oct 16, 2014, at 9:32 AM, Gwen Shapira gshap...@cloudera.com wrote:

I assume the messages themselves contain the timestamp? If you use Flume, you can configure a Kafka source to pull data from Kafka, use an interceptor to pull the date out of your message and place it in the event header, and then the HDFS sink can write to a partition based on the timestamp. Gwen
Re: Consistency and Availability on Node Failures
I didn't realize that anyone used partitions to logically divide a topic. When would that be preferable to simply having a separate topic? Isn't this a minority case?

On Thu, Oct 16, 2014 at 7:28 AM, Gwen Shapira gshap...@cloudera.com wrote:

Just note that this is not a universal solution. Many use cases care about which partition you end up writing to, since partitions are used to... well, partition logical entities such as customers and users.
Re: 0.8.x => 0.8.2 upgrade - live seamless?
Yes, you should be able to upgrade seamlessly.

On Wed, Oct 15, 2014 at 10:07 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote:

Hi, Some of our SPM users who are eager to monitor their Kafka 0.8.x clusters with SPM are asking us whether the upgrade from 0.8.1 to 0.8.2 will be seamless. I believe it will be, but wanted to double-check... Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/
Re: Kafka/Zookeeper deployment Questions
> In other words, if I change the number of partitions, can I restart the brokers one at a time so that I can continue processing data?

Changing the number of partitions is an online operation and doesn't require restarting the brokers. However, any other configuration change that requires a broker restart (with the exception of a few operations) can be done in a rolling manner.

On Wed, Oct 15, 2014 at 7:16 PM, Sybrandy, Casey casey.sybra...@six3systems.com wrote:

Hello, We're looking into deploying Kafka and Zookeeper into an environment where we want things to be as easy as possible to stand up and administer. To do this, we're looking into using Consul, or similar, and Confd to try to make this as automatic as possible. I was wondering if anyone has experience in this area. My major concern with reconfiguring Kafka is, in my experience, making sure we don't end up losing messages. Also, can Kafka and Zookeeper be reconfigured in a rolling manner? In other words, if I change the number of partitions, can I restart the brokers one at a time so that I can continue processing data? Thanks.
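For reference, the online partition-count change mentioned above is done with the topics tool; a sketch with a hypothetical topic and ZooKeeper address. Note the count can only be increased, and keyed messages will hash to different partitions afterwards:

    bin/kafka-topics.sh --zookeeper zk:2181 --alter \
        --topic my_topic --partitions 16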
Re: Broker brought down and under replicated partitions
> Is there a known issue in the 0.8.0 version that was fixed later on? What can I do to diagnose/fix the situation?

Yes, quite a few bugs related to this have been fixed since 0.8.0. I'd suggest upgrading to 0.8.1.1.

On Wed, Oct 15, 2014 at 11:09 PM, Jean-Pascal Billaud j...@tellapart.com wrote:

The only thing that I find very weird is the fact that brokers that are dead are still part of the ISR set for hours... and are basically not removed. Note this is not always the case; most of the dead brokers are properly removed, and it is really just a few cases. I am not sure why this would happen. Is there a known issue in the 0.8.0 version that was fixed later on? What can I do to diagnose/fix the situation? Thanks,
[Kafka-users] Producer not distributing across all partitions
Hi, I have a question about the 'topic.metadata.refresh.interval.ms' configuration. As I understand it, its default value is 10 minutes. Does that mean the producer will change partitions every 10 minutes? What I am experiencing is that the producer does not change to another partition every 10 minutes. Sometimes it never changes during a run, which takes about 25 minutes. I also changed the value to 1 minute for testing. It looked like it worked the first time; however, the same problem happens starting from the second test. Sometimes it takes more than 10 minutes to change partitions even though I set the value to 1 minute. Am I missing something? Any help will be great. Thanks. - Mungeol
Monitoring connection with kafka client
Hi, I'm trying to monitor the Kafka connection on the consumer side. In other words, if the broker cluster is unavailable (or ZooKeeper dies), I would like to know about the problem as soon as possible. Unfortunately, I didn't find anything useful for achieving that in the Kafka library. Are there any suggestions on how to approach this? Thanks, Alex
read N items from topic
Hi, How do I read N items from a topic? I would also like to do this for a consumer group, so that each consumer can specify a number N of tuples to read, and each consumer reads distinct tuples. Thanks, Josh
Re: java api code and javadoc
Thanks Joseph. I built the javadoc but it's incomplete. Where can I find the code itself for classes like KafkaStream, MessageAndOffset, ConsumerConnector etc.?

On Wed, Oct 15, 2014 at 11:10 AM, Joseph Lawson jlaw...@roomkey.com wrote:

You probably have to build your own right now. Check out https://github.com/apache/kafka#building-javadocs-and-scaladocs

From: 4mayank 4may...@gmail.com Sent: Wednesday, October 15, 2014 11:38 AM To: users@kafka.apache.org Subject: java api code and javadoc

Hi, I downloaded the kafka 0.8.1.1 src and went through some documentation and wikis, but could not find any documentation (javadoc or other) on the java API - info on classes like SimpleConsumer, MessageAndOffset etc. Nor could I locate the source code (.java); I see only Scala files. Can anyone provide info on where I can find docs listing the attributes, methods, signatures etc.? Thanks. -Mayank.
Re: Consistency and Availability on Node Failures
It may be a minority; I can't tell yet. But in some apps we need to know that a consumer who is assigned a single partition will get all the data about a subset of users. This is way more flexible than multiple topics, since we still have the benefits of partition reassignment, load balancing between consumers, fault protection, etc. — Sent from Mailbox

On Thu, Oct 16, 2014 at 9:52 AM, Kyle Banker kyleban...@gmail.com wrote:

I didn't realize that anyone used partitions to logically divide a topic. When would that be preferable to simply having a separate topic? Isn't this a minority case?
Re: read N items from topic
Using the high-level consumer, each consumer in the group can call iter.next() in a loop until it gets the number of messages you need. — Sent from Mailbox

On Thu, Oct 16, 2014 at 10:18 AM, Josh J joshjd...@gmail.com wrote:

Hi, How do I read N items from a topic? I would also like to do this for a consumer group, so that each consumer can specify a number N of tuples to read, and each consumer reads distinct tuples. Thanks, Josh
Re: java api code and javadoc
KafkaStream and MessageAndOffset are Scala classes, so you'll find them under the scaladocs. The ConsumerConnector interface should show up in the javadocs with good documentation coverage. Some classes like MessageAndOffset are so simple (just compositions of other data) that they aren't going to have any docs associated with them.

On Thu, Oct 16, 2014, at 08:03 AM, 4mayank wrote:

Thanks Joseph. I built the javadoc but it's incomplete. Where can I find the code itself for classes like KafkaStream, MessageAndOffset, ConsumerConnector etc.?
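For reference, the build targets behind the README link Joseph posted are gradle tasks; a sketch from a kafka source checkout (task names per that README section):

    ./gradlew javadoc   # javadocs for the java API classes
    ./gradlew scaladoc  # scaladocs for the Scala classes (KafkaStream, MessageAndOffset, ...)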
Re: read N items from topic
Josh, The consumer's API doesn't let you specify N messages, but you can invoke iter.next() as Gwen suggested and count the messages. Note that the iterator can block if you have fewer than N messages, so you will have to design around that carefully. The new consumer's API provides a non-blocking poll(), so this sort of use case is better handled there. In any case, getting messages based on a count has to happen on the consumer side, since the server sends the bytes using the sendfile API, which doesn't allow it to inspect them. Thanks, Neha

On Thu, Oct 16, 2014 at 8:37 AM, gshap...@cloudera.com wrote:

Using the high-level consumer, each consumer in the group can call iter.next() in a loop until it gets the number of messages you need. — Sent from Mailbox

On Thu, Oct 16, 2014 at 10:18 AM, Josh J joshjd...@gmail.com wrote:

Hi, How do I read N items from a topic? I would also like to do this for a consumer group, so that each consumer can specify a number N of tuples to read, and each consumer reads distinct tuples. Thanks, Josh
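A sketch of the iter.next() loop Gwen and Neha describe, against the 0.8 high-level (Scala) consumer; the topic, group, and timeout values are placeholders. Setting consumer.timeout.ms makes the iterator throw ConsumerTimeoutException instead of blocking forever, which works around the blocking caveat above:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}

    val props = new Properties()
    props.put("zookeeper.connect", "zk:2181") // hypothetical ZooKeeper
    props.put("group.id", "my-group")         // hypothetical group
    // make the iterator throw instead of blocking when fewer than n messages arrive
    props.put("consumer.timeout.ms", "10000")

    val connector = Consumer.create(new ConsumerConfig(props))
    val stream = connector.createMessageStreams(Map("my_topic" -> 1))("my_topic").head
    val it = stream.iterator()

    val n = 100
    var count = 0
    try {
      while (count < n && it.hasNext()) {
        val msg = it.next().message() // Array[Byte] payload
        count += 1
      }
    } catch {
      case _: ConsumerTimeoutException => // fewer than n messages within the timeout
    }
    connector.shutdown()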
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
Another JIRA that would be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481, which fixes the mbean naming. Looking for people's thoughts on 2 things here:

1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so we can move forward with the beta release quickly.

Thanks, Neha

On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Hi, We have accumulated an impressive list of pretty major features in 0.8.2:

- Delete topic
- Automated leader rebalancing
- Controlled shutdown
- Offset management
- Parallel recovery
- min.isr and clean leader election

In the past, what has worked for major feature releases is a beta release prior to the final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta that I know of are:

- https://issues.apache.org/jira/browse/KAFKA-1493 (a major change that requires some thinking about the new dependency; since it is not fully ready, I suggest we take it out, think it through end to end, and then include it in 0.8.3)
- https://issues.apache.org/jira/browse/KAFKA-1634 (this has an owner: Guozhang Wang)
- https://issues.apache.org/jira/browse/KAFKA-1671 (has a patch and is waiting on a review by Joe Stein)

It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week? Thanks, Neha
Re: [Kafka-users] Producer not distributing across all partitions
A topic.metadata.refresh.interval.ms of 10 minutes means that the producer will take up to 10 minutes to detect new partitions, so newly added or reassigned partitions might not get data for 10 minutes. In general, if you're still at the prototyping stage, I'd recommend using the new producer available on kafka trunk (org.apache.kafka.clients.producer.KafkaProducer). It has better performance and APIs.

On Thu, Oct 16, 2014 at 3:07 AM, Mungeol Heo mungeol@gmail.com wrote:

Hi, I have a question about the 'topic.metadata.refresh.interval.ms' configuration. As I understand it, its default value is 10 minutes. Does that mean the producer will change partitions every 10 minutes? What I am experiencing is that the producer does not change to another partition every 10 minutes. Sometimes it never changes during a run, which takes about 25 minutes. I also changed the value to 1 minute for testing. It looked like it worked the first time; however, the same problem happens starting from the second test. Sometimes it takes more than 10 minutes to change partitions even though I set the value to 1 minute. Am I missing something? Any help will be great. Thanks. - Mungeol
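For reference, a sketch of setting that interval on the old (Scala) producer the question is about; the broker address and topic are placeholders. Per the Kafka FAQ, an unkeyed producer sticks to one randomly chosen partition between metadata refreshes, which matches the behavior Mungeol describes:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092") // hypothetical broker
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // refresh metadata (and re-pick the random partition for unkeyed sends) every minute
    props.put("topic.metadata.refresh.interval.ms", "60000")

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("my_topic", "hello")) // unkeyed message
    producer.close()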
Re: Monitoring connection with kafka client
If you want to know whether the Kafka and ZooKeeper cluster is healthy, you'd want to monitor the cluster directly. Here are pointers for monitoring the Kafka brokers: http://kafka.apache.org/documentation.html#monitoring Thanks, Neha

On Thu, Oct 16, 2014 at 3:09 AM, Alex Objelean alex.objel...@gmail.com wrote:

Hi, I'm trying to monitor the Kafka connection on the consumer side. In other words, if the broker cluster is unavailable (or ZooKeeper dies), I would like to know about the problem as soon as possible. Unfortunately, I didn't find anything useful for achieving that in the Kafka library. Are there any suggestions on how to approach this? Thanks, Alex
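Besides the broker-side metrics Neha points to, two cheap liveness probes can flag the failure modes Alex describes; the hostnames here are placeholders (ruok is a standard ZooKeeper four-letter command, and the topics tool exercises the full ZooKeeper path):

    # ZooKeeper replies "imok" if it is serving requests
    echo ruok | nc zk-host 2181

    # fails fast when ZooKeeper (or the Kafka chroot) is unreachable
    bin/kafka-topics.sh --list --zookeeper zk-host:2181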
Re: getOffsetsBefore(...) => kafka.common.UnknownException
Do you see any errors on the broker? Are you sure that the consumer's fetch size is set higher than the largest message in your topic? It should be higher than message.max.bytes on the broker (which defaults to 1MB).

On Thu, Oct 16, 2014 at 3:56 AM, Magnus Vojbacke magnus.vojba...@digitalroute.com wrote:

Hi, I'm trying to make a request for offset information from my broker, and I get a kafka.common.UnknownException as the result. I'm trying to use the Simple Consumer API:

    val topicAndPartition = new TopicAndPartition("topic3", 0)
    val requestInfo = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))
    val request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)

    import kafka.javaapi._
    // conn: kafka.javaapi.consumer.SimpleConsumer
    val response: OffsetResponse = conn.getOffsetsBefore(request)
    println("got response [" + response + "]")

Output:

    got response [OffsetResponse(0,Map([test3,1] -> error: kafka.common.UnknownException offsets: 0))]

I really can't figure out why I'm getting this response. As far as I know, "topic3" with partition "0" exists on the broker, and I can use bin/kafka-console-consumer.sh to consume from it without any problems. Is there any idea of what could cause this exception? As it is right now, I'm not even sure the request gets to the broker. Is there any way of activating more verbose logs on the broker? I think I'm using a trunk build (2.10-0.8.3-SNAPSHOT) BR /Magnus
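For reference, the fetch size in question is the per-partition byte limit passed to addFetch when building a fetch request; a sketch with placeholder values, reusing the conn and clientName from the snippet above:

    import kafka.api.FetchRequestBuilder

    // conn: kafka.javaapi.consumer.SimpleConsumer (as in the original snippet)
    val fetchRequest = new FetchRequestBuilder()
      .clientId(clientName)
      .addFetch("topic3", 0, 0L, 2 * 1024 * 1024) // fetch size must exceed the largest message
      .build()
    val fetchResponse = conn.fetch(fetchRequest)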
Re: Consistency and Availability on Node Failures
Knowing that the partitioning is consistent for a given key means that (apart from other benefits) a given consumer only deals with a partition of the keyspace. So if you are in a system with tens of millions of users, each consumer only has to store state for a small number of them; with inconsistent partitioning, each consumer would have to be able to handle all of the users. This could be just storing a bit of data per user, or something much more complicated. You may not care which consumer a given user ends up on, just that they don't end up on more than one for long periods of time. Christian

On Thu, Oct 16, 2014 at 8:20 AM, gshap...@cloudera.com wrote:

It may be a minority; I can't tell yet. But in some apps we need to know that a consumer who is assigned a single partition will get all the data about a subset of users. This is way more flexible than multiple topics, since we still have the benefits of partition reassignment, load balancing between consumers, fault protection, etc. — Sent from Mailbox
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
+1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later. I agree with the tickets you brought up to have in 0.8.2-beta, and also https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
 ***/

On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Another JIRA that would be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481, which fixes the mbean naming. Looking for people's thoughts on 2 things here:

1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so we can move forward with the beta release quickly.

Thanks, Neha
Re: getOffsetsBefore(...) => kafka.common.UnknownException
The OffsetRequest can only be answered by the leader of the partition. Did you connect the SimpleConsumer to the leader broker? If not, you need to use TopicMetadataRequest to find the leader broker first. Thanks, Jun

On Thu, Oct 16, 2014 at 3:56 AM, Magnus Vojbacke magnus.vojba...@digitalroute.com wrote:

Hi, I'm trying to make a request for offset information from my broker, and I get a kafka.common.UnknownException as the result. I'm trying to use the Simple Consumer API:

    val topicAndPartition = new TopicAndPartition("topic3", 0)
    val requestInfo = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))
    val request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)

    import kafka.javaapi._
    // conn: kafka.javaapi.consumer.SimpleConsumer
    val response: OffsetResponse = conn.getOffsetsBefore(request)
    println("got response [" + response + "]")

Output:

    got response [OffsetResponse(0,Map([test3,1] -> error: kafka.common.UnknownException offsets: 0))]

I really can't figure out why I'm getting this response. As far as I know, "topic3" with partition "0" exists on the broker, and I can use bin/kafka-console-consumer.sh to consume from it without any problems. Is there any idea of what could cause this exception? As it is right now, I'm not even sure the request gets to the broker. Is there any way of activating more verbose logs on the broker? I think I'm using a trunk build (2.10-0.8.3-SNAPSHOT) BR /Magnus
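A sketch of the leader lookup Jun describes, using the same javaapi as the original snippet; the bootstrap host, port, and client ids are placeholders:

    import scala.collection.JavaConverters._
    import kafka.javaapi.TopicMetadataRequest
    import kafka.javaapi.consumer.SimpleConsumer

    // ask any broker for metadata, then connect to the partition's leader
    val bootstrap = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "leader-lookup")
    val metadataResponse = bootstrap.send(new TopicMetadataRequest(java.util.Arrays.asList("topic3")))
    val leader = metadataResponse.topicsMetadata.asScala
      .flatMap(_.partitionsMetadata.asScala)
      .find(_.partitionId == 0)
      .flatMap(p => Option(p.leader)) // leader is null during an election
      .getOrElse(sys.error("no leader for topic3/0"))
    bootstrap.close()

    // getOffsetsBefore must be sent to this broker
    val conn = new SimpleConsumer(leader.host, leader.port, 100000, 64 * 1024, "offset-client")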
Re: ConsumerOffsetChecker shows none partitions assigned
Which version of ZK are you using? Also, see https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog

Thanks, Jun

On Thu, Oct 16, 2014 at 3:29 PM, Hari Gorak hari.go...@rediffmail.com wrote:

Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8.0
Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 2.40GHz/1.2e+02GB

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker shows some partitions having no consumers after a re-balance triggered by a new consumer joining or disconnecting from the group. The lag keeps piling up until the partitions are assigned, usually after another re-balance trigger. Is this a known issue, and if so, what is the plan for fixing it? Are there any measures to ensure that we don't run into this situation? Thanks, Hari
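For reference, a typical invocation of the checker under discussion, with placeholder group and ZooKeeper values; the Owner column of the output shows which consumer thread (if any) currently holds each partition:

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
        --zookeeper zk:2181 --group my-group --topic my_topic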