Log end offset
Hi, what is the best way to find the log end offset for a topic? Currently I am using the SimpleConsumer getLastOffset logic described in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example, but we are running into ClosedChannelException for some of the topics. We use Kafka for offset storage, version 0.8.2.1. What is the ideal way to compute a topic's log end offset? -- Regards, Vamsi Subhash
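For reference, the getLastOffset logic from that wiki page is roughly the following (a sketch; the leader lookup is omitted). As I understand it, the ClosedChannelException usually means the request went to a broker that is down or no longer the leader for that partition, so the metadata lookup presumably needs to be refreshed and retried against the current leader:

import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public static long getLastOffset(SimpleConsumer consumer, String topic,
                                 int partition, String clientName) {
    TopicAndPartition tp = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    // LatestTime() asks for the log end offset; maxNumOffsets = 1
    requestInfo.put(tp, new PartitionOffsetRequestInfo(
            kafka.api.OffsetRequest.LatestTime(), 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        // e.g. NotLeaderForPartition: refresh metadata, retry on the leader
        return -1L;
    }
    return response.offsets(topic, partition)[0];
}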
Re: Kafka Rebalance on Watcher event Question
Thanks, Manikumar, for your super-fast replies. Let me go through the docs and will raise my questions, if any. Thanks, Dinesh

On 11 May 2015 at 11:46, Manikumar Reddy ku...@nmsworks.co.in wrote: All the consumers in the same consumer group will share the load across the given topics/partitions. So for any consumer failure, there will be a rebalance to assign the failed consumer's topics/partitions to live consumers. Please check the consumer documentation here: https://kafka.apache.org/documentation.html#introduction

On Mon, May 11, 2015 at 11:17 AM, dinesh kumar dinesh...@gmail.com wrote: But why? What is the reason for triggering a rebalance if none of a consumer's topics are affected? Is there some reason for triggering a rebalance irrespective of whether the consumer's topics are affected?

On 11 May 2015 at 11:06, Manikumar Reddy ku...@nmsworks.co.in wrote: If both C1 and C2 belong to the same consumer group, then the rebalance will be triggered. A consumer subscribes to event changes of the consumer id registry within its group.

On Mon, May 11, 2015 at 10:55 AM, dinesh kumar dinesh...@gmail.com wrote: Hi, I am looking at the code of kafka.consumer.ZookeeperConsumerConnector.scala (link here: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala) and I see that all consumers belonging to a particular group register their ids under the path /consumers/[group_id]/ids in ZooKeeper; each id node contains the consumer_id -> topics mapping. A watcher is registered in ZooKeeper that is triggered when there is a change to /consumers/[group_id]/ids. This watcher event is handled by the class ZKRebalancerListener, which calls a synced rebalance whenever a watcher event is received. So here is my question: consider a scenario where there are two topics, T1 and T2, and two consumers, C1 and C2. C1 consumes only from T1 and C2 only from T2. Say C2 dies for some reason; as explained above, C1 will get a watcher event from ZooKeeper and a synced rebalance will be triggered. Why should C2 dying, which has absolutely nothing to do with C1 (there is no intersection of topics between C1 and C2), trigger a rebalance event in C1? Is there some condition where this is necessary that I am missing? Thanks, Dinesh
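For anyone following along, the registry layout under ZooKeeper looks roughly like this (a sketch; the group name, consumer ids, and exact JSON fields are illustrative):

/consumers/my-group/ids/my-group_host1-1431324000000-ab12cd34
    -> { "version": 1, "subscription": { "T1": 1 }, "pattern": "static", ... }
/consumers/my-group/ids/my-group_host2-1431324000001-ef56gh78
    -> { "version": 1, "subscription": { "T2": 1 }, "pattern": "static", ... }
/consumers/my-group/owners/T1/0 -> my-group_host1-1431324000000-ab12cd34
/consumers/my-group/offsets/T1/0 -> 42

The watch fires on any change to the children of /consumers/[group_id]/ids, i.e. on group membership changes, not on subscription overlap -- which matches Manikumar's explanation of why C1 sees C2 leave.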
Re: Kafka Rebalance on Watcher event Question
All the consumers in the same consumer group will share the load across the given topics/partitions. So for any consumer failure, there will be a rebalance to assign the failed consumer's topics/partitions to live consumers. Please check the consumer documentation here: https://kafka.apache.org/documentation.html#introduction

On Mon, May 11, 2015 at 11:17 AM, dinesh kumar dinesh...@gmail.com wrote: But why? What is the reason for triggering a rebalance if none of a consumer's topics are affected? Is there some reason for triggering a rebalance irrespective of whether the consumer's topics are affected?

On 11 May 2015 at 11:06, Manikumar Reddy ku...@nmsworks.co.in wrote: If both C1 and C2 belong to the same consumer group, then the rebalance will be triggered. A consumer subscribes to event changes of the consumer id registry within its group.

On Mon, May 11, 2015 at 10:55 AM, dinesh kumar dinesh...@gmail.com wrote: Hi, I am looking at the code of kafka.consumer.ZookeeperConsumerConnector.scala (link here: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala) and I see that all consumers belonging to a particular group register their ids under the path /consumers/[group_id]/ids in ZooKeeper; each id node contains the consumer_id -> topics mapping. A watcher is registered in ZooKeeper that is triggered when there is a change to /consumers/[group_id]/ids. This watcher event is handled by the class ZKRebalancerListener, which calls a synced rebalance whenever a watcher event is received. So here is my question: consider a scenario where there are two topics, T1 and T2, and two consumers, C1 and C2. C1 consumes only from T1 and C2 only from T2. Say C2 dies for some reason; as explained above, C1 will get a watcher event from ZooKeeper and a synced rebalance will be triggered. Why should C2 dying, which has absolutely nothing to do with C1 (there is no intersection of topics between C1 and C2), trigger a rebalance event in C1? Is there some condition where this is necessary that I am missing? Thanks, Dinesh
Re: Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?
I have used what Gwen suggested, but to avoid false positives: while consuming records, keep track of the *last* consumed offset, and when you get a TimeoutException for a particular partition, compare that offset with the latest offset on the broker for that topic/partition (e.g. the JMX bean *LogEndOffset* for the consumed topic and partition). This works well. In our use case, we were using the high-level consumer for only a *single* topic. I hope this helps! Thanks, Bhavesh

On Sun, May 10, 2015 at 2:03 PM, Ewen Cheslack-Postava e...@confluent.io wrote: @Gwen - But that only works for topics that have low enough traffic that you would ever actually hit that timeout. The Confluent schema registry needs to do something similar to make sure it has fully consumed the topic it stores data in, so it doesn't serve stale data. We know in our case we'll only have a single producer to the topic (the current leader of the schema registry cluster), so we have a different solution. We produce a message to the topic (which has 1 partition, but this works per topic partition too), grab the resulting offset from the response, then consume until we see the message we produced. Obviously this isn't ideal, since (a) we have to produce extra bogus messages to the topic, and (b) it only works when you know the consumer is also the only producer. The new consumer interface sort of addresses this since it has seek functionality, where one of the options is seekToEnd. However, I think you have to be very careful with this, especially using the current implementation. It seeks to the end, but it also marks those messages as consumed. This means that even if you keep track of your original position and seek back to it, if you use background offset commits you could end up committing incorrect offsets, crashing, and then missing some messages when another consumer claims that partition (or just due to another consumer joining the group). Not sure if there are many other use cases for grabbing the offset data with a simple API. It might mean there's a use case for either some additional API or some utilities, independent of an actual consumer instance, which let you easily query the state of topics/partitions.

On Sun, May 10, 2015 at 12:43 AM, Gwen Shapira gshap...@cloudera.com wrote: For Flume, we use the timeout configuration and catch the exception, with the assumption that no messages for a few seconds == the end.

On Sat, May 9, 2015 at 2:04 AM, James Cheng jch...@tivo.com wrote: Hi, I want to use the high-level consumer to read all partitions for a topic, and know when I have reached the end. I know "the end" might be a little vague, since items keep showing up, but I'm trying to get as close as possible: I know that more messages might show up later, but I want to know when I've received all the items that are currently available in the topic. Is there a standard/recommended way to do this? I know one way to do it is to first issue an OffsetRequest for each partition, which would get me the last offset, and then use that information in my high-level consumer to detect when I've reached a message with that offset. Which is exactly what the SimpleConsumer example does ( https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example ). That involves finding the leader for the partition, etc. etc. Not hard, but a bunch of steps.
I noticed that kafkacat has an option similar to what I'm looking for: -e ("Exit successfully when last message received"). Looking at the code, it appears that a fetch response includes the HighwaterMarkOffset for a partition, and the API docs confirm that: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse Does the Java high-level consumer expose the HighwaterMarkOffset in any way? I looked but I couldn't find such a thing. Thanks, -James -- Thanks, Ewen
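A minimal sketch of the produce-then-catch-up trick with the 0.8.2 producer and the high-level consumer ('my-topic', marker, and process() are placeholders; this assumes the process doing this is the only producer to the topic):

// Produce a marker and note the offset the broker assigned to it.
RecordMetadata md = producer.send(
        new ProducerRecord<byte[], byte[]>("my-topic", marker)).get();
long target = md.offset();

// Consume until we see our own marker: everything that existed at the
// time we produced it has now been read.
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    MessageAndMetadata<byte[], byte[]> msg = it.next();
    process(msg);
    if (msg.offset() >= target) {
        break;
    }
}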
Issue with kafka-topics.sh tool for adding new partitions with replica assignment
Hi, with Kafka 0.8 it is possible to add new partitions on newly added brokers and supply a partition assignment to put the new partitions mainly on the new brokers (4 and 5 are the new brokers):

bin/kafka-add-partitions.sh --topic scale-test-001 \
  --partition 14 \
  --replica-assignment-list 4:5,4:1,4:2,4:3,4:5,4:1,4:2,5:3,5:4,5:1,5:2,5:3,5:4,5:1 \
  --zookeeper 127.0.0.1:2181

For 0.8.1+ the kafka-add-partitions.sh tool was merged into kafka-topics.sh, but when you try to execute something similar you receive the following error (in Kafka 0.8.2.1):

kafka_2.10-0.8.2.1$ bin/kafka-topics.sh --alter --topic scale-test-002 \
  --zookeeper 127.0.0.1:2181 \
  --partitions 35 \
  --replica-assignment 2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,4:5,4:2,4:2,4:3,4:5,4:3,4:2,5:3,5:4,5:4,5:2,5:3,5:4,5:3

Option [replica-assignment] can't be used with option[partitions]

However, upon investigating alterTopics in TopicCommand.scala, the code it wants to execute is:

val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs)

So supplying both the --partitions and the --replica-assignment parameters should be totally fine. The issue is with the following line in checkArgs:

CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + replicationFactorOpt)

If it is removed, then the above command executes just fine, and the created partitions are filled quite happily as well. I'm not fully sure what the correct configuration of replicaAssignmentOpt should be, so I won't provide a patch, but it would be great if this could be fixed. Best regards, Stefan -- Dr. Stefan Schadwinkel, Senior Big Data Developer, Smaato Inc.
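One possible narrower fix (untested; it keeps the check but stops listing partitionsOpt as invalid alongside replicaAssignmentOpt) would be:

CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt,
  allTopicLevelOpts -- Set(alterOpt, createOpt) + replicationFactorOpt)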
Re: circuit breaker for producer
Hey Guozhang, thanks for the heads up! Best

On Thu, May 7, 2015 at 1:26 AM, Guozhang Wang wangg...@gmail.com wrote: The better metric to check for that would be buffer-available-bytes instead of bufferpool-wait-ratio, checking for its value approaching 0. Guozhang

On Wed, May 6, 2015 at 3:02 AM, mete efk...@gmail.com wrote: Hey Guozhang, I could go with both of the options; eventually I want to detect if there is a problem and isolate it from the rest of the system, and I am trying to decide what the appropriate metrics would be to do that. Best

On Wed, May 6, 2015 at 6:35 AM, Guozhang Wang wangg...@gmail.com wrote: 1. KAFKA-1955 https://issues.apache.org/jira/browse/KAFKA-1955 - I think Jay has a WIP patch for it. 2. 3.

On Tue, May 5, 2015 at 5:10 PM, Jason Rosenberg j...@squareup.com wrote: Guozhang, do you have the ticket number for possibly adding in local log file failover? Is it actively being worked on? Thanks, Jason

On Tue, May 5, 2015 at 6:11 PM, Guozhang Wang wangg...@gmail.com wrote: Does this log file act as a temporary disk buffer when the broker slows down, whose data will be re-sent to the broker later, or do you plan to use it as a separate persistent store like the Kafka brokers? For the former use case, I think there is an open ticket for integrating this kind of functionality into the producer; for the latter use case, you may want to do this traffic control outside the Kafka producer, i.e. upon detecting the producer buffer full, do not call send() on it for a while but write to a different file, etc. Guozhang

On Tue, May 5, 2015 at 11:28 AM, mete efk...@gmail.com wrote: Sure, I kind of count on that actually; I guess with this setting the sender blocks on the allocate method and the bufferpool-wait-ratio increases. I want to fully compartmentalize the Kafka producer from the rest of the system, e.g. writing to a log file instead of trying to send to Kafka when some metric in the producer indicates that there is a performance degradation or some other problem. I was wondering what the ideal way of deciding that would be?

On Tue, May 5, 2015 at 6:32 PM, Jay Kreps jay.kr...@gmail.com wrote: Does block.on.buffer.full=false do what you want? -Jay

On Tue, May 5, 2015 at 1:59 AM, mete efk...@gmail.com wrote: Hello folks, I was looking through the kafka.producer metrics on the JMX interface to find a good indicator of when to trip the circuit. So far it seems like the bufferpool-wait-ratio metric is a useful decision mechanism for when to cut off production to Kafka. As far as I have experienced, when the Kafka server is slow for some reason, requests start piling up on the producer queue, and if you are not willing to drop any messages on the producer, the send method starts blocking because of the slow responsiveness. So this buffer pool wait ratio starts going up from 0.x to 1.0, and I am thinking about tripping the circuit breaker using this metric, e.g. if wait-ratio > 0.90. What do you think? Do you think there would be a better indicator of overall health? Best, Mete
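A minimal sketch of such a breaker check against the new producer's metrics map, using the buffer-available-bytes metric Guozhang suggests (the MIN_AVAILABLE_BYTES threshold and the breaker object are placeholder assumptions):

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Trip the breaker when the record accumulator's free buffer space
// drops below a tuned threshold.
void checkProducerHealth() {
    for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
        if ("buffer-available-bytes".equals(e.getKey().name())) {
            if (e.getValue().value() < MIN_AVAILABLE_BYTES) {
                breaker.open();  // e.g. divert sends to a local log file
            }
        }
    }
}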
Re: Kafka Client in Rust
Thanks, Gwen/Ewen. I have posted to the kafka-clients Google group too.

On Mon, May 11, 2015 at 1:40 PM, Gwen Shapira gshap...@cloudera.com wrote: You may want to announce this at kafka-clie...@googlegroups.com, a mailing list specifically for Kafka clients. I'm sure they'll be thrilled to hear about it. It is also a good place for questions on client development, if you ever need help.

On Mon, May 11, 2015 at 4:57 AM, Yousuf Fauzan yousuffau...@gmail.com wrote: Hi all, I have created a Kafka client for Rust. The client supports Metadata, Produce, Fetch, and Offset requests. I plan to add support for consumers and offset management soon. Would it be possible to get it added to https://cwiki.apache.org/confluence/display/KAFKA/Clients ?

Info: Pure Rust implementation with support for Metadata, Produce, Fetch, and Offset requests. Supports Gzip and Snappy compression.
Maintainer: Yousuf Fauzan (http://fauzism.co)
Licence: MIT
Code: https://github.com/spicavigo/kafka-rust
Doc: http://fauzism.co/rustdoc/kafka/index.html
-- Yousuf Fauzan
Kafka log compression change in 0.8.2.1?
After a recent 0.8.2.1 upgrade, we noticed a significant increase in used filesystem space for our Kafka log data. We have another Kafka cluster still on 0.8.1.1 whose Kafka data is being copied over to the upgraded cluster, and it is clear that disk consumption is higher on 0.8.2.1 for the same message data. The log retention config for the two clusters is also the same. We ran some tests to figure out what was happening, and it appears that in 0.8.2.1 the Kafka brokers re-compress each message individually (we're using Snappy), while in 0.8.1.1 they applied the compression across an entire batch of messages written to the log. For producers sending large batches of small, similar messages, the difference can be quite substantial (in our case, a little over 2x). Is this a bug, or the expected new behavior? Thanks, Andrew
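For anyone who wants to reproduce the comparison, the on-disk batches can be inspected with the DumpLogSegments tool (log file path illustrative); with --deep-iteration it unwraps compressed wrapper messages so you can see how each batch was actually stored:

bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/kafka-logs/my-topic-0/00000000000000000000.log \
  --deep-iteration --print-data-log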
Re: Kafka consumer offset checker hangs indefinitely
Hi Mayuresh, a small update: the Kafka version I'm currently using is 2.10-0.8.2.1 (not 2.11 as previously mentioned). The cluster looks fine; not sure why the consumer offset checker does not return a valid output and gets stuck.

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:3  ReplicationFactor:3  Configs:min.insync.replicas=2
  Topic: test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
  Topic: test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 1,2,0
  Topic: test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 1,2,0

On Fri, May 8, 2015 at 12:52 PM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi Mayuresh, yes, the broker is up and accepting connections. Multiple consumers are consuming off topics on the broker. Also, I am seeing the issue only with this particular version (2.11-0.8.2.1); it worked fine with the beta I was using earlier.

On Fri, May 8, 2015 at 12:45 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: Is X.X.X.X:9092 up and accepting connections? I am confused as to why it is not connecting to some other broker if the connection to this broker fails. Can you check if the broker is up? The way it works is that the consumer will send a ConsumerMetadataRequest to one of the brokers, get the offset manager for its group, and then perform the offset management. Thanks, Mayuresh

On Fri, May 8, 2015 at 9:22 AM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi, I'm using Kafka 0.8.2.1 (kafka_2.11-0.8.2.1) and the consumer offset checker hangs indefinitely and does not return any results. I enabled debug for the tools; below are the debug statements as seen on stdout. Any thoughts or inputs on this would be much appreciated.

Command used:
bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group
or
./kafka-consumer-offset-checker.sh --zookeeper broker1:2181,broker2:2181,broker3:2181 --group test-consumer-group

DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:23:55,090] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:23:55,091] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:23:58,093] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:23:58,102] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:23:58,103] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:01,107] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:24:01,115] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:24:01,116] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:04,119] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:24:04,124] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:24:04,126] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:04,993] DEBUG Got ping response for sessionid: 0x14d33e7fbc80002 after 3ms (org.apache.zookeeper.ClientCnxn)
[2015-05-08 10:24:07,127] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:24:07,131] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:24:07,132] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:10,132] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:24:10,138] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:24:10,139] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:13,143] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08
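If it helps: in ConsumerMetadataResponse(None,15,0), error code 15 is ConsumerCoordinatorNotAvailable per the protocol guide, which usually points at the internal __consumer_offsets topic not existing yet or not having an available leader. A quick check (ZooKeeper address illustrative):

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets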
Kafka listener threads - graceful shutdown
I am using the following code to help Kafka stream listener threads exit out of the blocking call to hasNext() on the ConsumerIterator, but the threads never exit when they receive the allDone() signal. I am not sure whether I am making a mistake. Please let me know if this is the right approach.

public void stop() throws InterruptedException {
    for (ConsumerIterator<byte[], byte[]> consumer : consumerIterators) {
        consumer.allDone();
    }
    shutdown();
}

Thanks & Regards,
Re: Kafka consumer offset checker hangs indefinitely
Hi Meghana, let me try this out on my cluster that has the latest trunk deployed. Thanks, Mayuresh

On Mon, May 11, 2015 at 1:53 PM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi Mayuresh, a small update: the Kafka version I'm currently using is 2.10-0.8.2.1 (not 2.11 as previously mentioned). The cluster looks fine; not sure why the consumer offset checker does not return a valid output and gets stuck.

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:3  ReplicationFactor:3  Configs:min.insync.replicas=2
  Topic: test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
  Topic: test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 1,2,0
  Topic: test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 1,2,0

On Fri, May 8, 2015 at 12:52 PM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi Mayuresh, yes, the broker is up and accepting connections. Multiple consumers are consuming off topics on the broker. Also, I am seeing the issue only with this particular version (2.11-0.8.2.1); it worked fine with the beta I was using earlier.

On Fri, May 8, 2015 at 12:45 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: Is X.X.X.X:9092 up and accepting connections? I am confused as to why it is not connecting to some other broker if the connection to this broker fails. Can you check if the broker is up? The way it works is that the consumer will send a ConsumerMetadataRequest to one of the brokers, get the offset manager for its group, and then perform the offset management. Thanks, Mayuresh

On Fri, May 8, 2015 at 9:22 AM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi, I'm using Kafka 0.8.2.1 (kafka_2.11-0.8.2.1) and the consumer offset checker hangs indefinitely and does not return any results. I enabled debug for the tools; below are the debug statements as seen on stdout. Any thoughts or inputs on this would be much appreciated.

Command used:
bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group
or
./kafka-consumer-offset-checker.sh --zookeeper broker1:2181,broker2:2181,broker3:2181 --group test-consumer-group

DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:23:55,090] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:23:55,091] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:23:58,093] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:23:58,102] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:23:58,103] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:01,107] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:24:01,115] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:24:01,116] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:04,119] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:24:04,124] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:24:04,126] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:04,993] DEBUG Got ping response for sessionid: 0x14d33e7fbc80002 after 3ms (org.apache.zookeeper.ClientCnxn)
[2015-05-08 10:24:07,127] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:24:07,131] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:24:07,132] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
[2015-05-08 10:24:10,132] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
[2015-05-08 10:24:10,138] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
[2015-05-08 10:24:10,139] DEBUG
Isr reassignment issue after a dead broker
Hi Kafka users, we have been using Kafka 0.8.1.1 (kafka_2.8.0-0.8.1.1) in our cluster of 21 brokers with a replication factor of 2. When one of the brokers underwent a complete shutdown, the partitions of a topic that had an in-sync replica on the broker that died were not able to create a new ISR on a healthy node. We tried restarting the Kafka processes on all the brokers, but the replica is still attached to the dead broker, which is no longer part of the Kafka cluster; ZooKeeper has even removed the broker id at the /brokers/ids path. As for the partitions whose leader used to be the broker that went down, Kafka is able to replace the leader with a healthy broker (according to --describe), but the producer is not able to send data to those partitions. Due to these errors, our Kafka cluster brought down the other components that are producing. Below is the error on the producer side; 16 and 35 are partitions whose leader used to be broker 19 (the dead broker), and broker 5 (healthy) became the leader of those partitions, but producers still cannot send data to them.

[WARN] Failed to send producer request with correlation id 680754045 to broker 5 with data for partitions [Topic,16],[Topic,35]
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.writev0(Native Method) ~[na:1.7.0_60]
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51) ~[na:1.7.0_60]
    at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.7.0_60]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:524) ~[na:1.7.0_60]
    at java.nio.channels.SocketChannel.write(SocketChannel.java:493) ~[na:1.7.0_60]
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56) ~[stormjar.jar:na]
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75) ~[stormjar.jar:na]
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26) ~[stormjar.jar:na]

--describe of the partitions with the issue:
Topic: Topic  Partition: 16  Leader: 5  Replicas: 5,19  Isr: 5
Topic: Topic  Partition: 35  Leader: 5  Replicas: 19,5  Isr: 5

Thanks, Chaitanya GSK
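In case it helps others hitting this: a sketch of moving the replicas that still reference the dead broker 19 onto a live broker with kafka-reassign-partitions.sh (broker 7 and the ZooKeeper address are illustrative stand-ins):

cat > reassign.json <<'EOF'
{"version":1,"partitions":[
  {"topic":"Topic","partition":16,"replicas":[5,7]},
  {"topic":"Topic","partition":35,"replicas":[5,7]}
]}
EOF

bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassign.json --execute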
Kafka integration with Hadoop
Hi all, how do I integrate Kafka with the Hadoop ecosystem? How can I store Kafka messages into HDFS in Parquet format? Regards, Raj
Re: Kafka integration with Hadoop
You could start by looking at LinkedIn's Camus and go from there.

On Mon, May 11, 2015 at 8:10 PM, Rajesh Datla rajeshdatla2...@gmail.com wrote: Hi all, how do I integrate Kafka with the Hadoop ecosystem? How can I store Kafka messages into HDFS in Parquet format? Regards, Raj
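A rough sketch of driving a Camus run (the jar name is illustrative; the entry point and -P flag are from the Camus README as I remember it, so treat the details as assumptions). Camus writes sequence files by default, so Parquet output would need a custom writer plugged in via the etl.record.writer.provider.class property:

hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar \
  com.linkedin.camus.etl.kafka.CamusJob -P camus.properties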
RE: Kafka listener threads - graceful shutdown
I don't think calling allDone() will cause hasNext() to exit. The new consumer has a timed poll() function in its API, I think. With the old consumer, interrupting the thread calling hasNext() might work. Have you tried that? Aditya

From: Gomathivinayagam Muthuvinayagam [sankarm...@gmail.com]
Sent: Monday, May 11, 2015 6:26 PM
To: users@kafka.apache.org
Subject: Kafka listener threads - graceful shutdown

I am using the following code to help Kafka stream listener threads exit out of the blocking call to hasNext() on the ConsumerIterator, but the threads never exit when they receive the allDone() signal. I am not sure whether I am making a mistake. Please let me know if this is the right approach.

public void stop() throws InterruptedException {
    for (ConsumerIterator<byte[], byte[]> consumer : consumerIterators) {
        consumer.allDone();
    }
    shutdown();
}

Thanks & Regards,
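A minimal sketch of the old-consumer pattern this suggests, using consumer.timeout.ms so hasNext() throws instead of blocking forever ('running', 'it', process(), and 'connector' are placeholders; note it is shutdown() on the ConsumerConnector, not allDone() on the iterators, that injects the marker making hasNext() return false):

// In the consumer config, before creating the connector:
props.put("consumer.timeout.ms", "1000");

// In each listener thread:
while (running) {
    try {
        if (it.hasNext()) {
            process(it.next());  // it is a ConsumerIterator<byte[], byte[]>
        }
    } catch (kafka.consumer.ConsumerTimeoutException e) {
        // No message within 1s -- loop back and re-check the 'running' flag.
    }
}

// From the shutdown path, after flipping 'running' to false:
connector.shutdown();  // stops fetchers and releases the iterators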
Re: Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?
Thanks, everyone. To answer Charlie's question: I'm doing some simple stream processing. I have topics A, B, and C, all using log compaction and all records having primary keys. The data in Topic A is essentially a routing table that tells me which primary keys in Topics B and C I should pay attention to. So before I start consuming B and C, I need to have all/most of Topic A loaded into a local routing table. As Topic A is updated, I will continue to update my routing table and use it to continually process events coming from B and C. Hope that makes sense. All of the solutions look good. Will, that patch does exactly what I want, but I'm not sure I want to patch Kafka right now; I'll keep it in mind. Thanks. -James

On May 9, 2015, at 10:42 AM, Charlie Knudsen charlie.knud...@smartthings.com wrote: Hi James, what are you trying to do exactly? If all you are trying to do is monitor how far behind a consumer is getting, you could use the ConsumerOffsetChecker, as described in the link below: http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer Each message being processed will also have the offset and partition attached to it, so I suppose that information, plus info from a fetch response, would let you determine this within an application: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse Does that help?

On Fri, May 8, 2015 at 6:04 PM, James Cheng jch...@tivo.com wrote: Hi, I want to use the high-level consumer to read all partitions for a topic, and know when I have reached the end. I know "the end" might be a little vague, since items keep showing up, but I'm trying to get as close as possible: I know that more messages might show up later, but I want to know when I've received all the items that are currently available in the topic. Is there a standard/recommended way to do this? I know one way to do it is to first issue an OffsetRequest for each partition, which would get me the last offset, and then use that information in my high-level consumer to detect when I've reached a message with that offset. Which is exactly what the SimpleConsumer example does ( https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example ). That involves finding the leader for the partition, etc. etc. Not hard, but a bunch of steps. I noticed that kafkacat has an option similar to what I'm looking for: -e ("Exit successfully when last message received"). Looking at the code, it appears that a fetch response includes the HighwaterMarkOffset for a partition, and the API docs confirm that: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse Does the Java high-level consumer expose the HighwaterMarkOffset in any way? I looked but I couldn't find such a thing. Thanks, -James
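For the record, the bootstrap loop I have in mind looks roughly like this (endOffset fetched up front, e.g. via an OffsetRequest; key(), parse(), Route, and startConsumingBandC() are placeholders):

Map<String, Route> routingTable = new HashMap<String, Route>();
boolean caughtUp = false;
while (iter.hasNext()) {
    MessageAndMetadata<byte[], byte[]> m = iter.next();
    if (m.message() == null) {
        routingTable.remove(key(m));   // tombstone: the key was deleted
    } else {
        routingTable.put(key(m), parse(m.message()));
    }
    if (!caughtUp && m.offset() >= endOffset - 1) {
        caughtUp = true;               // Topic A loaded "enough"
        startConsumingBandC();
    }
}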
Re: Log end offset
Vamsi, there is another thread going on right now about this exact topic: "Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?" http://search-hadoop.com/m/uyzND1Eb3e42NMCWl -James

On May 10, 2015, at 11:48 PM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote: Hi, what is the best way to find the log end offset for a topic? Currently I am using the SimpleConsumer getLastOffset logic described in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example, but we are running into ClosedChannelException for some of the topics. We use Kafka for offset storage, version 0.8.2.1. What is the ideal way to compute a topic's log end offset? -- Regards, Vamsi Subhash