Re: FW: Mirrormaker stops consuming
Debbie, In our case, the issue was with our custom SSL implementation and we needed to fix a bug related to buffer overflow handling. This may not be applicable to you if you are using open-source Kafka. The best way to troubleshoot this problem would be to take a couple of thread dumps a few seconds apart and look at the stack traces. This will help you identify the code block that it is spinning through when it is stuck. Thanks, Raja. On Fri, Feb 5, 2016 at 10:28 PM, Yu, Debbie wrote: > Hi Raja, > > I'm not used to using a mailing list, so I thought I'd try sending to you > directly instead. > We were wondering if you had fixed the problem you saw before with the > mirror maker, > since we seem to be seeing the same problem now. > The difference is we're not using the custom SSLSocketChannel discussed in > the thread. > Any kind of info you could share would be much appreciated. > > Thanks, > Debbie > > On 2/5/16, 7:17 PM, "Yu, Debbie" wrote: > > >Hi Raja, > > > >We seem to be encountering the same problem you were seeing where our > >producer thread becomes blocked for some reason. > >We also see our producer queue is full, > >and for some reason, the producer isn't pulling from the queue and sending > >to our brokers. > >We were wondering if you might be able to share how you fixed your problem, > >if you fixed it. > > > >Thanks, > >Debbie > > > > -- Thanks, Raja.
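For reference, a minimal sketch of taking a few spaced thread dumps with jstack, assuming the mirror maker runs as a plain JVM process and the JDK's jps/jstack are on the PATH (the grep pattern is just an assumption about how the process shows up):

# Find the MirrorMaker JVM and capture three thread dumps, a few seconds apart.
PID=$(jps -l | grep -i mirrormaker | awk '{print $1}')
for i in 1 2 3; do
  jstack "$PID" > "mirrormaker-threaddump-$i.txt"
  sleep 5
done
# Compare the dumps: a thread showing the same stack in every dump is a good
# candidate for the code block it is spinning in or blocked on.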
Does Kafka recover all data if node is reimaged
I was wondering: if a kafka broker node gets reimaged and all data is wiped off, will kafka recover all the data on that node from replication? -- Thanks, Raja.
Re: Painfully slow kafka recovery / cluster breaking
Thanks for updates Jörg. It's very useful. Thanks, Raja. On Wed, Aug 26, 2015 at 8:58 AM, Jörg Wagner joerg.wagn...@1und1.de wrote: Just a little feedback on our issue(s) as FYI to whoever is interested. It basically all boiled down to the configuration of topics. We noticed while performance testing (or trying to ;) ) that the partitioning was most critical to us. We originally followed the linkedin recommendation and used 600 partitions for our main topic. Testing that, the replicas always went out of sync within a short timeframe, leaders could not be determined and the cluster failed horribly (even writing several hundred lines of logs within a 1/100th second). So for our 27 log.dirs (= disks) we went with 27 partitions. And voilá: we could use kafka with around 35k requests per second (via an application accessing it). Kafka stayed stable. Currently we are testing with 81 partitions (27*3) and it's running well. No issues so far, replicas in sync and up to 50k requests per second. Cheers On 25.08.2015 15:18, Jörg Wagner wrote: So okay, this is a little embarassing but the core of the issue was that max open files was not set correctly for kafka. It was not an oversight, but a few things together caused that the system configuration was not changed correctly, resulting in the default value. No wonder that kafka behaved strangely everytime we had enough data in log.dirs and connections. Anyhow, that doesn't seem to be the last problem. The brokers get in sync with each other (within an expected time frame), everything seems fine. After a little stress testing, the kafka cluster falls apart (around 40k requests/s). Using topics describe we can see leaders missing (e.g. from 1,2,3 only 1 and 3 are leading partitions, although zookeeper lists all under /brokers/ids). This ultimately results in partitions being unavailable and massive leader not local spam in the logs. What are we missing? Cheers Jörg On 24.08.2015 10:31, Jörg Wagner wrote: Thank you for your answers. @Raja No, it also seems to happen if we stop kafka completely clean. @Gwen I was testing the situation with num.replica.fetchers set higher. If you say that was the right direction, I will try it again. What would be a good setting? I went with 50 which seemed reasonable (having 27 single disks). How long should it take to get complete ISR? Regarding no Data flowing into kafka: I just wanted to point out that the setup is not yet live. So we can completely stop the usage of kafka, and it should possibly get into sync faster without a steady stream of new messages. Kafka itself is working fine during this on the other hand, just missing ISR, hence redundancy. If I stop another broker (clean!) though, it tends to happen that the expected number of partitions have Leader -1; which should not happen as I assume. Cheers Jörg On 21.08.2015 19:18, Rajasekar Elango wrote: We are seeing same behavior in 5 broker cluster when losing one broker. In our case, we are losing broker as well as kafka data dir. Jörg Wagner, Are you losing just broker or kafka data dir as well? Gwen, We have also observed that latency of messages arriving at consumers goes up by 10x when we lose a broker. Is it because the broker is busy with handling failed fetch requests and loaded with more data thats slowing down the writes ? Also, if we had simply lost the broker not the data dir, impact would have been minimal? Thanks, Raja. On Fri, Aug 21, 2015 at 12:31 PM, Gwen Shapira g...@confluent.io wrote: By default, num.replica.fetchers = 1. 
This means only one thread per broker is fetching data from leaders. This means it may take a while for the recovering machine to catch up and rejoin the ISR. If you have bandwidth to spare, try increasing this value. Regarding no data flowing into kafka - If you have 3 replicas and only one is down, I'd expect writes to continue to the new leader even if one replica is not in the ISR yet. Can you see that a new leader is elected? Gwen On Fri, Aug 21, 2015 at 6:50 AM, Jörg Wagner joerg.wagn...@1und1.de wrote: Hey everyone, here's my crosspost from IRC. Our setup: 3 kafka 0.8.2 brokers with zookeeper, powerful hardware (20 cores, 27 log disks each). We use a handful of topics, but only one topic is utilized heavily. It features a replication factor of 2 and 600 partitions. Our issue: If one kafka was down, it takes very long (from 1 to 10 hours) to show that all partitions have a full ISR again. This seems to heavily depend on the amount of data in the log.dirs (I have configured 27 threads - one for each dir, each on its own drive). This all takes this long while there is NO data flowing into kafka. We seem to be missing something critical here. It might be some option set wrong, or are we thinking wrong and it's not critical to have the replicas in sync. Any pointers would be great.
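A sketch of the two suggestions above. The num.replica.fetchers property and the kafka-topics.sh options are standard for this Kafka version; the value 4, the topic name and the ZooKeeper address are placeholders, not recommendations:

# In each broker's server.properties, raise the number of replica fetcher
# threads so a recovering broker can catch up faster (default is 1):
#   num.replica.fetchers=4
# After restarting the broker, watch leaders and ISR to confirm a new leader
# was elected and replicas are rejoining:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic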
Re: Painfully slow kafka recovery
We are seeing same behavior in 5 broker cluster when losing one broker. In our case, we are losing broker as well as kafka data dir. Jörg Wagner, Are you losing just broker or kafka data dir as well? Gwen, We have also observed that latency of messages arriving at consumers goes up by 10x when we lose a broker. Is it because the broker is busy with handling failed fetch requests and loaded with more data thats slowing down the writes ? Also, if we had simply lost the broker not the data dir, impact would have been minimal? Thanks, Raja. On Fri, Aug 21, 2015 at 12:31 PM, Gwen Shapira g...@confluent.io wrote: By default, num.replica.fetchers = 1. This means only one thread per broker is fetching data from leaders. This means it make take a while for the recovering machine to catch up and rejoin the ISR. If you have bandwidth to spare, try increasing this value. Regarding no data flowing into kafka - If you have 3 replicas and only one is down, I'd expect writes to continue to the new leader even if one replica is not in the ISR yet. Can you see that a new leader is elected? Gwen On Fri, Aug 21, 2015 at 6:50 AM, Jörg Wagner joerg.wagn...@1und1.de wrote: Hey everyone, here's my crosspost from irc. Our setup: 3 kafka 0.8.2 brokers with zookeeper, powerful hardware (20 cores, 27 logdisks each). We use a handful of topics, but only one topic is utilized heavily. It features a replication of 2 and 600 partitions. Our issue: If one kafka was down, it takes very long ( from 1 to 10 hours) to show that all partitions have all isr again. This seems to heavily depend on the amount of data which is in the log.dirs (I have configured 27 threads - one for each dir featuring a own drive). This all takes this long while there is NO data flowing into kafka. We seem to be missing something critical here. It might be some option set wrong, or are we thinking wrong and it's not critical to have the replicas in sync. Any pointers would be great. Cheers Jörg -- Thanks, Raja.
Re: java.lang.OutOfMemoryError: Java heap space
Hi Sourabh, We have seen this error when the kafka broker is running with SSL and the consumer is trying to consume in plaintext mode. Are you using the high-level consumer or SimpleConsumer? If you are using SimpleConsumer, pull the latest code from my repo https://github.com/relango/kafka/commits/kafka_security_0.8.2 and pass the secure parameters to the SimpleConsumer constructor. Thanks, Raja. On Thu, Aug 6, 2015 at 9:01 PM, Sourabh Chandak sourabh3...@gmail.com wrote: Hi, I am trying to integrate https://github.com/relango/kafka/tree/kafka_security_0.8.2 with Spark Streaming using the SimpleConsumer. I know that the SSL patch is on its way but need to set up a prototype hence went ahead with Raja's version. So when I run my spark job to retrieve data from 1 topic with just 1 partition I get an OutOfMemoryError. Here is the stack trace: Exception in thread "main" java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:91) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80) at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:310) Need help from experts to resolve this. Thanks, Sourabh -- Thanks, Raja.
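One quick way to confirm the mismatch Raja describes (an SSL listener being hit by a plaintext client) is to probe the port directly; a hedged sketch, where the broker host and port are placeholders:

# If the port expects SSL, a plaintext client misreads the TLS handshake bytes
# as a huge message size and tries to allocate a giant buffer, which surfaces
# as the OutOfMemoryError in BoundedByteBufferReceive shown above.
openssl s_client -connect broker1.example.com:9092 < /dev/null
# A completed certificate exchange here means the port is SSL, so the consumer
# must be configured with the matching secure settings.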
Re: How to read in batch using HighLevel Consumer?
Here is an example of what Sharninder suggested: http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/ Thanks, Raja. On Tue, Aug 4, 2015 at 12:01 PM, Sharninder sharnin...@gmail.com wrote: You can't. Kafka is essentially a queue, so you always read messages one by one. What you can do is disable auto offset commit, read 100 messages, process them and then manually commit the offset. -- Sharninder On 04-Aug-2015, at 9:07 pm, shahab shahab.mok...@gmail.com wrote: Hi, While the producer can put data into the kafka server as a batch, I couldn't find any API (or any document) saying how we can fetch data as a batch from Kafka. Even when data is placed as a batch in the kafka server, using the High Level consumer I can still only read one by one, and I cannot specify, for example, read 100 items at once! Is this a correct observation? Or am I missing something? best, /Shahab -- Thanks, Raja.
Reimaging zookeeper host
We are running a 3-node zookeeper cluster and we need to re-image (re-install the OS on) a zookeeper host. Is it ok to lose the zookeeper dataDir during the upgrade, or should we back up the zookeeper dataDir and restore it when zookeeper comes back online? Will kafka and consumers work fine if we bring up zookeeper with an empty dataDir containing just the myid file? -- Thanks, Raja.
Re: how to modify offsets stored in Kafka in 0.8.2.1 version?
Hi Marina, Check slide 32 in this presentation http://www.slideshare.net/jjkoshy/offset-management-in-kafka. Hope this helps. Thanks, Raja. On Fri, Jun 19, 2015 at 9:43 AM, Marina ppi...@yahoo.com.invalid wrote: Thanks, Stevo, for the quick reply, Yes, I understand how to do this programmatically - but I would like to be able to do this manually from a command line, just as before I was able to do this in the Zookeeper shell. I don't want to write and run a Java app just to set an offset :) [unless, of course, this is the only way to do this.] thanks! Marina - Original Message - From: Stevo Slavić ssla...@gmail.com To: users@kafka.apache.org; Marina ppi...@yahoo.com Cc: Sent: Friday, June 19, 2015 9:33 AM Subject: Re: how to modify offsets stored in Kafka in 0.8.2.1 version? Hello Marina, There's Kafka API to fetch and commit offsets https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka - maybe it will work for you. Kind regards, Stevo Slavic. On Fri, Jun 19, 2015 at 3:23 PM, Marina ppi...@yahoo.com.invalid wrote: Hi, in older Kafka versions where offsets were stored in Zookeeper - I could manually update the value of the Zookeeper's node: /consumers/consumer_group_name/offsets/topic_name/partition_number/offset_value. In 0.8.2.1 - there are no values in offsets anymore, but there is a new topic, __consumer_offsets, where as I understand offsets are tracked now. the ConsumerOffsetChecker tool seems to be able to get the offsets values from this topic , since I see correct value running it. So, how do I access this info myself? I tried: ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic __consumer_offsets --from-beginning but it does not show anything Also, how would I change the offset? I need to do this sometimes if I want to skip/ignore some messages and just advance offset manually. thanks, Marina -- Thanks, Raja.
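For the first part of Marina's question (inspecting __consumer_offsets from the command line), the raw topic is not readable without a message formatter. A hedged sketch for 0.8.2.x follows; the formatter class name and the need for exclude.internal.topics=false are best-effort assumptions for that version, and changing an offset still goes through the commit API mentioned above:

# Dump committed offsets ([group,topic,partition] :: offset metadata) in readable form.
echo "exclude.internal.topics=false" > /tmp/offsets-consumer.properties
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic __consumer_offsets --from-beginning \
  --consumer.config /tmp/offsets-consumer.properties \
  --formatter 'kafka.server.OffsetManager$OffsetsMessageFormatter'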
Mirrormaker stops consuming
We recently upgraded to kafka 0.8.2.1 and found issues with mirrormaker that randomly stops consuming. We had to restart the mirrormaker process to resolve the problem. This problem has occurred several times in past two weeks. Here is what I found in analysis: When this problem happens: Mirrormaker log stopped rolling (ie nothing in logs) . Last couple of messages in mirrormaker log are ProducerSendThread producing to destination. No errors or exceptions. Mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows mirrormaker consumer offset stops incrementing. Mirrormaker consumer MinFetch rate jmx metric drops to zero. ConsumerTopicMetric.BytesPerSec drops to zero. So its mirrormaker consumer should have stopped accepting new data. Can some one provide input on how to trouble shoot this problem further and identify root cause? Got Thread dump before restarting, it looks ok to me, no blocked thread. Here is thread dump output 2015-05-21 18:59:09 Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode): Attach Listener daemon prio=10 tid=0x7f7248002000 nid=0x2d53 waiting on condition [0x] java.lang.Thread.State: RUNNABLE Locked ownable synchronizers: - None ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2 prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition [0x7f72833f2000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x00042cd15cc8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) Locked ownable synchronizers: - 0x00042ea62eb0 (a java.util.concurrent.locks.ReentrantLock$NonfairSync) ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3 prio=10 tid=0x7f71e407b000 nid=0x3424 waiting on condition [0x7f7281f99000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x00042ccece80 (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110) at
Re: Mirrormaker stops consuming
Thanks for pointers Joel. Will look into SSLSocketChannel. Yes this was working fine before upgrade. If its just one producer thread stuck on write, it might affect only one consumer thread/partition. But we found consuming stopped for all topic/partitions. Or Is it only single data channel shared between all producer and consumer threads..? Thanks, Raja. On Fri, May 22, 2015 at 12:12 PM, Joel Koshy jjkosh...@gmail.com wrote: The threaddump suggests that one of the producers (mirrormaker-producer-6) is blocked on write for some reason. So the data-channel for that producer (which sits between the consumers and the producer) is full which blocks the consumers from progressing. This appears to be in your (custom) SSLSocketChannel code. If you take consecutive threaddumps I'm guessing you would see the same trace. If this is reproducible can you do that? You can also hook up jvisualvm or yourkit to see which threads are active and it may well be that producer in a tight loop on the writeCompletely. Just to confirm you did not see this issue before upgrading? Joel On Fri, May 22, 2015 at 11:35:19AM -0400, Rajasekar Elango wrote: We recently upgraded to kafka 0.8.2.1 and found issues with mirrormaker that randomly stops consuming. We had to restart the mirrormaker process to resolve the problem. This problem has occurred several times in past two weeks. Here is what I found in analysis: When this problem happens: Mirrormaker log stopped rolling (ie nothing in logs) . Last couple of messages in mirrormaker log are ProducerSendThread producing to destination. No errors or exceptions. Mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows mirrormaker consumer offset stops incrementing. Mirrormaker consumer MinFetch rate jmx metric drops to zero. ConsumerTopicMetric.BytesPerSec drops to zero. So its mirrormaker consumer should have stopped accepting new data. Can some one provide input on how to trouble shoot this problem further and identify root cause? Got Thread dump before restarting, it looks ok to me, no blocked thread. 
Here is thread dump output 2015-05-21 18:59:09 Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode): Attach Listener daemon prio=10 tid=0x7f7248002000 nid=0x2d53 waiting on condition [0x] java.lang.Thread.State: RUNNABLE Locked ownable synchronizers: - None ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2 prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition [0x7f72833f2000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x00042cd15cc8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) Locked ownable synchronizers: - 0x00042ea62eb0 (a java.util.concurrent.locks.ReentrantLock$NonfairSync) ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3 prio=10 tid=0x7f71e407b000 nid=0x3424 waiting on condition [0x7f7281f99000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait
Re: Kafka consumer offset checker hangs indefinitely
Hi Meghana, We also faced a similar issue and found that it returned ConsumerCoordinatorNotAvailableCode always for one broker (server id 3), and the leader for all partitions of the __consumer_offsets topic was that same broker id 3. So we wiped the kafka data dir on that broker and restarted it. After that, ConsumerOffsetChecker started working. We had replication enabled, so losing the kafka data dir is not a big deal for us. If you can't delete the complete kafka data dir, you can try deleting just the __consumer_offsets data. (A quick way to check the __consumer_offsets leaders is sketched after this thread.) Thanks, Raja. On Thu, May 14, 2015 at 10:46 AM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi Mayuresh, A few more inputs that I can provide at the moment after some testing are as follows. 1. The error returned by the consumer offset checker's ConsumerMetadataResponse is ConsumerCoordinatorNotAvailableCode. Could it somehow be related to the offsets being written to zookeeper and not the internal kafka offsets topic? 2. I see that the __consumer_offsets topic has been created on all the brokers. 3. I also tried the steps for migrating from zookeeper to the kafka offset topic as specified in the documentation [using the offsets.storage and dual.commit.enabled configs] 4. Also, based on a few other links, I tried to commit an offset using the OffsetCommitRequest method and to get a ConsumerMetadataResponse. But the OffsetCommitResponse also returned an error and could not commit the offset successfully. On the other hand the producer and consumer are working fine and able to produce and consume data from the topic. The issue is only with the consumer offset checker tool. Thanks, Meghana On Mon, May 11, 2015 at 7:34 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: Hi Meghana, Let me try this out on my cluster that has latest trunk deployed. Thanks, Mayuresh On Mon, May 11, 2015 at 1:53 PM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi Mayuresh, A small update. The Kafka version I'm currently using is 2.10-0.8.2.1 (not 2.11 as previously mentioned). The cluster looks fine. Not sure why the consumer offset checker does not return a valid output and gets stuck. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:3 ReplicationFactor:3 Configs:min.insync.replicas=2 Topic: test Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: test Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 1,2,0 Topic: test Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 1,2,0 On Fri, May 8, 2015 at 12:52 PM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi Mayuresh, Yes, the broker is up and accepting connections. Multiple consumers are consuming off topics on the broker. Also I am seeing the issue only with this particular version (2.11-0.8.2.1). It worked fine with the beta that I was using earlier. On Fri, May 8, 2015 at 12:45 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: Is X.X.X.X:9092 up and accepting connections? I am confused as to why it is not connecting to some other broker if the connection to this broker fails. Can you check if the broker is up? The way it works is the consumer will send a ConsumerMetadataRequest to one of the brokers and get the offset manager for its group and then perform the offset management. Thanks, Mayuresh On Fri, May 8, 2015 at 9:22 AM, Meghana Narasimhan mnarasim...@bandwidth.com wrote: Hi, I'm using the Kafka 8.2.1 version (kafka_2.11-0.8.2.1) and the consumer offset checker hangs indefinitely and does not return any results. I enabled the debug for tools and below are the debug statements as seen on the stdout.
Any thoughts or inputs on this will be much appreciated. command used : bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group or ./kafka-consumer-offset-checker.sh --zookeeper broker1:2181,broker2:2181,broker3:2181 --group test-consumer-group DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$) [2015-05-08 10:23:55,090] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$) [2015-05-08 10:23:55,091] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$) [2015-05-08 10:23:58,093] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$) [2015-05-08 10:23:58,102] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$) [2015-05-08 10:23:58,103] DEBUG Query to X.X.X.X:9092 to locate
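Related to Raja's note earlier in the thread: the coordinator for a consumer group lives on the broker that leads the relevant __consumer_offsets partitions, so checking that topic's leaders and ISR is a quick first step. A sketch (the ZooKeeper address is a placeholder):

# If partitions of the internal offsets topic show 'Leader: -1' or point at a
# broken broker, ConsumerMetadataRequests can keep returning
# ConsumerCoordinatorNotAvailableCode and the offset checker will retry forever.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets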
Re: How to set console consumer group ID
Yes, you can pass any consumer property, including group.id, by putting them in a properties file and passing its path using the --consumer.config option of the console consumer. Thanks, Raja. On Wed, Apr 22, 2015 at 1:45 AM, Lukáš Havrlant lu...@havrlant.cz wrote: Hi, is it possible to set the group ID for the console consumer on the command line? Something like $ bin/kafka-console-consumer.sh --groupid myGroupId Lukáš -- Thanks, Raja.
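A minimal sketch of that approach; the file path, group name, topic and ZooKeeper address are placeholders:

# Put the desired consumer properties in a file...
cat > /tmp/console-consumer.properties <<EOF
group.id=myGroupId
EOF
# ...and point the console consumer at it.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic test --consumer.config /tmp/console-consumer.properties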
Re: kafka monitoring
Hi Sa Li, You need to set the environment variable $JMX_PORT to enable JMX while starting kafka. See kafka-run-class.sh for how it is used. Then you can connect to hostname:jmxport using JConsole. Thanks, Raja. On Thu, Jan 8, 2015 at 2:08 PM, Sa Li sal...@gmail.com wrote: Hello, All, I understand many of you are using jmxtrans along with graphite/ganglia to pull out metrics. According to https://kafka.apache.org/081/ops.html, it says: The easiest way to see the available metrics is to fire up jconsole and point it at a running kafka client or server; this will allow browsing all metrics with JMX. .. I tried to fire up jconsole on Windows attempting to access our dev and production clusters, which are running fine. Here is the main node of my dev: 10.100.75.128, broker port: 9092, zk port: 2181. Jconsole shows: New Connection Remote Process: Usage: hostname:port OR service:jmx:protocol:sap Username: Password: Sorry about my naivete; I tried to connect based on the above IP but just can't connect. Do I need to do something on the dev server to be able to make it work? thanks -- Alec Li -- Thanks, Raja.
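A minimal sketch of enabling JMX this way; the port number is an arbitrary example:

# Start the broker with remote JMX enabled on port 9999; kafka-run-class.sh
# picks up JMX_PORT and adds the com.sun.management.jmxremote options.
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties
# Then, from a machine that can reach the broker, point JConsole at it:
#   jconsole broker-host:9999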
Re: kafka monitoring system
Hi Sa Li, You can also try jmxtrans + graphite (for charting). jmxtrans has graphite output adapter out of the box. Regards, Raja. On Mon, Dec 22, 2014 at 10:39 PM, YuanJia Li yuanjia8...@163.com wrote: Hi Sa Li, You can try to use jmxtrans+opentsdb to monitor kafka. Jmxtrans is collecting data with JMX and sending to opentsdb. Opentsdb is graphing and alerting. YuanJia Li From: Sa Li Date: 2014-12-23 08:41 To: users Subject: kafka monitoring system Hi, all I am thinking to make a reliable monitoring system for our kafka production cluster. I read such from documents: Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This can be configured to report stats using pluggable stats reporters to hook up to your monitoring system. The easiest way to see the available metrics to fire up jconsole and point it at a running kafka client or server; this will all browsing all metrics with JMX. We pay particular we do graphing and alerting on the following metrics: .. I am wondering if anyone ever use Jconsole to monitor the kafka, or anyone can recommend a good monitoring tool for kafka production. thanks -- Alec Li -- Thanks, Raja.
Re: Announcing Confluent
Congrats. Wish you all the very best and success. Thanks, Raja. On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders niek.sand...@gmail.com wrote: Congrats! On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey all, I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a company around Kafka called Confluent. We are planning on productizing the kind of Kafka-based real-time data platform we built out at LinkedIn. We are doing this because we think this is a really powerful idea and we felt there was a lot to do to make this idea really take root. We wanted to make that our full time mission and focus. There is a blog post that goes into a little more depth here: http://blog.confluent.io/ LinkedIn will remain a heavy Kafka user and contributor. Combined with our additional resources from the funding of the company this should be a really good thing for the Kafka development effort. Especially when combined with the increasing contributions from the rest of the development community. This is great news, as there is a lot of work to do. We'll need to really focus on scaling this distributed development in a healthy way. One thing I do want to emphasize is that the addition of a company in the Kafka ecosystem won’t mean meddling with open source. Kafka will remain 100% open source and community focused, as of course is true of any Apache project. I have been doing open source for a long time and strongly believe it is the right model for infrastructure software development. Confluent is just getting off the ground now. We left LinkedIn, raised some money, and we have an office (but no furniture yet!). None the less, f you are interested in finding out more about the company and either getting help with your Kafka usage or joining us to help build all this, by all means reach out to us, we’d love to talk. Wish us luck! -Jay -- Thanks, Raja.
Re: [DISCUSS] Kafka Security Specific Features
Can we get the info on targeted release dates for 0.8.2 release and 0.9 release for our planning purposes? Thanks. Raja. On Wed, Jul 30, 2014 at 7:27 PM, Joe Stein joe.st...@stealth.ly wrote: The 0.8.2 release will not have the patch inside of it. Trunk already has a lot inside of it as a point release. The patch also doesn't account for all of the requirements that all of the stakeholders need/want for the feature. Instead of releasing something that is useful but only for some it is better to spend the time to get it right for everyone. We are going to have it in the 0.9 release (possibly also with authorization, encryption and more of the security features too) then. What we will do is keep the patch rebased against trunk and then then 0.8.2 branch (once we get to that point) so that folks can apply it to the 0.8.2 release and do a build from src. When we get to that I can create a write or something if folks find problems doing it. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Wed, Jul 30, 2014 at 7:10 PM, Calvin Lei ckp...@gmail.com wrote: yeah i just saw that. Looking forward to the prod release of 0.8.2 On Wed, Jul 30, 2014 at 11:01 AM, Rajasekar Elango rela...@salesforce.com wrote: We implemented security features on older snapshot version of 0.8 kafka. But Joe Stein's organization rebased it to latest version of kafka available at https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 . Thanks, Raja. On Tue, Jul 29, 2014 at 10:54 PM, Calvin Lei ckp...@gmail.com wrote: Raja, Which Kafka version is your security enhancement based on? thanks, Cal On Wed, Jul 23, 2014 at 5:01 PM, Chris Neal cwn...@gmail.com wrote: Pramod, I got that same error when following the configuration from Raja's presentation earlier in this thread. If you'll notice the usage for the console_producer.sh, it is slightly different, which is also slightly different than the scala code for the ConsoleProducer. :) When I changed this: bin/kafka-console-producer.sh --broker-list n5:9092:true --topic test to this: bin/kafka-console-producer.sh --broker-list n5:9092 --secure --client.security.file config/client.security.properties --topic test I was able to push messages to the topic, although I got a WARN about the property topic not being valid, even though it is required. Also, the Producer reported this warning to me: [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context (kafka.network.security.SecureAuth$) and the broker gave me this: [2014-07-23 20:45:24,114] INFO begin ssl handshake for n5.example.com/192.168.1.144:48817//192.168.1.144:9092 (kafka.network.security.SSLSocketChannel) [2014-07-23 20:45:24,374] INFO finished ssl handshake for n5.example.com/192.168.1.144:48817//192.168.1.144:9092 (kafka.network.security.SSLSocketChannel) [2014-07-23 20:45:24,493] INFO Closing socket connection to n5.example.com/192.168.1.144. 
(kafka.network.Processor) [2014-07-23 20:45:24,555] INFO begin ssl handshake for n5.example.com/192.168.1.144:48818//192.168.1.144:9092 (kafka.network.security.SSLSocketChannel) [2014-07-23 20:45:24,566] INFO finished ssl handshake for n5.example.com/192.168.1.144:48818//192.168.1.144:9092 (kafka.network.security.SSLSocketChannel) It's like it did the SSL piece twice :) Subsequent puts to the topic did not exhibit this behavior though: root@n5[937]:~/kafka_2.10-0-8-2-0.1.0.0 bin/kafka-console-producer.sh --broker-list n5:9092 --secure --client.security.file config/client.security.properties --topic test [2014-07-23 20:45:17,530] WARN Property topic is not valid (kafka.utils.VerifiableProperties) 1 [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context (kafka.network.security.SecureAuth$) 2 3 4 Consuming worked with these options: root@n5[918]:~/kafka_2.10-0-8-2-0.1.0.0 bin/kafka-console-consumer.sh --topic test --zookeeper n5:2181 --from-beginning --security.config.file config/client.security.properties 1 2 3 4 ^CConsumed 5 messages I hope that helps! Chris On Tue, Jul 22, 2014 at 2:10 PM, Pramod Deshmukh dpram...@gmail.com wrote: Anyone getting this issue. Is it something related to environment or it is the code. Producer works fine when run with secure=false (no security) mode. pdeshmukh$ bin/kafka-console
Re: KAFKA-1477 (authentication layer) and 0.8.2
Yes we are very much interested in getting this code merged to trunk. I can also do testing once it's available on trunk. Thanks, Raja. On Fri, Jul 25, 2014 at 12:11 PM, Joe Stein joe.st...@stealth.ly wrote: Hi Chris, glad to hear that even more folks are going to (want to) use the feature. I didn't author the patch (Raja and Ivan did) and created the fork so folks could test it without much fuss. I just commented on the ticket to address Jun's last comment and think it also answers your question too. I know folks are using this now and other folks are looking to use it out of the core project. As long as it has a way to cause no harm when it is off I believe it really adds to the value Kafka brings to a number of organizations that can't use Kafka just because of this one thing. I am looking forward to being able to commit it to trunk. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Fri, Jul 25, 2014 at 11:34 AM, Chris Neal cwn...@gmail.com wrote: Hi guys, This JIRA (https://issues.apache.org/jira/browse/KAFKA-1477) leads me to believe that an authentication layer implementation is planned as part of the 0.8.2 release. I was wondering if this is still the case? There was an earlier thread talking about security, but there hasn't been activity on it in awhile. I grabbed Joe's fork and it works, but I was wondering about it getting merged back into the official 0.8.2 codebase, or is this more likely something that will be in 0.9? Thanks! -- Thanks, Raja.
Re: [DISCUSS] Kafka Security Specific Features
Pramod, I presented secure kafka configuration and usage at last meet up. So hope this video recording http://www.ustream.tv/recorded/48396701would help. You can skip to about 59 min to jump to security talk. Thanks, Raja. On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh dpram...@gmail.com wrote: Hello Joe, Is there a configuration or example to test Kafka security piece? Thanks, Pramod On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh dpram...@gmail.com wrote: Thanks Joe, This branch works. I was able to proceed. I still had to set scala version to 2.9.2 in kafka-run-class.sh. On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein joe.st...@stealth.ly wrote: That is a very old branch. Here is a more up to date one https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 (needs to be updated to latest trunk might have a chance to-do that next week). You should be using gradle now as per the README. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Wed, Jul 16, 2014 at 3:49 PM, Pramod Deshmukh dpram...@gmail.com wrote: Thanks Joe for this, I cloned this branch and tried to run zookeeper but I get Error: Could not find or load main class org.apache.zookeeper.server.quorum.QuorumPeerMain I see scala version is still set to 2.8.0 if [ -z $SCALA_VERSION ]; then SCALA_VERSION=2.8.0 fi Then I installed sbt and scala and followed your instructions for different scala versions. I was able to bring zookeeper up but brokers fail to start with error Error: Could not find or load main class kafka.Kafka I think I am doing something wrong. Can you please help me? Our current production setup is with 2.8.0 and want to stick to it. Thanks, Pramod On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein joe.st...@stealth.ly wrote: Hi,I wanted to re-ignite the discussion around Apache Kafka Security. This is a huge bottleneck (non-starter in some cases) for a lot of organizations (due to regulatory, compliance and other requirements). Below are my suggestions for specific changes in Kafka to accommodate security requirements. This comes from what folks are doing in the wild to workaround and implement security with Kafka as it is today and also what I have discovered from organizations about their blockers. It also picks up from the wiki (which I should have time to update later in the week based on the below and feedback from the thread). 1) Transport Layer Security (i.e. SSL) This also includes client authentication in addition to in-transit security layer. This work has been picked up here https://issues.apache.org/jira/browse/KAFKA-1477 and do appreciate any thoughts, comments, feedback, tomatoes, whatever for this patch. It is a pickup from the fork of the work first done here https://github.com/relango/kafka/tree/kafka_security. 2) Data encryption at rest. This is very important and something that can be facilitated within the wire protocol. It requires an additional map data structure for the encrypted [data encryption key]. With this map (either in your object or in the wire protocol) you can store the dynamically generated symmetric key (for each message) and then encrypt the data using that dynamically generated key. You then encrypt the encryption key using each public key for whom is expected to be able to decrypt the encryption key to then decrypt the message. 
For each public key encrypted symmetric key (which is now the encrypted [data encryption key] along with which public key it was encrypted with for (so a map of [publicKey] = encryptedDataEncryptionKey) as a chain. Other patterns can be implemented but this is a pretty standard digital enveloping [0] pattern with only 1 field added. Other patterns should be able to use that field to-do their implementation too. 3) Non-repudiation and long term non-repudiation. Non-repudiation is proving data hasn't changed. This is often (if not always) done with x509 public certificates (chained to a certificate authority). Long term non-repudiation is what happens when the certificates of the certificate authority are expired (or revoked) and everything ever signed (ever) with that certificate's public key then becomes no longer provable as ever being authentic. That is where RFC3126 [1] and RFC3161 [2] come in (or worm drives [hardware], etc). For either (or both) of these it is an operation of the encryptor to sign/hash the data
Re: [DISCUSS] Kafka Security Specific Features
Hi Jay, Thanks for putting together a spec for security. Joe, Looks Securing zookeeper.. part has been deleted from assumptions section. communication with zookeeper need to be secured as well to make entire kafka cluster secure. It may or may not require changes to kafka. But it's good to have it in spec. I could not find a link to edit the page after login into wiki. Do I need any special permission to make edits? Thanks, Raja. On Wed, Jun 4, 2014 at 8:57 PM, Joe Stein joe.st...@stealth.ly wrote: I like the idea of working on the spec and prioritizing. I will update the wiki. - Joestein On Wed, Jun 4, 2014 at 1:11 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, Thanks for kicking this discussion off! I totally agree that for something that acts as a central message broker security is critical feature. I think a number of people have been interested in this topic and several people have put effort into special purpose security efforts. Since most the LinkedIn folks are working on the consumer right now I think this would be a great project for any other interested people to take on. There are some challenges in doing these things distributed but it can also be a lot of fun. I think a good first step would be to get a written plan we can all agree on for how things should work. Then we can break things down into chunks that can be done independently while still aiming at a good end state. I had tried to write up some notes that summarized at least the thoughts I had had on security: https://cwiki.apache.org/confluence/display/KAFKA/Security What do you think of that? One assumption I had (which may be incorrect) is that although we want all the things in your list, the two most pressing would be authentication and authorization, and that was all that write up covered. You have more experience in this domain, so I wonder how you would prioritize? Those notes are really sketchy, so I think the first goal I would have would be to get to a real spec we can all agree on and discuss. A lot of the security stuff has a high human interaction element and needs to work in pretty different domains and different companies so getting this kind of review is important. -Jay On Tue, Jun 3, 2014 at 12:57 PM, Joe Stein joe.st...@stealth.ly wrote: Hi,I wanted to re-ignite the discussion around Apache Kafka Security. This is a huge bottleneck (non-starter in some cases) for a lot of organizations (due to regulatory, compliance and other requirements). Below are my suggestions for specific changes in Kafka to accommodate security requirements. This comes from what folks are doing in the wild to workaround and implement security with Kafka as it is today and also what I have discovered from organizations about their blockers. It also picks up from the wiki (which I should have time to update later in the week based on the below and feedback from the thread). 1) Transport Layer Security (i.e. SSL) This also includes client authentication in addition to in-transit security layer. This work has been picked up here https://issues.apache.org/jira/browse/KAFKA-1477 and do appreciate any thoughts, comments, feedback, tomatoes, whatever for this patch. It is a pickup from the fork of the work first done here https://github.com/relango/kafka/tree/kafka_security. 2) Data encryption at rest. This is very important and something that can be facilitated within the wire protocol. It requires an additional map data structure for the encrypted [data encryption key]. 
With this map (either in your object or in the wire protocol) you can store the dynamically generated symmetric key (for each message) and then encrypt the data using that dynamically generated key. You then encrypt the encryption key using each public key for whom is expected to be able to decrypt the encryption key to then decrypt the message. For each public key encrypted symmetric key (which is now the encrypted [data encryption key] along with which public key it was encrypted with for (so a map of [publicKey] = encryptedDataEncryptionKey) as a chain. Other patterns can be implemented but this is a pretty standard digital enveloping [0] pattern with only 1 field added. Other patterns should be able to use that field to-do their implementation too. 3) Non-repudiation and long term non-repudiation. Non-repudiation is proving data hasn't changed. This is often (if not always) done with x509 public certificates (chained to a certificate authority). Long term non-repudiation is what happens when the certificates of the certificate authority are expired (or revoked) and everything ever signed (ever) with that certificate's public key then becomes no longer provable as ever being authentic. That is where
Re: Spring integration?
Hi Michael, We are using spring integration kafkahttps://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-kafkain production and have been working fine. We also contributed some features (for eg: support for topic filter https://jira.spring.io/browse/INTEXT-77) to back to project. The outbound adapter has a poller to make them work with enterprise integration patten Message Channels. If you need more detailed answer you could try posting your question in stack overflow. I have also seen camel component for kafkahttps://github.com/BreizhBeans/camel-kafka, but havent' tried it. Thanks, Raja. On Mon, Apr 7, 2014 at 10:26 PM, Michael Campbell michael.campb...@gmail.com wrote: Hello, My company is looking at Kafka for a backbone to microservices, and we were wondering if anyone had actually done anything with it and Spring Integration (which we are looking at also for additional things). The reason I ask is the one place I can find any code seems about a half year out of date, is targeting both an old version of Kafka (0.8-beta) and an older version of Scala, and the example seems ... odd to me (he has a poller in the outbound adapter, which I don't understand at all). Is anyone actually using this combination? Or, is there a better way to integrate with Kafka if I want to put a layer between my business code and the Kafka API?(Camel, perhaps?) -- Thanks, Raja.
Re: Kafka and authentication
Hi Vijay, We implemented mutual SSL authentication in kafka for our internal use and we have plans to contribute it back to the community. But we implemented SSL over an older snapshot of the kafka 0.8 release. We have been busy with other projects and haven't had a chance to merge our SSL changes into the latest version of kafka. If you are interested in looking at the changes we made for this, they are available in my github fork of apache kafka (https://github.com/relango/kafka/tree/kafka_security) Thanks, Raja. On Fri, Mar 28, 2014 at 10:06 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi Vijay, The document you pointed out has our initial thoughts on Kafka security. This work is still in the design and discussion phase, no code has been written as such, and we hope to pick it up in a couple of months. However, if you have thoughts on how it should work and/or would like to contribute patches, we would be happy to collaborate with you. Thanks, Neha On Fri, Mar 28, 2014 at 4:05 PM, Vijay Ramachandran vramachand...@apple.com wrote: Hi All, I was googling around for info on securing kafka. The best document I could find was https://cwiki.apache.org/confluence/display/KAFKA/Security, which is kind of old. It is not clear if any steps were taken after this doc was put together. Looking at the features / bug fixes in kafka also does not paint a clear picture. Hence this set of questions: Is there a way to make kafka authenticate a producer sending messages / a consumer reading messages? Is there a way to make kafka authenticate itself to the ZooKeeper ensemble? Any info will be deeply appreciated. Thanks Vijay -- Thanks, Raja.
Re: Spring Integration Kafka support
Hi Preetham, We are able to successfully use spring-integration-kafka with non-zero broker ids without any issues. Could you provide more details on what exactly problem/error you are getting.? Also, You can try this examples under samples https://github.com/spring-projects/spring-integration-extensions/tree/master/samples/kafka to get started quickly. Thanks, Raja. On Fri, Jan 10, 2014 at 4:08 PM, Premchandra, Preetham Kukillaya preethampremchan...@fico.com wrote: Hi, I was doing a poc using https://github.com/SpringSource/spring-integration-extensions/tree/master/spring-integration-kafka. I figured that code is expecting the brokerid=0 and ideally this will not be the case if multiple brokers are connecting to the same zookeeper. Regards Preetham This email and any files transmitted with it are confidential, proprietary and intended solely for the individual or entity to whom they are addressed. If you have received this email in error please delete it immediately. -- Thanks, Raja.
Re: Producer SSL?
Hi Jonathan, We forked kafka to add the SSL feature. It is not part of the official kafka release. Sent from my iPhone On Nov 15, 2013, at 12:32 PM, Jonathan Hodges hodg...@gmail.com wrote: Hi, While searching the user group messages I found the following thread - http://grokbase.com/t/kafka/users/138vqq1x07/getting-leadernotavailableexception-in-console-producer-after-increasing-partitions-from-4-to-16. It shows the following stack trace with 0.8. [2013-08-27 08:29:30,372] INFO Fetching metadata from broker id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1 topic(s) Set(test-41) (kafka.client.ClientUtils$) [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel) [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel) [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for producing (kafka.producer.SyncProducer) [2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true (kafka.producer.SyncProducer) [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig) Is there a 'secure' boolean property on the broker that allows for SSL? I didn't see it on http://kafka.apache.org/08/configuration.html but maybe I missed it? Thanks, Jonathan
Mirrormaker consumer looping to offset out of range and reset offset errors
We are seeing that the mirrormaker consumer started looping through offset out of range and reset offset errors for some partitions (2 out of 8 partitions). The ConsumerOffsetChecker reported a very high lag for these 2 partitions. It looks like this problem started after a consumer rebalance. Here are the log lines: 2013-10-06 06:09:59,993 [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4] WARN (kafka.consumer.ConsumerFetcherThread) - [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4], current offset 2526006629 for partition [FunnelProto,1] out of range; reset offset to 2526006629 2013-10-06 06:09:59,993 [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4] WARN (kafka.consumer.ConsumerFetcherThread) - [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4], current offset 2363213504 for partition [FunnelProto,3] out of range; reset offset to 2363213504 2013-10-06 06:09:59,993 [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4] WARN (kafka.consumer.ConsumerFetcherThread) - [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4], current offset 2146256007 for partition [jmx,0] out of range; reset offset to 2146256007 2013-10-06 06:09:59,992 [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4] WARN (kafka.consumer.ConsumerFetcherThread) - [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4], current offset 2239688 for partition [tower_timing_metrics,3] out of range; reset offset to 2239688 2013-10-06 06:09:59,889 [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4] WARN (kafka.consumer.ConsumerFetcherThread) - [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4], current offset 1234239 for partition [agent,0] out of range; reset offset to 1234239 2013-10-06 06:09:59,889 [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4] WARN (kafka.consumer.ConsumerFetcherThread) - [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4], current offset 2526006629 for partition [FunnelProto,1] out of range; reset offset to 2526006629 2013-10-06 06:09:59,889 [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4] WARN (kafka.consumer.ConsumerFetcherThread) - [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4], current offset 2363213504 for partition [FunnelProto,3] out of range; reset offset to 2363213504 Also, as you can see, it's resetting the offset to the same value, so it's looping through these offset resets again and again. After we restarted our mirrormaker process, it started consuming from the beginning of the topic for all partitions (we started receiving messages from 7 days back) and it caught up in a couple of hours. We have a couple of questions: 1) What might have caused this to end up in this bad state? 2) We had the offset out of range problem only for 2 out of 8 partitions, but it started to consume from the beginning for all partitions in the topic after we restarted mirrormaker. How did a problem with 2 partitions affect all the other partitions? -- Thanks, Raja.
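As a troubleshooting aid for this kind of loop, comparing the consumer's stuck offset against the broker's earliest and latest offsets for the affected partitions shows which side the offset fell off. A sketch using the stock GetOffsetShell tool; the broker address is a placeholder and the topic name is taken from the log lines above:

# Earliest (-2) and latest (-1) available offsets for the topic's partitions.
# If the consumer's "current offset" is outside this range, the fetcher keeps
# hitting OffsetOutOfRange until the offset is reset to a valid value.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list broker1:9092 --topic FunnelProto --time -2
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list broker1:9092 --topic FunnelProto --time -1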
Re: Kafka consumer - Mbean for max lag
Thanks Neha, Looks like this MBean was added recently. The version we are running is from early June and it doesn't have this MBean. Thanks, Raja.
On Mon, Sep 23, 2013 at 9:15 PM, Neha Narkhede neha.narkh...@gmail.com wrote: On the consumer side, look for kafka.consumer:name=([-.\w]+)-MaxLag,type=ConsumerFetcherManager. Updated the website to reflect that. Thanks, Neha
On Mon, Sep 23, 2013 at 12:48 PM, Rajasekar Elango rela...@salesforce.com wrote: In the kafka documentation for monitoring (http://kafka.apache.org/documentation.html#operations) I see we should be looking at the max lag in messages among all partitions. All I can see is the mbean *kafka.server.FetcherLagMetrics.*ConsumerFetcherThread* and its value is pretty much 0. Is this the correct MBean? If not, can you tell me which MBean provides that info? Thanks in advance. -- Thanks, Raja. -- Thanks, Raja.
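A quick way to read that MaxLag MBean without a graphical JMX console is the JmxTool class bundled with Kafka. This is only a sketch: it assumes the consumer exposes JMX on port 9999 and that the name component follows the consumer's client id, so adjust both for your setup:

    bin/kafka-run-class.sh kafka.tools.JmxTool \
      --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
      --object-name 'kafka.consumer:type=ConsumerFetcherManager,name=mirrormakerProd-0-MaxLag'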
Re: Leader doesn't get assigned for new topics
From the output of the StateChangeLogMerger tool, I see only this error repeated:
[2013-09-18 14:16:48,358] ERROR [KafkaApi-1] Error while fetching metadata for partition [FunnelProto,0] (kafka.server.KafkaApis)
In the state-change.log itself, I see this error:
[2013-09-18 14:22:48,954] ERROR Conditional update of path /brokers/topics/test-1379439240191/partitions/2/state with data { controller_epoch:10, isr:[ 1, 5, 4 ], leader:1, leader_epoch:4, version:1 } and expected version 8 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/test-1379439240191/partitions/2/state (kafka.utils.ZkUtils$)
Do you know the reason for the above error? Also, this problem seems to be intermittent; it started working now without any changes. I will continue to monitor. Thanks, Raja.
On Tue, Sep 17, 2013 at 7:59 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Raja, Could you run the StateChangeLogMerger tool and give it one topic-partition that has the above mentioned problem. This tool is documented here - https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-7.StateChangeLogMergerTool . Let me know if you run into any issues while using it. Thanks, Neha
On Tue, Sep 17, 2013 at 12:27 PM, Rajasekar Elango rela...@salesforce.com wrote: Neha/Jun, The same problem started happening again although now our zookeeper cluster is configured correctly. Producing always fails with LeaderNotAvailableException and list topics shows the topic is created with leader none. In the controller and state-change log, I am seeing a lot of these failures:
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [FunnelProto,6] failed due to Partition [FunnelProto,6] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [internal_metrics,3] failed due to Partition [internal_metrics,3] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [FunnelProto,0] failed due to Partition [FunnelProto,0] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [jmx,3] failed due to Partition [jmx,3] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [FunnelProto,5] failed due to Partition [FunnelProto,5] doesn't exist on 2 (kafka.server.KafkaApis)
When I ran the listTopics command for one of the above topics, all partitions were under replicated (we have the replication factor set to 3). Any clues on what the issue could be and how we can get it back to a working state? Thanks, Raja.
On Fri, Sep 13, 2013 at 6:26 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Ah ok. Thanks for sharing that.
On Fri, Sep 13, 2013 at 2:50 PM, Rajasekar Elango rela...@salesforce.com wrote: We have 3 zookeeper nodes in the cluster behind a hardware load balancer. On one of the zookeepers, we did not configure the ensemble correctly (the server.n property in zoo.cfg), so it ended up as 2 nodes in one cluster and one node in another cluster. The load balancer was randomly hitting one of the zookeepers in the two different clusters. Thanks, Raja.
On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Just curious to know, what was the misconfiguration? On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango rela...@salesforce.comwrote: Thanks Neha and Jun, It turned out to be miss configuration in our zookeeper cluster. After correcting it everything looks good. Thanks, Raja. On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao jun...@gmail.com wrote: Any error in the controller and the state-change log? Are brokers 2,3,4 alive? Thanks, Jun On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango rela...@salesforce.com wrote: We are seeing a problem that we we try to send messages to new topic it fails kafka.common.LeaderNotAvailableException. But usually this problem will be transient and if we re-send messages to same topic will work. But now we tried rending message to same topic several time, but still fails with same error: In the server log I see ] Auto creation of topic test-sjl2 with 8 partitions and replication factor 3 is successful!. But listTopics command shows leader
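For anyone else following along, a typical invocation of the StateChangeLogMerger tool mentioned above looks roughly like this (paths, topic and partition are placeholders; double-check the option names against the wiki page for your version):

    bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger \
      --logs /var/kafka/logs/state-change.log.broker1,/var/kafka/logs/state-change.log.broker2 \
      --topic FunnelProto \
      --partitions 0

It merges the per-broker state-change logs in time order so the leader election for a single topic-partition can be followed across the whole cluster.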
Re: Leader doesn't get assigned for new topics
Neha/Jun, The same problem started happening again although now our zookeeper cluster is configured correctly. The produce always failed with LeaderNotAvailableException and list topics shows topic is created with leader none. In the controller and stage-change log, I am seeing lot of these failures.. [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [FunnelProto,6] failed due to Partition [FunnelProto,6] doesn't exist on 2 (kafka.server.KafkaApis) [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [internal_metrics,3] failed due to Partition [internal_metrics,3] doesn't exist on 2 (kafka.server.KafkaApis) [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [FunnelProto,0] failed due to Partition [FunnelProto,0] doesn't exist on 2 (kafka.server.KafkaApis) [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [jmx,3] failed due to Partition [jmx,3] doesn't exist on 2 (kafka.server.KafkaApis) [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with correlation id 622369865 from client on partition [FunnelProto,5] failed due to Partition [FunnelProto,5] doesn't exist on 2 (kafka.server.KafkaApis) When I ran listTopics command for one of above topic, all partitions are under replicated (we have replication factor set to 3). Any clues on what could be issue and how can we get it back to working? Thanks, Raja. On Fri, Sep 13, 2013 at 6:26 PM, Neha Narkhede neha.narkh...@gmail.comwrote: Ah ok. Thanks for sharing that. On Fri, Sep 13, 2013 at 2:50 PM, Rajasekar Elango rela...@salesforce.com wrote: We have 3 zookeeper node in the cluster with a hardware load balancer . In one of the zookeeper, we did not configure ensemble correctly (server.n property in zoo.cfg) . So it ended up as like 2 nodes in one cluster, one node in other cluster. The load balancer is randomly hitting one of 2 zookeepers in two different cluster. Thanks, Raja. On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Just curious to know, what was the misconfiguration? On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango rela...@salesforce.comwrote: Thanks Neha and Jun, It turned out to be miss configuration in our zookeeper cluster. After correcting it everything looks good. Thanks, Raja. On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao jun...@gmail.com wrote: Any error in the controller and the state-change log? Are brokers 2,3,4 alive? Thanks, Jun On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango rela...@salesforce.com wrote: We are seeing a problem that we we try to send messages to new topic it fails kafka.common.LeaderNotAvailableException. But usually this problem will be transient and if we re-send messages to same topic will work. But now we tried rending message to same topic several time, but still fails with same error: In the server log I see ] Auto creation of topic test-sjl2 with 8 partitions and replication factor 3 is successful!. 
But the listTopics command shows leader none like below:
topic: test-sjl2    partition: 0    leader: none    replicas: 2,4,3    isr:
topic: test-sjl2    partition: 1    leader: none    replicas: 3,2,4    isr:
topic: test-sjl2    partition: 2    leader: none    replicas: 4,3,2    isr:
topic: test-sjl2    partition: 3    leader: none    replicas: 2,3,4    isr:
topic: test-sjl2    partition: 4    leader: none    replicas: 3,4,2    isr:
topic: test-sjl2    partition: 5    leader: none    replicas: 4,2,3    isr:
topic: test-sjl2    partition: 6    leader: none    replicas: 2,4,3    isr:
topic: test-sjl2    partition: 7    leader: none    replicas: 3,2,4    isr:
I also see the following NotLeaderForPartitionException and ZooKeeperException in the logs:
kafka.common.NotLeaderForPartitionException
 at sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown Source)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
 at java.lang.Class.newInstance0(Class.java:355)
 at java.lang.Class.newInstance(Class.java:308)
 at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun
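The leader/replicas/isr listing above comes from the 0.8 list-topic tool; a sketch of the command with a placeholder zookeeper address:

    bin/kafka-list-topic.sh --zookeeper zk1.example.com:2181 --topic test-sjl2

A healthy partition shows a broker id under leader and a non-empty isr; leader: none with an empty isr, as above, means no replica has been elected leader yet.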
Re: Leader doesn't get assigned for new topics
Thanks Neha and Jun, It turned out to be miss configuration in our zookeeper cluster. After correcting it everything looks good. Thanks, Raja. On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao jun...@gmail.com wrote: Any error in the controller and the state-change log? Are brokers 2,3,4 alive? Thanks, Jun On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango rela...@salesforce.com wrote: We are seeing a problem that we we try to send messages to new topic it fails kafka.common.LeaderNotAvailableException. But usually this problem will be transient and if we re-send messages to same topic will work. But now we tried rending message to same topic several time, but still fails with same error: In the server log I see ] Auto creation of topic test-sjl2 with 8 partitions and replication factor 3 is successful!. But listTopics command shows leader none like below: topic: test-sjl2partition: 0leader: nonereplicas: 2,4,3 isr: topic: test-sjl2partition: 1leader: nonereplicas: 3,2,4 isr: topic: test-sjl2partition: 2leader: nonereplicas: 4,3,2 isr: topic: test-sjl2partition: 3leader: nonereplicas: 2,3,4 isr: topic: test-sjl2partition: 4leader: nonereplicas: 3,4,2 isr: topic: test-sjl2partition: 5leader: nonereplicas: 4,2,3 isr: topic: test-sjl2partition: 6leader: nonereplicas: 2,4,3 isr: topic: test-sjl2partition: 7leader: nonereplicas: 3,2,4 isr: I also see following NotLeaderForPatritionExcetion and ZookeeperExcetion in logs kafka.common.NotLeaderForPartitionException at sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) at java.lang.reflect.Constructor.newInstance(Constructor.java:513) at java.lang.Class.newInstance0(Class.java:355) at java.lang.Class.newInstance(Class.java:308) at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158) at kafka.utils.Logging$class.warn(Logging.scala:88) at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) 2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR (kafka.utils.ZkUtils$) - Conditional update of path /brokers/topics/FunnelProto/partitions/4/state with data { controller_epoch:3, isr:[ 2, 5 ], leader:2, leader_epoch:2, version:1 } and expected version 14 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/FunnelProto/partitions/4/state 2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR (kafka.utils.ZkUtils$) - Conditional update of path /brokers/topics/FunnelProto/partitions/4/state with data { controller_epoch:3, isr:[ 2, 5 ], leader:2, leader_epoch:2, version:1 } and expected version 14 failed due to 
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/FunnelProto/partitions/4/state Any clues on what the problem could be? Thanks for your help. -- Thanks, Raja. -- Thanks, Raja.
Re: Leader doesn't get assigned for new topics
We have 3 zookeeper node in the cluster with a hardware load balancer . In one of the zookeeper, we did not configure ensemble correctly (server.n property in zoo.cfg) . So it ended up as like 2 nodes in one cluster, one node in other cluster. The load balancer is randomly hitting one of 2 zookeepers in two different cluster. Thanks, Raja. On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede neha.narkh...@gmail.comwrote: Just curious to know, what was the misconfiguration? On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango rela...@salesforce.comwrote: Thanks Neha and Jun, It turned out to be miss configuration in our zookeeper cluster. After correcting it everything looks good. Thanks, Raja. On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao jun...@gmail.com wrote: Any error in the controller and the state-change log? Are brokers 2,3,4 alive? Thanks, Jun On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango rela...@salesforce.com wrote: We are seeing a problem that we we try to send messages to new topic it fails kafka.common.LeaderNotAvailableException. But usually this problem will be transient and if we re-send messages to same topic will work. But now we tried rending message to same topic several time, but still fails with same error: In the server log I see ] Auto creation of topic test-sjl2 with 8 partitions and replication factor 3 is successful!. But listTopics command shows leader none like below: topic: test-sjl2partition: 0leader: nonereplicas: 2,4,3 isr: topic: test-sjl2partition: 1leader: nonereplicas: 3,2,4 isr: topic: test-sjl2partition: 2leader: nonereplicas: 4,3,2 isr: topic: test-sjl2partition: 3leader: nonereplicas: 2,3,4 isr: topic: test-sjl2partition: 4leader: nonereplicas: 3,4,2 isr: topic: test-sjl2partition: 5leader: nonereplicas: 4,2,3 isr: topic: test-sjl2partition: 6leader: nonereplicas: 2,4,3 isr: topic: test-sjl2partition: 7leader: nonereplicas: 3,2,4 isr: I also see following NotLeaderForPatritionExcetion and ZookeeperExcetion in logs kafka.common.NotLeaderForPartitionException at sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) at java.lang.reflect.Constructor.newInstance(Constructor.java:513) at java.lang.Class.newInstance0(Class.java:355) at java.lang.Class.newInstance(Class.java:308) at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158) at kafka.utils.Logging$class.warn(Logging.scala:88) at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) 2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR (kafka.utils.ZkUtils$) - Conditional update of path 
/brokers/topics/FunnelProto/partitions/4/state with data { controller_epoch:3, isr:[ 2, 5 ], leader:2, leader_epoch:2, version:1 } and expected version 14 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/FunnelProto/partitions/4/state 2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR (kafka.utils.ZkUtils$) - Conditional update of path /brokers/topics/FunnelProto/partitions/4/state with data { controller_epoch:3, isr:[ 2, 5 ], leader:2, leader_epoch:2, version:1 } and expected version 14 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/FunnelProto/partitions/4/state Any clues on what could
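For comparison, a correctly configured 3-node ensemble has the same server.N lines in every node's zoo.cfg; the hostnames below are placeholders:

    # zoo.cfg (identical on all three zookeeper nodes)
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/var/zookeeper
    clientPort=2181
    server.1=zk1.example.com:2888:3888
    server.2=zk2.example.com:2888:3888
    server.3=zk3.example.com:2888:3888

If one node is missing some of the server.N entries it can form its own quorum, which produces exactly the split-cluster behaviour described above.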
Re: Mirror maker doesn't replicate new topics
Hi Guozhang , 1) When I say I send messages to new topic - yes I am sending new messages to source cluster via console producer. 2) The log message Handling 0 events doesn't output topic name. But I would believe its for both old and new topics, because no other app is sending messages to source cluster other than me trying to test using console producer. Thanks, Raja. On Tue, Sep 10, 2013 at 1:03 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Raja, When you say I send messages to new topic I guess you mean that you send messages to the source cluster right? It may be due to the fact that producers of mirror make have not catched up with the mirror maker consumer. When you say I always see Handling 0 events do you mean that you see this for both messages for the new topic and for the old topics, or it only shows this log for new topic? Guozhang On Tue, Sep 10, 2013 at 7:47 AM, Rajasekar Elango rela...@salesforce.com wrote: Thanks Guozhang, 1, 2, 3 all are true. We are using default value 200 for batch.num.messages and 5000ms queue.buffering.max.ms. I believe it should batch either if batch.num.messages is reached or queue.buffering.max.ms is reached. I see log message 5000ms elapsed , Queue time reached. Sending.on regular interval. But when I send messages to new topic, I always see Handling 0 events and it doesn't produce to target cluster. But when I resend it second time, I see Handling x events and starts producing. Any clues on how to debug further? Thanks, Raja. On Mon, Sep 9, 2013 at 6:02 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Raja, So just to summarize the scenario: 1) The consumer of mirror maker is successfully consuming all partitions of the newly created topic. 2) The producer of mirror maker is not producing the new messages immediately when the topic is created (observed from ProducerSendThread's log). 3) The producer of mirror maker will start producing the new messages when more messages are sent to the source cluster. If 1) is true then KAFKA-1030 is excluded, since the consumer successfully recognize all the partitions and start consuming. If both 2) and 3) is true, I would wonder if the batch size of the mirror maker producer is large and hence will not send until enough messages are accumulated at the producer queue. Guozhang On Mon, Sep 9, 2013 at 2:36 PM, Rajasekar Elango rela...@salesforce.com wrote: yes, the data exists in source cluster, but not in target cluster. I can't replicate this problem in dev environment and it happens only in prod environment. I turned on debug logging, but not able to identify the problem. Basically, whenever I send data to new topic, I don't see any log messages from ProducerSendThread in mirrormaker log so they are not produced to target cluster. If I send more messages to same topic, the producer send thread kicks off and replicates the messages. But whatever messages send first time gets lost. How can I trouble shoot this problem further? Even this could be due to know issue https://issues.apache.org/jira/browse/KAFKA-1030, how can I confirm that? Is there config tweaking I can make to workaround this..? ConsumerOffsetChecks helps to track consumers. Its there any other tool we can use to track producers in mirrormaker. ? Thanks in advance for help. Thanks, Raja. On Fri, Sep 6, 2013 at 3:50 AM, Swapnil Ghike sgh...@linkedin.com wrote: Hi Rajasekar, You said that ConsumerOffsetChecker shows that new topics are successfully consumed and the lag is 0. 
If that's the case, can you verify that there is data on the source cluster for these new topics? If there is no data at the source, MirrorMaker will only assign consumer streams to the new topic, but the lag will be 0. This could otherwise be related to https://issues.apache.org/jira/browse/KAFKA-1030. Swapnil On 9/5/13 8:38 PM, Guozhang Wang wangg...@gmail.com wrote: Could you let me know the process of reproducing this issue? Guozhang On Thu, Sep 5, 2013 at 5:04 PM, Rajasekar Elango rela...@salesforce.comwrote: Yes guozhang Sent from my iPhone On Sep 5, 2013, at 7:53 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Rajasekar, Is auto.create.topics.enable set to true in your target cluster? Guozhang On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango rela...@salesforce.com wrote: We having issues that mirormaker not longer replicate newly created topics. It continues to replicate data for existing topics and but new topics doesn't get created
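The batching behaviour discussed above is controlled by the mirror maker's producer config; a minimal sketch with the values mentioned in this thread (the broker list is a placeholder):

    # producer.properties used by mirror maker (0.8 async producer)
    metadata.broker.list=target-broker1:9092,target-broker2:9092
    producer.type=async
    batch.num.messages=200
    queue.buffering.max.ms=5000

With these settings the ProducerSendThread flushes either when 200 messages have been queued or when 5000 ms have elapsed, whichever comes first.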
Re: Mirror maker doesn't replicate new topics
Yes, the data exists in the source cluster, but not in the target cluster. I can't replicate this problem in the dev environment; it happens only in the prod environment. I turned on debug logging, but was not able to identify the problem. Basically, whenever I send data to a new topic, I don't see any log messages from ProducerSendThread in the mirrormaker log, so they are not produced to the target cluster. If I send more messages to the same topic, the producer send thread kicks off and replicates the messages, but whatever messages were sent the first time get lost. How can I troubleshoot this problem further? This could be due to the known issue https://issues.apache.org/jira/browse/KAFKA-1030; how can I confirm that? Is there any config tweaking I can make to work around this? ConsumerOffsetChecker helps to track consumers; is there any other tool we can use to track producers in mirrormaker? Thanks in advance for your help. Thanks, Raja.
On Fri, Sep 6, 2013 at 3:50 AM, Swapnil Ghike sgh...@linkedin.com wrote: Hi Rajasekar, You said that ConsumerOffsetChecker shows that new topics are successfully consumed and the lag is 0. If that's the case, can you verify that there is data on the source cluster for these new topics? If there is no data at the source, MirrorMaker will only assign consumer streams to the new topic, but the lag will be 0. This could otherwise be related to https://issues.apache.org/jira/browse/KAFKA-1030. Swapnil
On 9/5/13 8:38 PM, Guozhang Wang wangg...@gmail.com wrote: Could you let me know the process of reproducing this issue? Guozhang
On Thu, Sep 5, 2013 at 5:04 PM, Rajasekar Elango rela...@salesforce.com wrote: Yes guozhang Sent from my iPhone
On Sep 5, 2013, at 7:53 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Rajasekar, Is auto.create.topics.enable set to true in your target cluster? Guozhang
On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango rela...@salesforce.com wrote: We are having issues where mirrormaker no longer replicates newly created topics. It continues to replicate data for existing topics, but new topics don't get created
Re: Kafka Monitoring
Thanks a lot Jun. This is very helpful. Thanks, Raja. On Thu, Sep 5, 2013 at 1:12 AM, Jun Rao jun...@gmail.com wrote: Updated the doc at http://kafka.apache.org/documentation.html#monitoring Hopefully that answers your questions. Thanks, Jun On Tue, Sep 3, 2013 at 11:16 PM, Vadim Keylis vkeylis2...@gmail.com wrote: Good evening. I have read through section of monitoring. I tried to map each section to corresponding JMX attribute. I will appreciate if you answer a few questions bellow. Thanks so much in advance, Vadim What this JMX kafka.controller:type=KafkaController,name=ActiveControllerCount for? The rate of data in and out of the cluster and the number of messages written Which jmx attributes should I monitor? Since I should alert on this What are acceptable changes? What are not? The log flush rate and the time taken to flush the log kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs Which attribute I should be watching and what acceptable deviation change before I should alert The number of partitions that have replicas that are down or have fallen behind and are underreplicated. Is this the JMX kafka.cluster:type=Partition,name=buypets-0-UnderReplicated that will show replicas that are down? Unclean leader elections. This shouldn't happen. kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec. I assume that should always be 0 and if its not 0 we have problem. Number of partitions each node is the leader for. Which JMX attribute(s) monitors this? Leader elections: we track each time this happens and how long it took: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs Any changes to the ISR Which JMX attribute I should monitor for this? Should I alert on this? What are reasonable changes? Which are not? The number of produce requests waiting on replication to report back Which JMX attribute I should monitor for this? Should I alert on this? What are reasonable changes? Which are not? The number of fetch requests waiting on data to arrive Which JMX attribute I should monitor for this? Should I alert on this? What are reasonable changes? Which are not? -- Thanks, Raja.
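Several of the mbeans asked about above can be polled from the command line with JmxTool; the example below is a sketch that assumes the broker exposes JMX on port 9999 and uses the controller mbean named in the question:

    bin/kafka-run-class.sh kafka.tools.JmxTool \
      --jmx-url service:jmx:rmi:///jndi/rmi://broker1.example.com:9999/jmxrmi \
      --object-name 'kafka.controller:type=KafkaController,name=ActiveControllerCount' \
      --reporting-interval 10000

ActiveControllerCount should sum to exactly 1 across all brokers in the cluster, so a common alert is firing when the sum is 0 or greater than 1.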
Mirror maker doesn't replicate new topics
We are having issues where mirrormaker no longer replicates newly created topics. It continues to replicate data for existing topics, but new topics don't get created on the target cluster. ConsumerOffsetChecker shows that new topics are successfully consumed and the lag is 0, but those topics don't get created in the target cluster. I also don't see mbeans for this new topic under the kafka.producer.ProducerTopicMetrics <topic name> metric. In the logs I see warnings for NotLeaderForPartition, but don't see any major error. What else can we look at to troubleshoot this further? -- Thanks, Raja.
Re: Mirror maker doesn't replicate new topics
Yes guozhang Sent from my iPhone On Sep 5, 2013, at 7:53 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Rajasekar, Is auto.create.topics.enable set to true in your target cluster? Guozhang On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango rela...@salesforce.comwrote: We having issues that mirormaker not longer replicate newly created topics. It continues to replicate data for existing topics and but new topics doesn't get created on target cluster. ConsumerOffsetTracker shows that new topics are successfully consumed and Lag is 0. But those topics doesn't get created in target cluster. I also don't see mbeans for this new topic under kafka.producer.ProducerTopicMetrics.topic namemetric. In logs I see warning for NotLeaderForPatition. but don't see major error. What else can we look to troubleshoot this further. -- Thanks, Raja. -- -- Guozhang
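For context, topic auto-creation on the target cluster is controlled by broker settings like the following (a sketch of server.properties; the values are illustrative):

    # server.properties on the target cluster brokers
    auto.create.topics.enable=true
    num.partitions=8
    default.replication.factor=3

When enabled, the first produce or metadata request for an unknown topic creates it with these defaults, which is what mirror maker relies on for new topics.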
Re: Number of file handles increases indefinitely in producer if broker host is unresolvable
I can easily reproduce this with console producer, If I run console producer with right hostname and if broker is not running, the console producer will exit after three tries. But If I run console producer with unresolvable broker, it throws below exception and continues to wait for user input, every time I enter new message, it opens socket and file handle count keeps increasing.. Here is Exception in producer ERROR fetching topic metadata for topics [Set(test-1378245487417)] from broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed (kafka.utils.Utils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(test-1378245487417)] from broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.Utils$.swallow(Utils.scala:186) at kafka.utils.Logging$class.swallowError(Logging.scala:105) at kafka.utils.Utils$.swallowError(Utils.scala:45) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:526) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:30) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:487) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:59) at kafka.producer.SyncProducer.connect(SyncProducer.scala:151) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:166) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73) at kafka.producer.SyncProducer.send(SyncProducer.scala:117) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:37) ... 12 more On Tue, Sep 3, 2013 at 9:29 PM, Neha Narkhede neha.narkh...@gmail.comwrote: Interesting. What errors/exceptions do you see in the producer logs? Thanks, Neha On Tue, Sep 3, 2013 at 3:28 PM, Rajasekar Elango rela...@salesforce.com wrote: We found a issue that if broker host is un resolvable, the number of file handle keep increasing for every message we produce and eventually it uses up all available files handles in operating system. If broker itself is not running and broker host name is resolvable, open file handles count stays flat. lsof output shows number of these open file handles continue to grow for every message we produce. java 19631relango 81u sock0,6 0t0 196966526 can't identify protocol Is this a bug is producer API..? What is best way to self protect our self ? -- Thanks, Raja. -- Thanks, Raja.
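A rough way to reproduce and watch the leak described above (the topic is a placeholder and localhost1 is intentionally an unresolvable host):

    # start a console producer pointed at an unresolvable broker host
    bin/kafka-console-producer.sh --broker-list localhost1:6667 --topic test

    # in another shell, watch the producer's open socket count grow as messages are typed
    PID=$(pgrep -f ConsoleProducer)
    watch -n 5 "lsof -p $PID | grep -c sock"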
Re: Kafka-0.8.0-beta1-src Has ObjectName starting with Double Quotes
We have the same problem, it doesn't work with restful JMX console jiminixhttps://code.google.com/p/jminix/. Is it possible to change kafka to expose mbeans without quotes? Thanks, Raja. On Wed, Sep 4, 2013 at 10:30 AM, Neha Narkhede neha.narkh...@gmail.comwrote: I had the same problem and could not use jmxterm to inspect jmx beans due to this issue. But we tried escaping the quotes and it works with our internal monitoring system as well as JmxTool that ships with Kafka. Thanks, Neha On Sep 4, 2013 7:16 AM, Monika Garg monika.g...@impetus.co.in wrote: Hi, Kafka-0.8.0-beta1-src is having doublequotes in the start of Objects name obtained from jConsole.Due to this I am not able to use jmxTrans to monitor my kafka-0.8.0 cluster. Please help in solving the issue. Regards Monika Garg Associate Software Engineer Impetus Infotech (India) Pvt. Ltd. D-40, Sector-59, Noida - 201307, UP (O) +91-120-4363300 x 2858 (M) +91-8588075977 www.impetus.comhttp://www.impetus.com/ NOTE: This message may contain information that is confidential, proprietary, privileged or otherwise protected by law. The message is intended solely for the named addressee. If received in error, please destroy and notify the sender. Any use of this email is prohibited when received in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this communication has been maintained nor that the communication is free of errors, virus, interception or interference. -- Thanks, Raja.
Re: Number of file handles increases indefinitely in producer if broker host is unresolvable
Sure. Thanks Neha. Created Issue: https://issues.apache.org/jira/browse/KAFKA-1041 On Wed, Sep 4, 2013 at 10:32 AM, Neha Narkhede neha.narkh...@gmail.comwrote: Ideally if the producer runs into any error, it should close the previous socket and open a new one. Seems like that is not happening here. I will take a closer look at this today. Do you mind filing a bug? Thanks, Neha On Sep 4, 2013 7:23 AM, Rajasekar Elango rela...@salesforce.com wrote: I can easily reproduce this with console producer, If I run console producer with right hostname and if broker is not running, the console producer will exit after three tries. But If I run console producer with unresolvable broker, it throws below exception and continues to wait for user input, every time I enter new message, it opens socket and file handle count keeps increasing.. Here is Exception in producer ERROR fetching topic metadata for topics [Set(test-1378245487417)] from broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed (kafka.utils.Utils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(test-1378245487417)] from broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.Utils$.swallow(Utils.scala:186) at kafka.utils.Logging$class.swallowError(Logging.scala:105) at kafka.utils.Utils$.swallowError(Utils.scala:45) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:526) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:30) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:487) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:59) at kafka.producer.SyncProducer.connect(SyncProducer.scala:151) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:166) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73) at kafka.producer.SyncProducer.send(SyncProducer.scala:117) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:37) ... 12 more On Tue, Sep 3, 2013 at 9:29 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Interesting. What errors/exceptions do you see in the producer logs? Thanks, Neha On Tue, Sep 3, 2013 at 3:28 PM, Rajasekar Elango rela...@salesforce.com wrote: We found a issue that if broker host is un resolvable, the number of file handle keep increasing for every message we produce and eventually it uses up all available files handles in operating system. If broker itself is not running and broker host name is resolvable, open file handles count stays flat. lsof output shows number of these open file handles continue to grow for every message we produce. java 19631relango 81u sock0,6 0t0 196966526 can't identify protocol Is this a bug is producer API..? 
What is the best way to protect ourselves? -- Thanks, Raja. -- Thanks, Raja. -- Thanks, Raja.
Re: Mirrormaker stopped consuming
Thanks Neha, I did not take a thread dump before restarting, will get it when it happens again. We are using 16 Gigs of jvm heap. Do you have a recommendation on jvm GC options.? Thanks, Raja. On Tue, Sep 3, 2013 at 12:26 PM, Neha Narkhede neha.narkh...@gmail.comwrote: 2013-09-01 05:59:27,792 [main-EventThread] INFO (org.I0Itec.zkclient.ZkClient) - zookeeper state changed (Disconnected) 2013-09-01 05:59:27,692 [main-SendThread( mandm-zookeeper-asg.data.sfdc.net:2181)] INFO (org.apache.zookeeper. ClientCnxn) - Client session timed out, have not heard from server in 4002ms for sessionid 0x140c603da5b0032, closing socket connection and attempting reconnect This indicates that your mirror maker and/or your zookeeper cluster is GCing for long periods of time. I have observed that if client session timed out happens too many times, the client tends to lose zookeeper watches. This is a potential bug in zookeeper. If this happens, your mirror maker instance might not rebalance correctly and will start losing data. You mentioned consumption/production stopped on your mirror maker, could you please take a thread dump and point us to it? Meanwhile, you might want to fix the GC pauses. Thanks, Neha On Tue, Sep 3, 2013 at 8:59 AM, Rajasekar Elango rela...@salesforce.com wrote: We found that mirrormaker stopped consuming and producing over the week end (09/01). Just seeing Client session timed out messages in mirrormaker log. I restarted to it today 09/03 to resume processing. Here is the logs line in reverse order. 2013-09-03 14:20:40,918 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.utils.VerifiableProperties) - Verifying properties 2013-09-03 14:20:40,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ZookeeperConsumerConnector) - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], begin rebalancing consumer mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try #1 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ZookeeperConsumerConnector) - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Committing all offsets after clearing the fetcher queues 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ZookeeperConsumerConnector) - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Cleared the data chunks in all the consumer message iterators 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ZookeeperConsumerConnector) - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Cleared all relevant queues for this fetcher 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ConsumerFetcherManager) - [ConsumerFetcherManager-1378218012760] All connections stopped 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ConsumerFetcherManager) - [ConsumerFetcherManager-1378218012760] Stopping all fetchers 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ConsumerFetcherManager) - [ConsumerFetcherManager-1378218012760] Stopping leader 
finder thread 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ZookeeperConsumerConnector) - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered 2013-09-03 14:20:38,876 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO (kafka.consumer.ZookeeperConsumerConnector) - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], end rebalancing consumer mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try #0 2013-09-01 05:59:29,069 [main-SendThread( mandm-zookeeper-asg.data.sfdc.net:2181)] INFO (org.apache.zookeeper.ClientCnxn) - Socket connection established to mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181, initiating session 2013-09-01 05:59:29,069 [main-SendThread( mandm-zookeeper-asg.data.sfdc.net:2181)] INFO (org.apache.zookeeper.ClientCnxn) - Opening socket connection to server mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181 2013-09-01 05:59:27,792 [main-EventThread] INFO (org.I0Itec.zkclient.ZkClient) - zookeeper state
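Two things that help here, sketched below: capturing a couple of thread dumps while the process is stuck, and running the mirror maker with explicit heap and GC flags so stop-the-world pauses stay well under the zookeeper session timeout. The flag values are illustrative, not a recommendation from this thread, and assume your kafka-run-class.sh picks up the KAFKA_HEAP_OPTS / KAFKA_JVM_PERFORMANCE_OPTS environment variables:

    # take two thread dumps a few seconds apart while the mirror maker is stuck
    jstack -l <mirrormaker-pid> > mm-threads-1.txt
    sleep 10
    jstack -l <mirrormaker-pid> > mm-threads-2.txt

    # example JVM options: smaller heap than 16G plus CMS and GC logging
    export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
    export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseConcMarkSweepGC -XX:+CMSScavengeBeforeRemark -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:mirrormaker-gc.log"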
Number of file handles increases indefinitely in producer if broker host is unresolvable
We found an issue where, if the broker host is unresolvable, the number of file handles keeps increasing for every message we produce, and eventually it uses up all available file handles in the operating system. If the broker itself is not running but the broker host name is resolvable, the open file handle count stays flat. lsof output shows the number of these open file handles continues to grow for every message we produce:
java 19631 relango 81u sock 0,6 0t0 196966526 can't identify protocol
Is this a bug in the producer API? What is the best way to protect ourselves? -- Thanks, Raja.
Re: Num of streams for consumers using TopicFilter.
Yeah. The actual bottleneck is actually number of topics that match the topic filter. Num of streams is going be shared between all topics it's consuming from. I thought about following ideas to work around this. (I am basically referring to mirrormaker consumer in examples). Option 1). Instead of running one mirrormaker process with topic filter .+, We can start multiple mirrormaker process with topic filter matching each topic (Eg: mirrormaker1 = whitelist topic1.* , mirrormaker2 = whitelist topic2.* etc) But this adds some operations overhead to start and manage multiple processes on the host. Option 2) Modify mirrormaker code to support list of whitelist filters and it should create message streams for each filter (call createMessageStreamsByFilter for each filter). What would be your recommendation..? If adding feature to mirrormaker is worth kafka, we can do option 2. Thanks, Raja. On Fri, Aug 30, 2013 at 10:34 AM, Jun Rao jun...@gmail.com wrote: Right, but if you set #partitions in each topic to 16, you can use a total of 16 streams. Thanks, Jun On Thu, Aug 29, 2013 at 9:08 PM, Rajasekar Elango rela...@salesforce.com wrote: With option 1) I can't really use 8 streams in each consumer, If I do only one consumer seem to be doing all work. So I had to actually use total 8 streams with 4 for each consumer. On Fri, Aug 30, 2013 at 12:01 AM, Jun Rao jun...@gmail.com wrote: The drawback of 2), as you said is no auto failover. I was suggesting that you use 16 partitions. Then you can use option 1) with 8 streams in each consumer. Thanks, Jun On Thu, Aug 29, 2013 at 8:51 PM, Rajasekar Elango rela...@salesforce.com wrote: Hi Jun, If you read my previous posts, based on current re balancing logic, if we consumer from topic filter, consumer actively use all streams. Can you provide your recommendation of option 1 vs option 2 in my previous post? Thanks, Raja. On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao jun...@gmail.com wrote: You can always use more partitions to get more parallelism in the consumers. Thanks, Jun On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango rela...@salesforce.comwrote: So what is best way to load balance multiple consumers consuming from topic filter. Let's say we have 4 topics with 8 partitions and 2 consumers. Option 1) To load balance consumers, we can set num.streams=4 so that both consumers split 8 partitions. but can only use half of consumer streams. Option 2) Configure mutually exclusive topic filter regex such that 2 topics will match consumer1 and 2 topics will match consumer2. Now we can set num.streams=8 and fully utilize consumer streams. I believe this will improve performance, but if consumer dies, we will not get any data from the topic used by that consumer. What would be your recommendation? Thanks, Raja. On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede neha.narkh...@gmail.com wrote: 2) When I started mirrormaker with num.streams=16, looks like 16 consumer threads were created, but only 8 are showing up as active as owner in consumer offset tracker and all topics/partitions are distributed between 8 consumer threads. This is because currently the consumer rebalancing process of assigning partitions to consumer streams is at a per topic level. Unless you have at least one topic with 16 partitions, the remaining 8 threads will not do any work. This is not ideal and we want to look into a better rebalancing algorithm. Though it is a big change and we prefer doing it as part of the consumer client rewrite. 
Thanks, Neha On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango rela...@salesforce.com wrote: So my understanding is num of active streams that a consumer can utilize is number of partitions in topic. This is fine if we consumer from specific topic. But if we consumer from TopicFilter, I thought consumer should able to utilize (number of topics that match filter * number of partitions in topic) . But looks like number of streams that consumer can use is limited by just number if partitions in topic although it's consuming from multiple topic. Here what I observed with 1 mirrormaker consuming from whitelist '.+'. The white list matches 5 topics and each topic has 8 partitions. I used consumer offset checker to look at owner of each
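To make the two options concrete, here is a sketch of what the deployments look like with the stock 0.8 MirrorMaker; config file names and topic prefixes are placeholders, and option 2 would require the code change described above:

    # Option 1 today: one process, one whitelist covering everything
    bin/kafka-run-class.sh kafka.tools.MirrorMaker \
      --consumer.config consumer.properties --producer.config producer.properties \
      --whitelist '.+' --num.streams 8

    # Workaround: several processes with mutually exclusive whitelists
    bin/kafka-run-class.sh kafka.tools.MirrorMaker \
      --consumer.config consumer.properties --producer.config producer.properties \
      --whitelist 'topic1.*' --num.streams 8 &
    bin/kafka-run-class.sh kafka.tools.MirrorMaker \
      --consumer.config consumer.properties --producer.config producer.properties \
      --whitelist 'topic2.*' --num.streams 8 &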
Num of streams for consumers using TopicFilter.
So my understanding is that the number of active streams a consumer can utilize is the number of partitions in a topic. This is fine if we consume from a specific topic. But if we consume from a TopicFilter, I thought the consumer should be able to utilize (number of topics that match the filter * number of partitions per topic). But it looks like the number of streams the consumer can use is limited to just the number of partitions in a topic, even though it's consuming from multiple topics. Here is what I observed with 1 mirrormaker consuming from whitelist '.+'. The whitelist matches 5 topics and each topic has 8 partitions. I used the consumer offset checker to look at the owner of each topic/partition. 1) When I started mirrormaker with num.streams=8, all topics/partitions were distributed between 8 consumer threads. 2) When I started mirrormaker with num.streams=16, it looks like 16 consumer threads were created, but only 8 showed up as active owners in the consumer offset checker, and all topics/partitions were distributed between 8 consumer threads. So this could be a bottleneck for consumers: although we partitioned the topic, if we are consuming from a topic filter it can't utilize much parallelism from the number of streams. Am I missing something, or is there a way to make consumers/mirrormakers utilize a larger number of active streams? -- Thanks, Raja.
Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.
Created JIRA https://issues.apache.org/jira/browse/KAFKA-1035 and attached patch to it. Please review. On Wed, Aug 28, 2013 at 1:11 PM, Guozhang Wang wangg...@gmail.com wrote: I think this patch can be made in trunk. You can mark it as 0.8.1 Guozhang On Wednesday, August 28, 2013, Rajasekar Elango rela...@salesforce.com wrote: Guozhang , *The documentation says I need to work off of trunk. Can you confirm If I should be working in trunk or different branch.* * * *Thanks,* *Raja.* On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang wangg...@gmail.com wrote: Cool! You can follow the process of creating a JIRA here: http://kafka.apache.org/contributing.html And submit patch here: https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow It will be great if you can also add an entry for this issue in FAQ since I think this is a common question: https://cwiki.apache.org/confluence/display/KAFKA/FAQ Guozhang On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango rela...@salesforce.com wrote: Thanks Guozhang, Changing max retry to 5 worked. Since I am changing console producer code, I can also submit patch adding both message.send.max.retries and retry.backoff.ms to console producer. Can you let me know process for submitting patch? Thanks, Raja. On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Rajasekar, The remove fetcher log entry is normal under addition of partitions, since they indicate that some leader changes have happened so brokers are closing the fetchers to the old leaders. I just realized that the console Producer does not have the message.send.max.retries options yet. Could you file a JIRA for this and I will followup to add this option? As for now you can hard modify the default value from 3 to a larger number. Guozhang On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango rela...@salesforce.comwrote: Thanks Neha Guozhang, When I ran StateChangeLogMerger, I am seeing this message repeated 16 times for each partition: [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager) [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for partition [test-60,13] in /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data. (kafka.log.LogManager) I am also seeing .log and .index files created for this topic in data dir. Also list topic command shows leaders, replicas and isrs for all partitions. Do you still think increasing num of retries would help or is it some other issue..? Also console Producer doesn't seem to have option to set num of retries. Is there a way to configure num of retries for console producer ? Thanks, Raja. On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede neha.narkh...@gmail.com wrote: As Guozhang said, your producer might give up sooner than the leader election completes for the new topic. To confirm if your producer gave up too soon, you can run the state-- Thanks, Raja. -- -- Guozhang -- Thanks, Raja.
Re: Num of streams for consumers using TopicFilter.
Hi Jun, If you read my previous posts, based on current re balancing logic, if we consumer from topic filter, consumer actively use all streams. Can you provide your recommendation of option 1 vs option 2 in my previous post? Thanks, Raja. On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao jun...@gmail.com wrote: You can always use more partitions to get more parallelism in the consumers. Thanks, Jun On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango rela...@salesforce.comwrote: So what is best way to load balance multiple consumers consuming from topic filter. Let's say we have 4 topics with 8 partitions and 2 consumers. Option 1) To load balance consumers, we can set num.streams=4 so that both consumers split 8 partitions. but can only use half of consumer streams. Option 2) Configure mutually exclusive topic filter regex such that 2 topics will match consumer1 and 2 topics will match consumer2. Now we can set num.streams=8 and fully utilize consumer streams. I believe this will improve performance, but if consumer dies, we will not get any data from the topic used by that consumer. What would be your recommendation? Thanks, Raja. On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede neha.narkh...@gmail.com wrote: 2) When I started mirrormaker with num.streams=16, looks like 16 consumer threads were created, but only 8 are showing up as active as owner in consumer offset tracker and all topics/partitions are distributed between 8 consumer threads. This is because currently the consumer rebalancing process of assigning partitions to consumer streams is at a per topic level. Unless you have at least one topic with 16 partitions, the remaining 8 threads will not do any work. This is not ideal and we want to look into a better rebalancing algorithm. Though it is a big change and we prefer doing it as part of the consumer client rewrite. Thanks, Neha On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango rela...@salesforce.com wrote: So my understanding is num of active streams that a consumer can utilize is number of partitions in topic. This is fine if we consumer from specific topic. But if we consumer from TopicFilter, I thought consumer should able to utilize (number of topics that match filter * number of partitions in topic) . But looks like number of streams that consumer can use is limited by just number if partitions in topic although it's consuming from multiple topic. Here what I observed with 1 mirrormaker consuming from whitelist '.+'. The white list matches 5 topics and each topic has 8 partitions. I used consumer offset checker to look at owner of each/topic partition. 1) When I started mirrormaker with num.streams=8, all topics/partitions are distributed between 8 consumer threads. 2) When I started mirrormaker with num.streams=16, looks like 16 consumer threads were created, but only 8 are showing up as active as owner in consumer offset tracker and all topics/partitions are distributed between 8 consumer threads. So this could be bottleneck for consumers as although we partitioned topic, if we are consuming from topic filter it can't utilize much of parallelism with num of streams. Am i missing something, is there a way to make cosumers/mirrormakers to utilize more number of active streams? -- Thanks, Raja. -- Thanks, Raja. -- Thanks, Raja.
Re: Num of streams for consumers using TopicFilter.
With option 1, I can't really use 8 streams in each consumer; if I do, only one consumer seems to be doing all the work. So I had to use a total of 8 streams, with 4 for each consumer.

On Fri, Aug 30, 2013 at 12:01 AM, Jun Rao jun...@gmail.com wrote: The drawback of 2), as you said, is no auto failover. I was suggesting that you use 16 partitions. Then you can use option 1) with 8 streams in each consumer. Thanks, Jun

On Thu, Aug 29, 2013 at 8:51 PM, Rajasekar Elango rela...@salesforce.com wrote: Hi Jun, If you read my previous posts: based on the current rebalancing logic, if we consume from a topic filter, the consumer can't actively use all of its streams. Can you provide your recommendation of option 1 vs option 2 from my previous post? Thanks, Raja.

On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao jun...@gmail.com wrote: You can always use more partitions to get more parallelism in the consumers. Thanks, Jun

On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango rela...@salesforce.com wrote: So what is the best way to load balance multiple consumers consuming from a topic filter? Let's say we have 4 topics with 8 partitions each and 2 consumers. Option 1) To load balance the consumers, we can set num.streams=4 so that both consumers split the 8 partitions, but we can only use half of the consumer streams. Option 2) Configure mutually exclusive topic filter regexes such that 2 topics match consumer1 and 2 topics match consumer2. Now we can set num.streams=8 and fully utilize the consumer streams. I believe this will improve performance, but if a consumer dies, we will not get any data from the topics assigned to that consumer. What would be your recommendation? Thanks, Raja.

On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede neha.narkh...@gmail.com wrote: 2) When I started mirrormaker with num.streams=16, looks like 16 consumer threads were created, but only 8 are showing up as active as owner in consumer offset tracker and all topics/partitions are distributed between 8 consumer threads. This is because currently the consumer rebalancing process of assigning partitions to consumer streams works at a per-topic level. Unless you have at least one topic with 16 partitions, the remaining 8 threads will not do any work. This is not ideal and we want to look into a better rebalancing algorithm, though it is a big change and we prefer doing it as part of the consumer client rewrite. Thanks, Neha

On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango rela...@salesforce.com wrote: So my understanding is that the number of active streams a consumer can utilize is the number of partitions in a topic. This is fine if we consume from a specific topic. But if we consume from a TopicFilter, I thought the consumer should be able to utilize (number of topics that match the filter * number of partitions per topic). But it looks like the number of streams the consumer can use is limited to just the number of partitions in a topic, even though it's consuming from multiple topics. Here is what I observed with 1 mirrormaker consuming from whitelist '.+'. The whitelist matches 5 topics and each topic has 8 partitions. I used the consumer offset checker to look at the owner of each topic/partition. 1) When I started mirrormaker with num.streams=8, all topics/partitions are distributed between 8 consumer threads. 2) When I started mirrormaker with num.streams=16, it looks like 16 consumer threads were created, but only 8 show up as active owners in the consumer offset checker, and all topics/partitions are distributed between those 8 consumer threads. So this could be a bottleneck for consumers: although we partitioned the topics, when consuming from a topic filter we can't get much parallelism out of num.streams. Am I missing something? Is there a way to make consumers/mirrormakers utilize a larger number of active streams? -- Thanks, Raja.
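A minimal sketch of the two options above, assuming the 0.8-era MirrorMaker command-line flags (--whitelist, --num.streams); the config file names and topic regexes are placeholders, not taken from the thread:

# Option 1: both instances share the same whitelist; per-topic rebalancing
# splits each topic's 8 partitions between them (4 streams per instance).
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties --producer.config producer.properties \
  --num.streams 4 --whitelist 'topic[1-4]'

# Option 2: mutually exclusive whitelists; each instance owns 2 topics outright
# and can drive all 8 of its streams, at the cost of no automatic failover.
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties --producer.config producer.properties \
  --num.streams 8 --whitelist 'topic[12]'    # instance 1
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties --producer.config producer.properties \
  --num.streams 8 --whitelist 'topic[34]'    # instance 2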
Re: Changing the number of partitions after a topic is created
Hi Jun, It's been some time since your last post; is this patch available now? Also, we are currently doing the following manual steps to update an existing topic to use new partitions: 1) stop all zookeepers 2) stop all kafka brokers 3) clean the data dirs of zookeepers and kafka 4) start zookeepers and kafka. This involves both downtime and losing data. Is there a better way to update existing topics to use new partitions until the patch is available? -- Thanks, Raja.
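For reference, later Kafka releases added an admin command that grows a topic's partition count in place, which avoids the wipe-and-restart procedure above; a sketch assuming a release where kafka-topics.sh supports --alter (the ZooKeeper address and topic name are placeholders):

# Increase the partition count without restarting brokers or deleting data.
# Note: keyed messages will hash to different partitions after the change.
bin/kafka-topics.sh --zookeeper zkhost:2181 --alter --topic mytopic --partitions 16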
Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.
Thanks, This is small fix to ConsoleProducer.scala only. Will use 0.8 branch. Thanks, Raja. On Wed, Aug 28, 2013 at 12:49 PM, Neha Narkhede neha.narkh...@gmail.comwrote: Rajasekar, We are trying to minimize the number of patches in 0.8 to critical bug fixes or broken tooling. If the patch involves significant code changes, we would encourage taking it on trunk. If you want to just fix the console producer to take the retry argument, I would think it is small enough to consider taking it on 0.8 branch since it affects the usability of the console producer. Thanks, Neha On Wed, Aug 28, 2013 at 8:36 AM, Rajasekar Elango rela...@salesforce.com wrote: Guozhang , *The documentation says I need to work off of trunk. Can you confirm If I should be working in trunk or different branch.* * * *Thanks,* *Raja.* On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang wangg...@gmail.com wrote: Cool! You can follow the process of creating a JIRA here: http://kafka.apache.org/contributing.html And submit patch here: https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow It will be great if you can also add an entry for this issue in FAQ since I think this is a common question: https://cwiki.apache.org/confluence/display/KAFKA/FAQ Guozhang On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango rela...@salesforce.com wrote: Thanks Guozhang, Changing max retry to 5 worked. Since I am changing console producer code, I can also submit patch adding both message.send.max.retries and retry.backoff.ms to console producer. Can you let me know process for submitting patch? Thanks, Raja. On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Rajasekar, The remove fetcher log entry is normal under addition of partitions, since they indicate that some leader changes have happened so brokers are closing the fetchers to the old leaders. I just realized that the console Producer does not have the message.send.max.retries options yet. Could you file a JIRA for this and I will followup to add this option? As for now you can hard modify the default value from 3 to a larger number. Guozhang On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango rela...@salesforce.comwrote: Thanks Neha Guozhang, When I ran StateChangeLogMerger, I am seeing this message repeated 16 times for each partition: [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager) [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for partition [test-60,13] in /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data. (kafka.log.LogManager) I am also seeing .log and .index files created for this topic in data dir. Also list topic command shows leaders, replicas and isrs for all partitions. Do you still think increasing num of retries would help or is it some other issue..? Also console Producer doesn't seem to have option to set num of retries. Is there a way to configure num of retries for console producer ? Thanks, Raja. On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede neha.narkh...@gmail.com wrote: As Guozhang said, your producer might give up sooner than the leader election completes for the new topic. 
To confirm if your producer gave up too soon, you can run the state change log merge tool for this topic and see when the leader election finished for all partitions ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs location to all state change logs --topic topic Note that this tool requires you to give the state change logs for all brokers in the cluster. Thanks, Neha On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang wangg...@gmail.com wrote: Hello Rajasekar, In 0.8 producers keep a cache of the partition - leader_broker_id map which is used to determine to which brokers should the messages be sent. After new partitions are added, the cache on the producer has not populated yet hence it will throw this exception. The producer will then try to refresh its cache by asking the brokers who are the leaders of these new partitions that I do not know of before. The brokers at the beginning also do not know this information, and will only get this information from
Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.
Hello everyone, We recently increased the number of partitions from 4 to 16, and after that the console producer mostly fails with LeaderNotAvailableException and exits after 3 tries. Here are the last few lines of the console producer log:

No partition metadata for topic test-41 due to kafka.common.LeaderNotAvailableException}] for topic [test-41]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test-41 (kafka.producer.async.DefaultEventHandler)
[2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig)
[2013-08-27 08:29:30,372] INFO Fetching metadata from broker id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1 topic(s) Set(test-41) (kafka.client.ClientUtils$)
[2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
[2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
[2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for producing (kafka.producer.SyncProducer)
[2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true (kafka.producer.SyncProducer)
[2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig)
[2013-08-27 08:29:30,381] ERROR Failed to send requests for topics test-41 with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:74)
at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168)
at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
[2013-08-27 08:29:30,383] INFO Shutting down producer (kafka.producer.Producer)
[2013-08-27 08:29:30,384] INFO Closing all sync producers (kafka.producer.ProducerPool)

Also, this happens only for new topics (we have auto.create.topic set to true); if I retry sending a message to an existing topic, it works fine. Is there any tweaking I need to do on the broker or the producer to scale with the number of partitions? -- Thanks in advance for help, Raja.
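As the follow-ups in this thread suggest, the usual workaround is to give the producer enough retries for leader election and metadata propagation to finish after new partitions appear; a sketch of the relevant 0.8 producer settings (the values are illustrative, not from the original post):

# producer.properties for the 0.8 producer
message.send.max.retries=10              # the default of 3 can give up before leaders are elected
retry.backoff.ms=500                     # wait between retries while metadata propagates
topic.metadata.refresh.interval.ms=60000 # how often the producer refreshes partition metadata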
Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.
Thanks Neha Guozhang, When I ran StateChangeLogMerger, I am seeing this message repeated 16 times for each partition: [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager) [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for partition [test-60,13] in /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data. (kafka.log.LogManager) I am also seeing .log and .index files created for this topic in data dir. Also list topic command shows leaders, replicas and isrs for all partitions. Do you still think increasing num of retries would help or is it some other issue..? Also console Producer doesn't seem to have option to set num of retries. Is there a way to configure num of retries for console producer ? Thanks, Raja. On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede neha.narkh...@gmail.comwrote: As Guozhang said, your producer might give up sooner than the leader election completes for the new topic. To confirm if your producer gave up too soon, you can run the state change log merge tool for this topic and see when the leader election finished for all partitions ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs location to all state change logs --topic topic Note that this tool requires you to give the state change logs for all brokers in the cluster. Thanks, Neha On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang wangg...@gmail.com wrote: Hello Rajasekar, In 0.8 producers keep a cache of the partition - leader_broker_id map which is used to determine to which brokers should the messages be sent. After new partitions are added, the cache on the producer has not populated yet hence it will throw this exception. The producer will then try to refresh its cache by asking the brokers who are the leaders of these new partitions that I do not know of before. The brokers at the beginning also do not know this information, and will only get this information from controller which will only propagation the leader information after the leader elections have all been finished. If you set num.retries to 3 then it is possible that producer gives up too soon before the leader info ever propagated to producers, hence to producers also. Could you try to increase producer.num.retries and see if the producer can eventually succeed in re-trying? Guozhang On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango rela...@salesforce.com wrote: Hello everyone, We recently increased number of partitions from 4 to 16 and after that console producer mostly fails with LeaderNotAvailableException and exits after 3 tries: Here is last few lines of console producer log: No partition metadata for topic test-41 due to kafka.common.LeaderNotAvailableException}] for topic [test-41]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test-41 (kafka.producer.async.DefaultEventHandler) [2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying send. 
Remaining retries = 0 (kafka.producer.async.DefaultEventHandler) [2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig) [2013-08-27 08:29:30,372] INFO Fetching metadata from broker id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1 topic(s) Set(test-41) (kafka.client.ClientUtils$) [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/ 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel) [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/ 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel) [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for producing (kafka.producer.SyncProducer) [2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true (kafka.producer.SyncProducer) [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig) [2013-08-27 08:29:30,381] ERROR Failed to send requests for topics test-41 with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler) kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.Producer.send(Producer.scala:74) at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168) at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala) [2013-08-27 08:29:30,383] INFO Shutting down producer (kafka.producer.Producer) [2013-08-27 08:29:30,384] INFO Closing all sync producers
Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.
Thanks Guozhang, Changing max retry to 5 worked. Since I am changing console producer code, I can also submit patch adding both message.send.max.retries and retry.backoff.ms to console producer. Can you let me know process for submitting patch? Thanks, Raja. On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Rajasekar, The remove fetcher log entry is normal under addition of partitions, since they indicate that some leader changes have happened so brokers are closing the fetchers to the old leaders. I just realized that the console Producer does not have the message.send.max.retries options yet. Could you file a JIRA for this and I will followup to add this option? As for now you can hard modify the default value from 3 to a larger number. Guozhang On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango rela...@salesforce.comwrote: Thanks Neha Guozhang, When I ran StateChangeLogMerger, I am seeing this message repeated 16 times for each partition: [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager) [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for partition [test-60,13] in /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data. (kafka.log.LogManager) I am also seeing .log and .index files created for this topic in data dir. Also list topic command shows leaders, replicas and isrs for all partitions. Do you still think increasing num of retries would help or is it some other issue..? Also console Producer doesn't seem to have option to set num of retries. Is there a way to configure num of retries for console producer ? Thanks, Raja. On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede neha.narkh...@gmail.com wrote: As Guozhang said, your producer might give up sooner than the leader election completes for the new topic. To confirm if your producer gave up too soon, you can run the state change log merge tool for this topic and see when the leader election finished for all partitions ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs location to all state change logs --topic topic Note that this tool requires you to give the state change logs for all brokers in the cluster. Thanks, Neha On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang wangg...@gmail.com wrote: Hello Rajasekar, In 0.8 producers keep a cache of the partition - leader_broker_id map which is used to determine to which brokers should the messages be sent. After new partitions are added, the cache on the producer has not populated yet hence it will throw this exception. The producer will then try to refresh its cache by asking the brokers who are the leaders of these new partitions that I do not know of before. The brokers at the beginning also do not know this information, and will only get this information from controller which will only propagation the leader information after the leader elections have all been finished. If you set num.retries to 3 then it is possible that producer gives up too soon before the leader info ever propagated to producers, hence to producers also. Could you try to increase producer.num.retries and see if the producer can eventually succeed in re-trying? 
Guozhang On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango rela...@salesforce.com wrote: Hello everyone, We recently increased number of partitions from 4 to 16 and after that console producer mostly fails with LeaderNotAvailableException and exits after 3 tries: Here is last few lines of console producer log: No partition metadata for topic test-41 due to kafka.common.LeaderNotAvailableException}] for topic [test-41]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test-41 (kafka.producer.async.DefaultEventHandler) [2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler) [2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig) [2013-08-27 08:29:30,372] INFO Fetching metadata from broker id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1 topic(s) Set(test-41) (kafka.client.ClientUtils$) [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/ 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel) [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/ 127.0.0.1:6667
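For completeness, once the console producer gained these options, the retry budget could be set from the command line; a hedged sketch assuming the flag names introduced by that change (--message-send-max-retries, --retry-backoff-ms), with a placeholder broker address and topic:

# Console producer with a larger retry budget for freshly created topics.
bin/kafka-console-producer.sh --broker-list localhost:6667 --topic test-41 \
  --message-send-max-retries 10 --retry-backoff-ms 500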
Fwd: Tuning mirror maker performance
Thanks Jun, What troubleshooting steps can we take to identify whether the bottleneck is with consuming or producing? Does changing anything in the log4j configuration, or any JMX mbeans, provide insight into it? Does the metadata refresh interval affect picking up new partitions only for existing topics, or does it also affect picking up new topics? Thanks, Raja.

-- Forwarded message -- From: Jun Rao jun...@gmail.com Date: Fri, Aug 23, 2013 at 12:08 AM Subject: Re: Tuning mirror maker performance To: users@kafka.apache.org

You have to determine whether the bottleneck is in the consumer or the producer. To improve the performance of the former, you can increase the # of total consumer streams. # streams is capped by total # partitions. So, you may need to increase the # of partitions. To improve the performance of the latter, you can (a) increase the batch size in async mode and/or (b) run more instances of producers. Metadata refresh interval is configurable. It's mainly for the producer to pick up newly available partitions. Thanks, Jun

On Thu, Aug 22, 2013 at 1:44 PM, Rajasekar Elango rela...@salesforce.com wrote: I am trying to tune mirrormaker configurations based on this doc https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes and would like to know your recommendations. Our configuration: we are doing inter-datacenter replication with 5 brokers in the source and destination DCs and 2 mirrormakers doing replication. We have about 4 topics with 4 partitions each. I have been using ConsumerOffsetChecker to analyze lag while tuning. 1. num.streams: We have set num.streams=2 so that 4 partitions will be shared between the 2 mirrormakers. Increasing num.streams beyond this did not improve performance; is this correct? 2. num.producers: We initially set num.producers = 4 (assuming one producer thread per topic), then we bumped num.producers = 16, but did not see any improvement in performance. Is this correct? How do we determine the optimum value for num.producers? 3. socket buffer sizes: We initially had default values for these, then I changed socket.send.buffer.bytes on the source broker, socket.receive.buffer.bytes and fetch.message.max.bytes in the mirrormaker consumer properties, and socket.receive.buffer.bytes and socket.request.max.bytes on the destination broker, all to 1024*1024*1024 (1073741824). This did improve the performance, but I could not get lag below 100.
Here is how our lag looks like after above changes: Group Topic Pid Offset logSize Lag Owner mirrormakerProd FunnelProto0 554704539 554717088 12549 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0 mirrormakerProd FunnelProto1 547370573 547383136 12563 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1 mirrormakerProd FunnelProto2 553124930 553125742 812 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0 mirrormakerProd FunnelProto3 552990834 552991650 816 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1 mirrormakerProd agent 0 35438 35440 2 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0 mirrormakerProd agent 1 35447 35448 1 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1 mirrormakerProd agent 2 35375 35375 0 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0 mirrormakerProd agent 3 35336 35336 0 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1 mirrormakerProd internal_metrics 0 1930852823 1930917418 64595 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0 mirrormakerProd internal_metrics 1 1937237324 1937301841 64517 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1 mirrormakerProd internal_metrics 2 1945894901 1945904067 9166 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0 mirrormakerProd internal_metrics 3 1946906932 1946915928 8996 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1 mirrormakerProd jmx0 485270038 485280882 10844 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0 mirrormakerProd jmx1 486363914 486374759 10845 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1 mirrormakerProd jmx
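The lag tables quoted in these posts come from the 0.8 offset checker tool; a sketch of the invocation, assuming the flag names of that era (--zkconnect, --group) and a placeholder ZooKeeper address:

# Per-partition offset, log size and lag for the mirror maker's consumer group.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zkconnect zkhost:2181 --group mirrormakerProd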
Re: Differences in size of data replicated by mirror maker
Thanks Guozhang, Jun. Yes, we are doing gzip compression and that should be the reason for the difference in disk usage. I also had a typo: the size is actually 91G in the source cluster, so the 25G/91G ratio makes sense for compression. Thanks, Raja.

On Thu, Aug 22, 2013 at 7:00 PM, Guozhang Wang wangg...@gmail.com wrote: When you state the numbers, are they the same across instances in the cluster, meaning that Topic-0 would have 910*5 GB in the source cluster and 25*5 GB in the target cluster? Another possibility is that MirrorMaker uses compression on the producer side, but I would be surprised if the compression rate could be 25/910. Guozhang

On Thu, Aug 22, 2013 at 3:48 PM, Rajasekar Elango rela...@salesforce.com wrote: Yes, both source and target clusters have 5 brokers in the cluster. Sent from my iPhone

On Aug 22, 2013, at 6:11 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Rajasekar, Are the sizes of the source cluster and target cluster the same? Guozhang

On Thu, Aug 22, 2013 at 2:14 PM, Rajasekar Elango rela...@salesforce.com wrote: Hi, We are using mirrormaker to replicate data between two kafka clusters. I am seeing a huge difference in the size of the logs in the data dir between a broker in the source cluster and a broker in the destination cluster. For example, the size of ~/data/Topic-0/ is about 910 G on a source broker, but only 25G on a destination broker. I see segmented log files (~500 M) created about every 2 or 3 mins on source brokers, but segmented log files created only about every 25 mins on the destination broker. I verified mirrormaker is doing fine using the consumer offset checker: not much lag, and offsets are incrementing. I also verified that topics/partitions are not under-replicated in either the source or target cluster. What is the reason for this difference in disk usage? -- Thanks, Raja.
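For anyone making the same comparison: the gap in data-dir size is explained by the mirror maker producer compressing message batches before they reach the target cluster. A sketch of the 0.8 producer settings involved (values are illustrative):

# mirror maker producer.properties
compression.codec=gzip      # compressed batches are also stored compressed on the target brokers
producer.type=async         # larger async batches generally compress better
batch.num.messages=200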
Tuning mirror maker performance
I am trying to tune mirrormaker configurations based on this doc https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes and would like to know your recommendations. Our configuration: we are doing inter-datacenter replication with 5 brokers in the source and destination DCs and 2 mirrormakers doing replication. We have about 4 topics with 4 partitions each. I have been using ConsumerOffsetChecker to analyze lag while tuning.
1. num.streams: We have set num.streams=2 so that 4 partitions will be shared between the 2 mirrormakers. Increasing num.streams beyond this did not improve performance; is this correct?
2. num.producers: We initially set num.producers = 4 (assuming one producer thread per topic), then we bumped num.producers = 16, but did not see any improvement in performance. Is this correct? How do we determine the optimum value for num.producers?
3. socket buffer sizes: We initially had default values for these, then I changed socket.send.buffer.bytes on the source broker, socket.receive.buffer.bytes and fetch.message.max.bytes in the mirrormaker consumer properties, and socket.receive.buffer.bytes and socket.request.max.bytes on the destination broker, all to 1024*1024*1024 (1073741824). This did improve the performance, but I could not get lag below 100.
Here is how our lag looks after the above changes:
Group Topic Pid Offset logSize Lag Owner
mirrormakerProd FunnelProto 0 554704539 554717088 12549 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd FunnelProto 1 547370573 547383136 12563 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd FunnelProto 2 553124930 553125742 812 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd FunnelProto 3 552990834 552991650 816 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd agent 0 35438 35440 2 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd agent 1 35447 35448 1 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd agent 2 35375 35375 0 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd agent 3 35336 35336 0 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd internal_metrics 0 1930852823 1930917418 64595 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd internal_metrics 1 1937237324 1937301841 64517 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd internal_metrics 2 1945894901 1945904067 9166 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd internal_metrics 3 1946906932 1946915928 8996 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd jmx 0 485270038 485280882 10844 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd jmx 1 486363914 486374759 10845 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd jmx 2 491783842 491784826 984 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd jmx 3 485675629 485676643 1014 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
In the mirrormaker logs, I see topic metadata is fetched about every 10 mins and the connection to the producers is re-established for producing. Is this normal? If it's continuously producing, why does it need to reconnect to the destination brokers for producing? What else can we tune to bring lag below 100? This is just a small set of data we are currently testing; the real production traffic will be very large. How can we compute the optimum configuration as data traffic increases? Thanks for the help, Raja.
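A consolidated sketch of the knobs discussed in this post, with illustrative values only (the right numbers depend on message size and the latency of the inter-DC link):

# mirror maker consumer.properties (reads from the source cluster)
socket.receive.buffer.bytes=2097152   # bigger buffers help on high-latency WAN links
fetch.message.max.bytes=10485760

# mirror maker producer.properties (writes to the target cluster)
producer.type=async
batch.num.messages=500                # larger async batches raise producer throughput
queue.buffering.max.ms=500
compression.codec=gzip

# Command line: streams are capped by partitions per topic, so with 4 topics x 4
# partitions, more than 4 total streams across the instances will sit idle.
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties --producer.config producer.properties \
  --num.streams 2 --num.producers 4 --whitelist '.+'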
Re: Tuning mirror maker performance
Hi, I am trying to tune mirrormaker configurations based on this doc https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes and would like know your recommendations. Our configuration: We are doing inter datacenter replication with 5 brokers in source and destination DC and 2 mirrormakers doing replication. We have about 4 topics with 4 partitions each. I have been consumerOffsetChecker to analysis lag based on tuning. 1. num.streams : - We have set num.streams=2 so that 4 partitions will be shared between 2 mirrormaker. Increasing num.streams more than this did not improve any performance, is this correct? 2. num.producers:- We initially set num.producers = 4 (assuming one producer thread per topic), then we bumped num.producers = 16, but did not see any improvement in performance..? Is this correct..? How do we determine optimum value for num.producers ? 3. *socket.buffersize : *We initially had default values for these, then I changed socket.send.buffer.bytes on source broker, socket.receive.buffer.bytes, fetch.message.max.bytes on mirrormaker consumer properties, socket.receive.buffer.bytes, socket.request.max.bytes on destination broker all to 1024*1024*1024(1073741824) . This did improve the performance, but I could not get Lag to 100. Here is how our lag looks like after above changes: Group Topic Pid Offset logSize Lag Owner mirrormakerProd FunnelProto0 554704539 554717088 12549 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0 mirrormakerProd FunnelProto1 547370573 547383136 12563 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1 mirrormakerProd FunnelProto2 553124930 553125742 812 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0 mirrormakerProd FunnelProto3 552990834 552991650 816 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1 mirrormakerProd agent 0 35438 35440 2 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0 mirrormakerProd agent 1 35447 35448 1 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1 mirrormakerProd agent 2 35375 35375 0 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0 mirrormakerProd agent 3 35336 35336 0 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1 mirrormakerProd internal_metrics 0 1930852823 1930917418 64595 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0 mirrormakerProd internal_metrics 1 1937237324 1937301841 64517 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1 mirrormakerProd internal_metrics 2 1945894901 1945904067 9166 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0 mirrormakerProd internal_metrics 3 1946906932 1946915928 8996 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1 mirrormakerProd jmx0 485270038 485280882 10844 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0 mirrormakerProd jmx1 486363914 486374759 10845 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1 mirrormakerProd jmx2 491783842 491784826 984 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0 mirrormakerProd jmx3 485675629 485676643 1014 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1 In mirrormaker logs, I see topic metadata is fetched after every 10mins and connection reestablished with producers for producing. Is this normal? 
If it's continuously producing, why does it need to reconnect to destination brokers for producing.? What else can we tune to bring lag 100 ..? This is just small set of data we are currently testing, the real production traffic will be very large. How can compute optimum configuration as data traffic increases.? Thanks for help, Thanks, Raja. On Thu, Aug 22, 2013 at 4:44 PM, Rajasekar Elango rela...@salesforce.comwrote: I am trying to tune mirrormaker configurations based on this doc https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes and would like know your recommendations. Our configuration: We are doing inter datacenter replication with 5 brokers in source and destination DC and 2 mirrormakers
Differences in size of data replicated by mirror maker
Hi, We are using mirrormaker to replicate data between two kafka clusters. I am seeing a huge difference in the size of the logs in the data dir between a broker in the source cluster and a broker in the destination cluster. For example, the size of ~/data/Topic-0/ is about 910 G on a source broker, but only 25G on a destination broker. I see segmented log files (~500 M) created about every 2 or 3 mins on source brokers, but segmented log files created only about every 25 mins on the destination broker. I verified mirrormaker is doing fine using the consumer offset checker: not much lag, and offsets are incrementing. I also verified that topics/partitions are not under-replicated in either the source or target cluster. What is the reason for this difference in disk usage? -- Thanks, Raja.
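A quick way to make the comparison described above on each broker (the paths are placeholders):

# On a source broker and on a destination broker, compare size and segment roll cadence.
du -sh /data/Topic-0
ls -lht /data/Topic-0 | head   # newest .log/.index segments, their sizes and timestamps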
Re: Differences in size of data replicated by mirror maker
Yes, both source and target clusters have 5 brokers in cluster. Sent from my iPhone On Aug 22, 2013, at 6:11 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Rajasekar, Are the size of the source cluster and target cluster the same? Guozhang On Thu, Aug 22, 2013 at 2:14 PM, Rajasekar Elango rela...@salesforce.comwrote: Hi, We are using mirrormaker to replicate data between two kafka clusters. I am seeing huge difference in size of log in data dir between the broker in source cluster vs broker in destination cluster: For eg: Size of ~/data/Topic-0/ is about 910 G in source broker, but only its only 25G in destination broker. I see segmented log files (~500 M) is created for about every 2 or 3 mins in source brokers, but I see segmented log files is created for about every 25 mins in destination broker. I verified mirrormaker is doing fine using consumer offset checker, not much lag, offsets are incrementing. I also verified that topics/partitions are not under replicated in both source and target cluster. What is the reason for this difference in disk usage? -- Thanks, Raja. -- -- Guozhang