Re: FW: Mirrormaker stops consuming

2016-02-08 Thread Rajasekar Elango
Debbie,

In our case, the issue was with our custom SSL implementation and we needed to
fix a bug related to buffer overflow handling. This may not be applicable to
you if you are using open-source Kafka. The best way to troubleshoot this
problem would be to take a couple of thread dumps a few seconds apart and look
at the stack traces. That will help you identify the code the process is
spinning through when it gets stuck.
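
If it helps, here is a minimal sketch of how such dumps can be captured
(assumes the JDK's jps/jstack tools are on the PATH; the grep pattern is just
an example, adjust it to your process):

# Take three thread dumps of the mirrormaker JVM, ten seconds apart.
PID=$(jps -lm | grep -i mirrormaker | awk '{print $1}')
for i in 1 2 3; do
  jstack "$PID" > "mirrormaker-threaddump-$i.txt"
  sleep 10
done
# A thread sitting on the same frames in every dump is the one spinning or stuck.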

Thanks,
Raja.

On Fri, Feb 5, 2016 at 10:28 PM, Yu, Debbie  wrote:

> Hi Raja,
>
> I’m not used to using a mailing list, so I thought I’d try sending to you
> directly instead.
> We were wondering if you had fixed the problem you saw before with the
> mirror maker,
> since we seem to be seeing the same problem now.
> The difference is we’re not using the custom SSLSocketChannel discussed in
> the thread.
> Any kind of info you could share would be much appreciated.
>
> Thanks,
> Debbie
>
> On 2/5/16, 7:17 PM, "Yu, Debbie"  wrote:
>
> >Hi Raja,
> >
> >We seem to be encountering the same problem you were seeing where our
> >producer thread becomes blocked for some reason.
> >We also see our producer queue is full,
> >and for some reason, the producer isn't pulling from the queue and sending
> >to our brokers.
> >We were wondering if you might be able to share how you fixed your problem
> >if you fixed it.
> >
> >Thanks,
> >Debbie
> >
>
>


-- 
Thanks,
Raja.


Does Kafka recover all data if node is reimaged

2015-10-12 Thread Rajasekar Elango
I was wondering: if a Kafka broker node gets reimaged and all of its data is
wiped off, will Kafka recover all the data on that node from replication?
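
(A related sketch, in case it is useful: while the reimaged broker catches up,
the topics tool can list partitions that are still under-replicated. This
assumes the stock 0.8.x tooling and a local zookeeper; adjust the connect
string.)

# Lists partitions whose ISR is still smaller than the replica set:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --under-replicated-partitions
# When this prints nothing, the reimaged broker has fully re-replicated.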

-- 
Thanks,
Raja.


Re: Painfully slow kafka recovery / cluster breaking

2015-08-26 Thread Rajasekar Elango
Thanks for the update, Jörg. It's very useful.

Thanks,
Raja.

On Wed, Aug 26, 2015 at 8:58 AM, Jörg Wagner joerg.wagn...@1und1.de wrote:

 Just a little feedback on our issue(s) as FYI to whoever is interested.

 It basically all boiled down to the configuration of topics. We noticed
 while performance testing (or trying to ;) ) that the partitioning was most
 critical to us.

 We originally followed the LinkedIn recommendation and used 600 partitions
 for our main topic. When testing that, the replicas always went out of sync
 within a short timeframe, leaders could not be determined and the cluster
 failed horribly (even writing several hundred lines of logs within 1/100th of
 a second).

 So for our 27 log.dirs (= disks) we went with 27 partitions. And voilà: we
 could use kafka with around 35k requests per second (via an application
 accessing it). Kafka stayed stable.

 Currently we are testing with 81 partitions (27*3) and it's running well.
 No issues so far, replicas in sync and up to 50k requests per second.

 Cheers

 On 25.08.2015 15:18, Jörg Wagner wrote:

 So okay, this is a little embarrassing, but the core of the issue was that
 max open files was not set correctly for kafka. It was not an oversight,
 but a few things together meant that the system configuration was not
 changed correctly, resulting in the default value.

 No wonder kafka behaved strangely every time we had enough data in
 log.dirs and enough connections.
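
 (For anyone hitting the same thing, a quick sketch of how to check the limit
 the running broker actually got; the values below are only illustrative and
 the persistent setting depends on your distro:)

 # Check the limit of the live broker process, not just your shell
 # (assumes the broker's main class is kafka.Kafka, as in the stock scripts):
 BROKER_PID=$(jps -l | grep kafka.Kafka | awk '{print $1}')
 grep 'Max open files' /proc/$BROKER_PID/limits
 # Raise it for the kafka user, e.g. in /etc/security/limits.conf:
 #   kafka  soft  nofile  128000
 #   kafka  hard  nofile  128000
 # and restart the broker so the new limit takes effect.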

 Anyhow, that doesn't seem to be the last problem. The brokers get in sync
 with each other (within an expected time frame), everything seems fine.

 After a little stress testing, the kafka cluster falls apart (around 40k
 requests/s). Using topics --describe we can see leaders missing (e.g. of
 brokers 1,2,3 only 1 and 3 are leading partitions, although zookeeper lists
 all of them under /brokers/ids). This ultimately results in partitions being
 unavailable and massive "leader not local" spam in the logs.

 What are we missing?

 Cheers
 Jörg

 On 24.08.2015 10:31, Jörg Wagner wrote:

 Thank you for your answers.

 @Raja
 No, it also seems to happen if we stop kafka with a completely clean shutdown.

 @Gwen
 I was testing the situation with num.replica.fetchers set higher. If you
 say that was the right direction, I will try it again. What would be a good
 setting? I went with 50 which seemed reasonable (having 27 single disks).
 How long should it take to get complete ISR?

 Regarding no Data flowing into kafka: I just wanted to point out that
 the setup is not yet live. So we can completely stop the usage of kafka,
 and it should possibly get into sync faster without a steady stream of new
 messages.
 Kafka itself is working fine during this, on the other hand, just
 missing ISR and hence redundancy. If I stop another broker (cleanly!) though,
 it tends to happen that the expected number of partitions have Leader -1,
 which I assume should not happen.

 Cheers
 Jörg

 On 21.08.2015 19:18, Rajasekar Elango wrote:

 We are seeing same behavior in 5 broker cluster when losing one broker.

 In our case, we are losing broker as well as kafka data dir.

 Jörg Wagner,

 Are you losing just broker or kafka data dir as well?

 Gwen,

 We have also observed that the latency of messages arriving at consumers goes
 up by 10x when we lose a broker. Is it because the broker is busy with
 handling failed fetch requests and loaded with more data, and that's slowing
 down the writes? Also, if we had simply lost the broker and not the data dir,
 would the impact have been minimal?

 Thanks,
 Raja.



 On Fri, Aug 21, 2015 at 12:31 PM, Gwen Shapira g...@confluent.io
 wrote:

 By default, num.replica.fetchers = 1. This means only one thread per broker
 is fetching data from leaders, so it may take a while for the
 recovering machine to catch up and rejoin the ISR.

 If you have bandwidth to spare, try increasing this value.
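
 A minimal sketch of what that change looks like (the value 4 is only an
 example; tune it to your disk/network bandwidth and roll-restart the brokers
 afterwards):

 # In config/server.properties on each broker (default is 1):
 num.replica.fetchers=4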

 Regarding no data flowing into kafka - If you have 3 replicas and
 only
 one is down, I'd expect writes to continue to the new leader even if
 one
 replica is not in the ISR yet. Can you see that a new leader is
 elected?

 Gwen

 On Fri, Aug 21, 2015 at 6:50 AM, Jörg Wagner joerg.wagn...@1und1.de
 wrote:

 Hey everyone,

 here's my crosspost from irc.

 Our setup:
 3 kafka 0.8.2 brokers with zookeeper, powerful hardware (20 cores, 27
 logdisks each). We use a handful of topics, but only one topic is utilized
 heavily. It features a replication of 2 and 600 partitions.

 Our issue:
 If one kafka was down, it takes very long (from 1 to 10 hours) to show
 that all partitions have all isr again. This seems to heavily depend on the
 amount of data which is in the log.dirs (I have configured 27 threads - one
 for each dir featuring its own drive).
 This all takes this long while there is NO data flowing into kafka.

 We seem to be missing something critical here. It might be some option set
 wrong, or are we thinking wrong and it's not critical to have the replicas
 in sync.

 Any pointers would be great.

Re: Painfully slow kafka recovery

2015-08-21 Thread Rajasekar Elango
We are seeing same behavior in 5 broker cluster when losing one broker.

In our case, we are losing broker as well as kafka data dir.

Jörg Wagner,

Are you losing just broker or kafka data dir as well?

Gwen,

We have also observed that the latency of messages arriving at consumers goes
up by 10x when we lose a broker. Is it because the broker is busy with
handling failed fetch requests and loaded with more data, and that's slowing down
the writes? Also, if we had simply lost the broker and not the data dir,
would the impact have been minimal?

Thanks,
Raja.



On Fri, Aug 21, 2015 at 12:31 PM, Gwen Shapira g...@confluent.io wrote:

 By default, num.replica.fetchers = 1. This means only one thread per broker
 is fetching data from leaders, so it may take a while for the
 recovering machine to catch up and rejoin the ISR.

 If you have bandwidth to spare, try increasing this value.

 Regarding no data flowing into kafka - If you have 3 replicas and only
 one is down, I'd expect writes to continue to the new leader even if one
 replica is not in the ISR yet. Can you see that a new leader is elected?

 Gwen

 On Fri, Aug 21, 2015 at 6:50 AM, Jörg Wagner joerg.wagn...@1und1.de
 wrote:

  Hey everyone,
 
  here's my crosspost from irc.
 
  Our setup:
  3 kafka 0.8.2 brokers with zookeeper, powerful hardware (20 cores, 27
  logdisks each). We use a handful of topics, but only one topic is utilized
  heavily. It features a replication of 2 and 600 partitions.

  Our issue:
  If one kafka was down, it takes very long (from 1 to 10 hours) to show
  that all partitions have all isr again. This seems to heavily depend on the
  amount of data which is in the log.dirs (I have configured 27 threads - one
  for each dir featuring its own drive).
  This all takes this long while there is NO data flowing into kafka.

  We seem to be missing something critical here. It might be some option set
  wrong, or are we thinking wrong and it's not critical to have the replicas
  in sync.
 
  Any pointers would be great.
 
  Cheers
  Jörg
 




-- 
Thanks,
Raja.


Re: java.lang.OutOfMemoryError: Java heap space

2015-08-06 Thread Rajasekar Elango
Hi Sourabh,

We have seen this error when the kafka broker is running with SSL and the
consumer is trying to consume in plaintext mode. Are you using the high level
consumer or SimpleConsumer? If you are using SimpleConsumer, pull the latest
code from my repo https://github.com/relango/kafka/commits/kafka_security_0.8.2
and pass the secure parameters to the SimpleConsumer constructor.

Thanks,
Raja.

On Thu, Aug 6, 2015 at 9:01 PM, Sourabh Chandak sourabh3...@gmail.com
wrote:

 Hi,

 I am trying to integrate
 https://github.com/relango/kafka/tree/kafka_security_0.8.2 with Spark
 Streaming using the SimpleConsumer. I know that the SSL patch is on its way,
 but I need to set up a prototype, hence went ahead with Raja's version.

 So when I run my spark job to retrieve data from 1 topic with just 1
 partition I get an OutOfMemoryError.

 Here is the stack trace:

 Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
 at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
 at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
 at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
 at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
 at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:91)
 at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
 at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
 at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
 at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
 at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
 at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
 at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
 at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
 at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:310)

 Need help from experts to resolve this.

 Thanks,
 Sourabh




-- 
Thanks,
Raja.


Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread Rajasekar Elango
Here is an example of what Sharninder suggested:
http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/

Thanks,
Raja.

On Tue, Aug 4, 2015 at 12:01 PM, Sharninder sharnin...@gmail.com wrote:

 You can't. Kafka is essentially a queue, so you always read messages one
 by one. What you can do is disable auto offset commit, read 100 messages,
 process them and then manually commit the offset.

 --
 Sharninder

  On 04-Aug-2015, at 9:07 pm, shahab shahab.mok...@gmail.com wrote:
 
  Hi,
 
  While the producer can put data into the kafka server as a batch, I couldn't
  find any API (or any document) saying how we can fetch data as a batch from
  Kafka.
  Even when data is placed in the kafka server as a batch, using the High Level
  consumer I can still only read messages one by one, and I cannot specify,
  for example, to read 100 items at once!

  Is this a correct observation, or am I missing something?
 
  best,
  /Shahab




-- 
Thanks,
Raja.


Reimaging zookeeper host

2015-06-30 Thread Rajasekar Elango
We are running a 3-node zookeeper cluster and we need to re-image (re-install
the OS on) a zookeeper host. Is it ok to lose the zookeeper dataDir during the
upgrade, or should we back up the zookeeper dataDir and restore it when
zookeeper comes back online? Will kafka and consumers work fine if we bring up
zookeeper with an empty dataDir containing just the myid file?



-- 
Thanks,
Raja.


Re: how to modify offsets stored in Kafka in 0.8.2.1 version?

2015-06-19 Thread Rajasekar Elango
Hi Marina,

Check slide 32 in this presentation
http://www.slideshare.net/jjkoshy/offset-management-in-kafka.
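
In case it helps, this is roughly how the offsets topic itself can be dumped
from the command line on 0.8.2.x. The formatter class name and the
exclude.internal.topics property are from memory, so treat this as a sketch
and verify against your build:

# config/consumer.properties must contain: exclude.internal.topics=false
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic __consumer_offsets --from-beginning \
  --consumer.config config/consumer.properties \
  --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter"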

Hope this helps.

Thanks,
Raja.

On Fri, Jun 19, 2015 at 9:43 AM, Marina ppi...@yahoo.com.invalid wrote:

 Thanks, Stevo, for the quick reply,
 Yes, I understand how to do this programmatically - but I would like to be
 able to do this manually from a command line, just as before I was able to
 do this in the Zookeeper shell. I don't want to write and run a Java app
 just to set an offset :)

 [unless, of course, this is the only way to do this.]

 thanks!
 Marina




 - Original Message -
 From: Stevo Slavić ssla...@gmail.com
 To: users@kafka.apache.org; Marina ppi...@yahoo.com
 Cc:
 Sent: Friday, June 19, 2015 9:33 AM
 Subject: Re: how to modify offsets stored in Kafka in 0.8.2.1 version?

 Hello Marina,

 There's Kafka API to fetch and commit offsets

 https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
 - maybe it will work for you.

 Kind regards,
 Stevo Slavic.


 On Fri, Jun 19, 2015 at 3:23 PM, Marina ppi...@yahoo.com.invalid wrote:

  Hi,
 
  in older Kafka versions where offsets were stored in Zookeeper - I could
  manually update the value of the Zookeeper's node:
 
 
 /consumers/consumer_group_name/offsets/topic_name/partition_number/offset_value.
 
  In 0.8.2.1 - there are no values in offsets anymore, but there is a new
  topic,
  __consumer_offsets, where as I understand offsets are tracked now.
 
  The ConsumerOffsetChecker tool seems to be able to get the offset values
  from this topic, since I see correct values when running it.
  So, how do I access this info myself?
 
 
  I tried:
 
  ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic
  __consumer_offsets --from-beginning
 
  but it does not show anything
  Also, how would I change the offset? I need to do this sometimes if I
 want
  to skip/ignore some messages and just advance offset manually.
 
  thanks,
  Marina
 




-- 
Thanks,
Raja.


Mirrormaker stops consuming

2015-05-22 Thread Rajasekar Elango
We recently upgraded to kafka 0.8.2.1 and found an issue where mirrormaker
randomly stops consuming. We had to restart the mirrormaker process to
resolve the problem. This problem has occurred several times in the past two
weeks.

Here is what I found in analysis:

When this problem happens:

The mirrormaker log stops rolling (i.e. nothing in the logs). The last couple
of messages in the mirrormaker log are ProducerSendThread producing to the
destination. No errors or exceptions.

The mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows
the mirrormaker consumer offset stops incrementing.

The mirrormaker consumer MinFetch rate jmx metric drops to zero.
ConsumerTopicMetric.BytesPerSec drops to zero.

So it appears the mirrormaker consumer has stopped accepting new data.

Can someone provide input on how to troubleshoot this problem further and
identify the root cause?

I got a thread dump before restarting; it looks ok to me, no blocked threads.
Here is the thread dump output:

2015-05-21 18:59:09
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):

Attach Listener daemon prio=10 tid=0x7f7248002000 nid=0x2d53 waiting
on condition [0x]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
- None

ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2
prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition
[0x7f72833f2000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  0x00042cd15cc8 (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

   Locked ownable synchronizers:
- 0x00042ea62eb0 (a
java.util.concurrent.locks.ReentrantLock$NonfairSync)

ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3
prio=10 tid=0x7f71e407b000 nid=0x3424 waiting on condition
[0x7f7281f99000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  0x00042ccece80 (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
at

Re: Mirrormaker stops consuming

2015-05-22 Thread Rajasekar Elango
Thanks for the pointers, Joel. Will look into SSLSocketChannel. Yes, this was
working fine before the upgrade.

If it's just one producer thread stuck on write, it might affect only one
consumer thread/partition. But we found consuming stopped for all
topics/partitions. Or is there only a single data channel shared between all
producer and consumer threads?

Thanks,
Raja.


On Fri, May 22, 2015 at 12:12 PM, Joel Koshy jjkosh...@gmail.com wrote:

 The threaddump suggests that one of the producers
 (mirrormaker-producer-6) is blocked on write for some reason. So the
 data-channel for that producer (which sits between the consumers and
 the producer) is full, which blocks the consumers from progressing.

 This appears to be in your (custom) SSLSocketChannel code. If you take
 consecutive threaddumps I'm guessing you would see the same trace. If
 this is reproducible, can you do that? You can also hook up jvisualvm
 or yourkit to see which threads are active, and it may well be that the
 producer is in a tight loop on writeCompletely. Just to confirm, you
 did not see this issue before upgrading?

 Joel

 On Fri, May 22, 2015 at 11:35:19AM -0400, Rajasekar Elango wrote:
  We recently upgraded to kafka 0.8.2.1 and found issues with mirrormaker
  that randomly stops consuming. We had to restart the mirrormaker process
 to
  resolve the problem. This problem has occurred several times in past two
  weeks.
 
  Here is what I found in analysis:
 
  When this problem happens:
 
  Mirrormaker log stopped rolling (ie nothing in logs) . Last couple of
  messages in mirrormaker log are ProducerSendThread producing to
  destination. No errors or exceptions.
 
  Mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows
  mirrormaker consumer offset stops incrementing.
 
  Mirrormaker consumer MinFetch rate jmx metric drops to zero.
  ConsumerTopicMetric.BytesPerSec drops to zero.
 
  So its mirrormaker consumer should have stopped accepting new data.
 
  Can some one provide input on how to trouble shoot this problem further
 and
  identify root cause?
 
  Got Thread dump before restarting, it looks ok to me, no blocked thread.
  Here is thread dump output
 
  2015-05-21 18:59:09
  Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed
 mode):
 
  Attach Listener daemon prio=10 tid=0x7f7248002000 nid=0x2d53
 waiting
  on condition [0x]
 java.lang.Thread.State: RUNNABLE
 
 Locked ownable synchronizers:
  - None
 
 
 ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2
  prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition
  [0x7f72833f2000]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  0x00042cd15cc8 (a
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
  at
 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
  at
 
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
  at
  kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
  at
 
 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
  at
  scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
  at
  scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
  at
  scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
  at kafka.utils.Utils$.inLock(Utils.scala:535)
  at
 
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
  at
  kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 
 Locked ownable synchronizers:
  - 0x00042ea62eb0 (a
  java.util.concurrent.locks.ReentrantLock$NonfairSync)
 
 
 ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3
  prio=10 tid=0x7f71e407b000 nid=0x3424 waiting on condition
  [0x7f7281f99000]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait

Re: Kafka consumer offset checker hangs indefinitely

2015-05-14 Thread Rajasekar Elango
Hi Meghana,

We also faced a similar issue and found that it always returned
ConsumerCoordinatorNotAvailableCode for one broker (server id 3), and the
leader for all partitions of the __consumer_offsets topic was that same broker
id 3.

So we wiped the kafka data dir on that broker and restarted it. After that,
ConsumerOffsetChecker started working. We had replication enabled, so losing
the kafka data dir is not a big deal for us. If you can't delete the complete
kafka data dir, you can try deleting just the __consumer_offsets data.
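
Roughly, that looks like the sketch below. The data dir path is just an
example, and this is only safe when the offsets topic is replicated on the
other brokers; do it with the broker stopped:

# On the affected broker only:
bin/kafka-server-stop.sh
# Move the __consumer_offsets partition directories out of the data dir
# (assumes log.dirs=/var/kafka-data; adjust to your config):
mkdir -p /tmp/offsets-backup
mv /var/kafka-data/__consumer_offsets-* /tmp/offsets-backup/
bin/kafka-server-start.sh -daemon config/server.properties
# The broker re-replicates those partitions from the other replicas on startup.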

Thanks,
Raja.


On Thu, May 14, 2015 at 10:46 AM, Meghana Narasimhan 
mnarasim...@bandwidth.com wrote:

 Hi Mayuresh,
 A few more inputs that I can provide at the moment after some testing are
 as follows.
 1. The error returned by the consumer offset checker's
 ConsumerMetadataResponse is ConsumerCoordinatorNotAvailableCode. Could it
 somehow be related to the offsets being written to zookeeper and not the
 internal kafka offsets topic ?
 2. I see that the __consumer_offsets topic has been created on all the
 brokers.
 3. I also tried the steps for migrating from zookeeper to kafka offset
 topic as specified in the documentation [using the offsets.storage and
 dual.commit.enabled configs]
 4. Also based on a few other links I tried to commit offset
 using OffsetCommitRequest method and to get ConsumerMetadataResponse . But
 the OffsetCommitResponse also returned error and could not commit the
 offset successfully.

 On the other hand the producer and consumer are working fine and able to
 produce and consume data from the topic. The issue is only with the
 consumer offset checker tool.

 Thanks,
 Meghana


 On Mon, May 11, 2015 at 7:34 PM, Mayuresh Gharat 
 gharatmayures...@gmail.com
  wrote:

  Hi Meghana,
 
  Let me try this out on my cluster that has latest trunk deployed.
 
  Thanks,
 
  Mayuresh
 
  On Mon, May 11, 2015 at 1:53 PM, Meghana Narasimhan 
  mnarasim...@bandwidth.com wrote:
 
   Hi Mayuresh,
   A small update. The Kafka version I'm currently using is  2.10-0.8.2.1
  (not
   2.11 as previously mentioned). The cluster looks fine. Not sure why the
   consumer offset checker does not return a valid output and gets stuck.
  
   bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
   Topic:test  PartitionCount:3  ReplicationFactor:3  Configs:min.insync.replicas=2
   Topic: test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
   Topic: test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 1,2,0
   Topic: test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 1,2,0
  
  
  
  
   On Fri, May 8, 2015 at 12:52 PM, Meghana Narasimhan 
   mnarasim...@bandwidth.com wrote:
  
Hi Mayuresh,
   
Yes, the broker is up and accepting connections. Multiple consumers
 are
consuming off topics on the broker.
Also I am seeing the issue only with this particular version (
2.11-0.8.2.1). It worked fine with the beta that I was using earlier.
   
   
On Fri, May 8, 2015 at 12:45 PM, Mayuresh Gharat 
gharatmayures...@gmail.com wrote:
   
 Is X.X.X.X:9092 up and accepting connections?
 I am confused as to why it is not connecting to some other broker if the
 connection to this broker fails. Can you check if the broker is up?
   
 The way it works is the consumer will send a ConsumerMetadataRequest to one
 of the brokers and get the offsetmanager for its group and then perform the
 offset management.
   
Thanks,
   
Mayuresh
   
On Fri, May 8, 2015 at 9:22 AM, Meghana Narasimhan 
mnarasim...@bandwidth.com wrote:
   
 Hi,
 I'm using the Kafka 8.2.1 version (kafka_2.11-0.8.2.1) and the consumer
 offset checker hangs indefinitely and does not return any results. I
 enabled debug for the tools and below are the debug statements as seen on
 stdout. Any thoughts or inputs on this will be much appreciated.

 command used:
 bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group
 test-consumer-group
  or
 ./kafka-consumer-offset-checker.sh --zookeeper
 broker1:2181,broker2:2181,broker3:2181 --group test-consumer-group

 DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
 [2015-05-08 10:23:55,090] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
 [2015-05-08 10:23:55,091] DEBUG Query to X.X.X.X:9092 to locate offset manager for test-consumer-group failed - will retry in 3000 milliseconds. (kafka.client.ClientUtils$)
 [2015-05-08 10:23:58,093] DEBUG Querying X.X.X.X:9092 to locate offset manager for test-consumer-group. (kafka.client.ClientUtils$)
 [2015-05-08 10:23:58,102] DEBUG Consumer metadata response: ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
 [2015-05-08 10:23:58,103] DEBUG Query to X.X.X.X:9092 to locate
  

Re: How to set console consumer group ID

2015-04-22 Thread Rajasekar Elango
Yes, you can pass any consumer property, including group.id, by putting it in a
properties file and passing the path to that file using the --consumer.config
option of the console consumer.
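
A quick sketch (the file name, group id and topic are just examples):

echo "group.id=myGroupId" > my-consumer.properties
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic \
  --consumer.config my-consumer.properties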

Thanks,
Raja.

On Wed, Apr 22, 2015 at 1:45 AM, Lukáš Havrlant lu...@havrlant.cz wrote:

 Hi,
 is it possible to set group ID for console consumer on command line?
 Something like

 $ bin/kafka-console-consumer.sh --groupid myGroupId

 Lukáš




-- 
Thanks,
Raja.


Re: kafka monitoring

2015-01-08 Thread Rajasekar Elango
Hi Sa Li,

You need to set the environment variable JMX_PORT to enable JMX while starting
kafka. See kafka-run-class.sh for how it is used. Then you can connect to
hostname:jmxport using JConsole.
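
For example (the port number is arbitrary; pick any free port that is open in
your firewall):

# On the broker host:
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties
# Then, from your workstation:
jconsole 10.100.75.128:9999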

Thanks,
Raja.

On Thu, Jan 8, 2015 at 2:08 PM, Sa Li sal...@gmail.com wrote:

 Hello, All

 I understand many of you are using jmxtrans along with graphite/ganglia to
 pull out metrics. According to https://kafka.apache.org/081/ops.html, it
 says "The easiest way to see the available metrics is to fire up jconsole and
 point it at a running kafka client or server; this will allow browsing all
 metrics with JMX."

 I tried to fire up jconsole on windows, attempting to access our dev and
 production clusters, which are running well.
 Here is the main node of my dev:
 10.100.75.128, broker port:9092, zk port:2181

 Jconsole shows:

  New Connection
 Remote Process:

 Usage: hostname:port OR service:jmx:protocol:sap
 Username:Password:

 Sorry about my naivete. I tried to connect based on the above ip but just
 can't connect. Do I need to do something on the dev server to be able to make
 it work?

 thanks

 --

 Alec Li




-- 
Thanks,
Raja.


Re: kafka monitoring system

2014-12-22 Thread Rajasekar Elango
Hi Sa Li,

You can also try jmxtrans + graphite (for charting). jmxtrans has a graphite
output adapter out of the box.

Regards,
Raja.

On Mon, Dec 22, 2014 at 10:39 PM, YuanJia Li yuanjia8...@163.com wrote:

 Hi Sa Li,
 You can try using jmxtrans+opentsdb to monitor kafka. Jmxtrans collects data
 with JMX and sends it to opentsdb. Opentsdb does the graphing and alerting.




 YuanJia Li





 From: Sa Li
 Date: 2014-12-23 08:41
 To: users
 Subject: kafka monitoring system
 Hi, all

 I am thinking to make a reliable monitoring system for our kafka production
 cluster. I read such from documents:

 Kafka uses Yammer Metrics for metrics reporting in both the server and the
 client. This can be configured to report stats using pluggable stats
 reporters to hook up to your monitoring system.

 The easiest way to see the available metrics is to fire up jconsole and point
 it at a running kafka client or server; this will allow browsing all metrics
 with JMX.

 We do graphing and alerting on the following metrics:

 ..


 I am wondering if anyone has ever used Jconsole to monitor kafka, or if
 anyone can recommend a good monitoring tool for a kafka production cluster.


 thanks


 --

 Alec Li




-- 
Thanks,
Raja.


Re: Announcing Confluent

2014-11-06 Thread Rajasekar Elango
Congrats. Wish you all the very best and success.

Thanks,
Raja.

On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders niek.sand...@gmail.com wrote:

 Congrats!

 On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps jay.kr...@gmail.com wrote:
  Hey all,
 
  I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
  company around Kafka called Confluent. We are planning on productizing
 the
  kind of Kafka-based real-time data platform we built out at LinkedIn. We
  are doing this because we think this is a really powerful idea and we
 felt
  there was a lot to do to make this idea really take root. We wanted to
 make
  that our full time mission and focus.
 
  There is a blog post that goes into a little more depth here:
  http://blog.confluent.io/
 
  LinkedIn will remain a heavy Kafka user and contributor. Combined with
 our
  additional resources from the funding of the company this should be a
  really good thing for the Kafka development effort. Especially when
  combined with the increasing contributions from the rest of the
 development
  community. This is great news, as there is a lot of work to do. We'll
 need
  to really focus on scaling this distributed development in a healthy way.
 
  One thing I do want to emphasize is that the addition of a company in the
  Kafka ecosystem won’t mean meddling with open source. Kafka will remain
  100% open source and community focused, as of course is true of any
 Apache
  project. I have been doing open source for a long time and strongly
 believe
  it is the right model for infrastructure software development.
 
  Confluent is just getting off the ground now. We left LinkedIn, raised
 some
  money, and we have an office (but no furniture yet!). Nonetheless, if you
  are interested in finding out more about the company and either getting
  help with your Kafka usage or joining us to help build all this, by all
  means reach out to us, we’d love to talk.
 
  Wish us luck!
 
  -Jay




-- 
Thanks,
Raja.


Re: [DISCUSS] Kafka Security Specific Features

2014-07-31 Thread Rajasekar Elango
Can we get info on the targeted release dates for the 0.8.2 and 0.9 releases,
for our planning purposes?

Thanks.
Raja.


On Wed, Jul 30, 2014 at 7:27 PM, Joe Stein joe.st...@stealth.ly wrote:

 The 0.8.2 release will not have the patch inside of it.  Trunk already has
 a lot inside of it as a point release.  The patch also doesn't account for
 all of the requirements that all of the stakeholders need/want for the
 feature.  Instead of releasing something that is useful but only for some
 it is better to spend the time to get it right for everyone.  We are going
 to have it in the 0.9 release (possibly also with authorization, encryption
 and more of the security features too) then.

 What we will do is keep the patch rebased against trunk and then the 0.8.2
 branch (once we get to that point) so that folks can apply it to the 0.8.2
 release and do a build from src.  When we get to that I can create a write-up
 or something if folks find problems doing it.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /


 On Wed, Jul 30, 2014 at 7:10 PM, Calvin Lei ckp...@gmail.com wrote:

  yeah i just saw that. Looking forward to the prod release of 0.8.2
 
 
  On Wed, Jul 30, 2014 at 11:01 AM, Rajasekar Elango 
 rela...@salesforce.com
  
  wrote:
 
   We implemented security features on older snapshot version of 0.8
 kafka.
   But Joe Stein's organization rebased it to latest version of kafka
   available at https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477
 .
  
   Thanks,
   Raja.
  
  
   On Tue, Jul 29, 2014 at 10:54 PM, Calvin Lei ckp...@gmail.com wrote:
  
Raja,
   Which Kafka version is your security enhancement based on?
   
thanks,
Cal
   
   
On Wed, Jul 23, 2014 at 5:01 PM, Chris Neal cwn...@gmail.com
 wrote:
   
 Pramod,

 I got that same error when following the configuration from Raja's
 presentation earlier in this thread.  If you'll notice the usage
 for
   the
 console_producer.sh, it is slightly different, which is also
 slightly
 different than the scala code for the ConsoleProducer. :)

 When I changed this:

 bin/kafka-console-producer.sh --broker-list n5:9092:true --topic
 test

 to this:

 bin/kafka-console-producer.sh --broker-list n5:9092 --secure
 --client.security.file config/client.security.properties --topic
 test

 I was able to push messages to the topic, although I got a WARN
 about
   the
 property topic not being valid, even though it is required.

 Also, the Producer reported this warning to me:

 [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context
 (kafka.network.security.SecureAuth$)

 and the broker gave me this:
 [2014-07-23 20:45:24,114] INFO begin ssl handshake for
 n5.example.com/192.168.1.144:48817//192.168.1.144:9092
 (kafka.network.security.SSLSocketChannel)
 [2014-07-23 20:45:24,374] INFO finished ssl handshake for
 n5.example.com/192.168.1.144:48817//192.168.1.144:9092
 (kafka.network.security.SSLSocketChannel)
 [2014-07-23 20:45:24,493] INFO Closing socket connection to
 n5.example.com/192.168.1.144. (kafka.network.Processor)
 [2014-07-23 20:45:24,555] INFO begin ssl handshake for
 n5.example.com/192.168.1.144:48818//192.168.1.144:9092
 (kafka.network.security.SSLSocketChannel)
 [2014-07-23 20:45:24,566] INFO finished ssl handshake for
 n5.example.com/192.168.1.144:48818//192.168.1.144:9092
 (kafka.network.security.SSLSocketChannel)

 It's like it did the SSL piece twice :)

 Subsequent puts to the topic did not exhibit this behavior though:

 root@n5[937]:~/kafka_2.10-0-8-2-0.1.0.0
  bin/kafka-console-producer.sh
 --broker-list n5:9092 --secure --client.security.file
 config/client.security.properties --topic test
 [2014-07-23 20:45:17,530] WARN Property topic is not valid
 (kafka.utils.VerifiableProperties)
 1
 [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context
 (kafka.network.security.SecureAuth$)
 2
 3
 4

 Consuming worked with these options:

 root@n5[918]:~/kafka_2.10-0-8-2-0.1.0.0
  bin/kafka-console-consumer.sh
 --topic test --zookeeper n5:2181 --from-beginning
   --security.config.file
 config/client.security.properties
 1
 2
 3
 4
 ^CConsumed 5 messages

 I hope that helps!
 Chris


 On Tue, Jul 22, 2014 at 2:10 PM, Pramod Deshmukh 
 dpram...@gmail.com
  
 wrote:

  Anyone getting this issue. Is it something related to environment
  or
   it
 is
  the code. Producer works fine when run with secure=false (no
   security)
  mode.
 
 
  pdeshmukh$ bin/kafka-console

Re: KAFKA-1477 (authentication layer) and 0.8.2

2014-07-25 Thread Rajasekar Elango
Yes we are very much interested in getting this code merged to trunk. I can
also do testing once it's available on trunk.

Thanks,
Raja.


On Fri, Jul 25, 2014 at 12:11 PM, Joe Stein joe.st...@stealth.ly wrote:

 Hi Chris, glad to hear that even more folks are going to (want to) use the
 feature.  I didn't author the patch (Raja and Ivan did) and created the
 fork so folks could test it without much fuss.

 I just commented on the ticket to address Jun's last comment and think it
 also answers your question too.

 I know folks are using this now and other folks are looking to use it out
 of the core project.

 As long as it has a way to cause no harm when it is off I believe it really
 adds to the value Kafka brings to a number of organizations that can't use
 Kafka just because of this one thing.

 I am looking forward to being able to commit it to trunk.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /


 On Fri, Jul 25, 2014 at 11:34 AM, Chris Neal cwn...@gmail.com wrote:

  Hi guys,
 
  This JIRA (https://issues.apache.org/jira/browse/KAFKA-1477) leads me to
  believe that an authentication layer implementation is planned as part of
  the 0.8.2 release.  I was wondering if this is still the case?
 
  There was an earlier thread talking about security, but there hasn't been
  activity on it in a while.
 
  I grabbed Joe's fork and it works, but I was wondering about it getting
  merged back into the official 0.8.2 codebase, or is this more likely
  something that will be in 0.9?
 
  Thanks!
 




-- 
Thanks,
Raja.


Re: [DISCUSS] Kafka Security Specific Features

2014-07-16 Thread Rajasekar Elango
Pramod,


I presented secure kafka configuration and usage at the last meetup, so I hope
this video recording http://www.ustream.tv/recorded/48396701 would help. You
can skip to about 59 min to jump to the security talk.

Thanks,
Raja.


On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh dpram...@gmail.com wrote:

 Hello Joe,

 Is there a configuration or example to test Kafka security piece?

 Thanks,

 Pramod


 On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh dpram...@gmail.com
 wrote:

  Thanks Joe,
 
  This branch works. I was able to proceed. I still had to set scala
 version
  to 2.9.2 in kafka-run-class.sh.
 
 
 
  On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein joe.st...@stealth.ly wrote:
 
  That is a very old branch.
 
  Here is a more up to date one
  https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 (needs to be
  updated to latest trunk might have a chance to-do that next week).
 
  You should be using gradle now as per the README.
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
 
  On Wed, Jul 16, 2014 at 3:49 PM, Pramod Deshmukh dpram...@gmail.com
  wrote:
 
   Thanks Joe for this,
  
   I cloned this branch and tried to run zookeeper but I get
  
   Error: Could not find or load main class
   org.apache.zookeeper.server.quorum.QuorumPeerMain
  
  
   I see scala version is still set to 2.8.0
  
   if [ -z $SCALA_VERSION ]; then
  
   SCALA_VERSION=2.8.0
  
   fi
  
  
  
   Then I installed sbt and scala and followed your instructions for
  different
   scala versions. I was able to bring zookeeper up but brokers fail to
  start
   with error
  
   Error: Could not find or load main class kafka.Kafka
  
   I think I am doing something wrong. Can you please help me?
  
   Our current production setup is with 2.8.0 and want to stick to it.
  
   Thanks,
  
   Pramod
  
  
   On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein joe.st...@stealth.ly
 wrote:
  
Hi,I wanted to re-ignite the discussion around Apache Kafka
 Security.
This
is a huge bottleneck (non-starter in some cases) for a lot of
   organizations
(due to regulatory, compliance and other requirements). Below are my
suggestions for specific changes in Kafka to accommodate security
requirements.  This comes from what folks are doing in the wild to
workaround and implement security with Kafka as it is today and also
   what I
have discovered from organizations about their blockers. It also
  picks up
from the wiki (which I should have time to update later in the week
  based
on the below and feedback from the thread).
   
1) Transport Layer Security (i.e. SSL)
   
This also includes client authentication in addition to in-transit
   security
layer.  This work has been picked up here
https://issues.apache.org/jira/browse/KAFKA-1477 and do appreciate
  any
thoughts, comments, feedback, tomatoes, whatever for this patch.  It
  is a
pickup from the fork of the work first done here
https://github.com/relango/kafka/tree/kafka_security.
   
2) Data encryption at rest.
   
This is very important and something that can be facilitated within
  the
wire protocol. It requires an additional map data structure for the
encrypted [data encryption key]. With this map (either in your
  object
   or
in the wire protocol) you can store the dynamically generated
  symmetric
   key
(for each message) and then encrypt the data using that dynamically
generated key.  You then encrypt the encryption key using each
 public
  key
for whom is expected to be able to decrypt the encryption key to
 then
decrypt the message.  For each public key encrypted symmetric key
  (which
   is
now the encrypted [data encryption key] along with which public
 key
  it
was encrypted with for (so a map of [publicKey] =
encryptedDataEncryptionKey) as a chain.   Other patterns can be
   implemented
but this is a pretty standard digital enveloping [0] pattern with
  only 1
field added. Other patterns should be able to use that field to-do
  their
implementation too.
   
3) Non-repudiation and long term non-repudiation.
   
Non-repudiation is proving data hasn't changed.  This is often (if
 not
always) done with x509 public certificates (chained to a certificate
authority).
   
Long term non-repudiation is what happens when the certificates of
 the
certificate authority are expired (or revoked) and everything ever
  signed
(ever) with that certificate's public key then becomes no longer
   provable
as ever being authentic.  That is where RFC3126 [1] and RFC3161 [2]
  come
in (or worm drives [hardware], etc).
   
For either (or both) of these it is an operation of the encryptor to
sign/hash the data 

Re: [DISCUSS] Kafka Security Specific Features

2014-06-05 Thread Rajasekar Elango
Hi Jay,

Thanks for putting together a spec for security.

Joe,

Looks like the "Securing zookeeper" part has been deleted from the assumptions
section. Communication with zookeeper needs to be secured as well to make the
entire kafka cluster secure. It may or may not require changes to kafka,
but it's good to have it in the spec.

I could not find a link to edit the page after logging into the wiki. Do I need
any special permission to make edits?

Thanks,
Raja.


On Wed, Jun 4, 2014 at 8:57 PM, Joe Stein joe.st...@stealth.ly wrote:

 I like the idea of working on the spec and prioritizing. I will update the
 wiki.

 - Joestein


 On Wed, Jun 4, 2014 at 1:11 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Joe,
 
  Thanks for kicking this discussion off! I totally agree that for
 something
  that acts as a central message broker security is critical feature. I
 think
  a number of people have been interested in this topic and several people
  have put effort into special purpose security efforts.
 
  Since most the LinkedIn folks are working on the consumer right now I
 think
  this would be a great project for any other interested people to take on.
  There are some challenges in doing these things distributed but it can
 also
  be a lot of fun.
 
  I think a good first step would be to get a written plan we can all agree
  on for how things should work. Then we can break things down into chunks
  that can be done independently while still aiming at a good end state.
 
  I had tried to write up some notes that summarized at least the thoughts
 I
  had had on security:
  https://cwiki.apache.org/confluence/display/KAFKA/Security
 
  What do you think of that?
 
  One assumption I had (which may be incorrect) is that although we want
 all
  the things in your list, the two most pressing would be authentication
 and
  authorization, and that was all that write up covered. You have more
  experience in this domain, so I wonder how you would prioritize?
 
  Those notes are really sketchy, so I think the first goal I would have
  would be to get to a real spec we can all agree on and discuss. A lot of
  the security stuff has a high human interaction element and needs to work
  in pretty different domains and different companies so getting this kind
 of
  review is important.
 
  -Jay
 
 
  On Tue, Jun 3, 2014 at 12:57 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   Hi,I wanted to re-ignite the discussion around Apache Kafka Security.
   This
   is a huge bottleneck (non-starter in some cases) for a lot of
  organizations
   (due to regulatory, compliance and other requirements). Below are my
   suggestions for specific changes in Kafka to accommodate security
   requirements.  This comes from what folks are doing in the wild to
   workaround and implement security with Kafka as it is today and also
  what I
   have discovered from organizations about their blockers. It also picks
 up
   from the wiki (which I should have time to update later in the week
 based
   on the below and feedback from the thread).
  
   1) Transport Layer Security (i.e. SSL)
  
   This also includes client authentication in addition to in-transit
  security
   layer.  This work has been picked up here
   https://issues.apache.org/jira/browse/KAFKA-1477 and do appreciate any
   thoughts, comments, feedback, tomatoes, whatever for this patch.  It
 is a
   pickup from the fork of the work first done here
   https://github.com/relango/kafka/tree/kafka_security.
  
   2) Data encryption at rest.
  
   This is very important and something that can be facilitated within the
   wire protocol. It requires an additional map data structure for the
   encrypted [data encryption key]. With this map (either in your object
  or
   in the wire protocol) you can store the dynamically generated symmetric
  key
   (for each message) and then encrypt the data using that dynamically
   generated key.  You then encrypt the encryption key using each public
 key
   for whom is expected to be able to decrypt the encryption key to then
   decrypt the message.  For each public key encrypted symmetric key
 (which
  is
   now the encrypted [data encryption key] along with which public key
 it
   was encrypted with for (so a map of [publicKey] =
   encryptedDataEncryptionKey) as a chain.   Other patterns can be
  implemented
   but this is a pretty standard digital enveloping [0] pattern with only
 1
   field added. Other patterns should be able to use that field to-do
 their
   implementation too.
  
   3) Non-repudiation and long term non-repudiation.
  
   Non-repudiation is proving data hasn't changed.  This is often (if not
   always) done with x509 public certificates (chained to a certificate
   authority).
  
   Long term non-repudiation is what happens when the certificates of the
   certificate authority are expired (or revoked) and everything ever
 signed
   (ever) with that certificate's public key then becomes no longer
  provable
   as ever being authentic.  That is where 

Re: Spring integration?

2014-04-07 Thread Rajasekar Elango
Hi Michael,

We are using spring-integration-kafka
(https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-kafka)
in production and it has been working fine. We also contributed some features
(e.g. support for topic filters, https://jira.spring.io/browse/INTEXT-77)
back to the project. The outbound adapter has a poller to make it work with
the enterprise integration pattern Message Channels. If you need a more
detailed answer you could try posting your question on stack overflow.

I have also seen a camel component for kafka
(https://github.com/BreizhBeans/camel-kafka),
but haven't tried it.

Thanks,
Raja.


On Mon, Apr 7, 2014 at 10:26 PM, Michael Campbell 
michael.campb...@gmail.com wrote:

 Hello,

 My company is looking at Kafka for a backbone to microservices, and we were
 wondering if anyone had actually done anything with it and Spring
 Integration (which we are looking at also for additional things).

 The reason I ask is the one place I can find any code seems about a half
 year out of date, is targeting both an old version of Kafka (0.8-beta) and
 an older version of Scala, and the example seems ... odd to me (he has a
 poller in the outbound adapter, which I don't understand at all).

 Is anyone actually using this combination?

 Or, is there a better way to integrate with Kafka if I want to put a layer
 between my business code and the Kafka API?(Camel, perhaps?)




-- 
Thanks,
Raja.


Re: Kafka and authentication

2014-03-31 Thread Rajasekar Elango
Hi Vijay,

We implemented mutual SSL authentication in kafka for our internal use and we
have plans to contribute it back to the community. But we implemented SSL over
an older snapshot of the kafka 0.8 release. We have been busy with other
projects and haven't had a chance to merge our SSL changes to the latest
version of kafka. If you are interested in looking at the changes we made,
they are available in my github fork of apache kafka (
https://github.com/relango/kafka/tree/kafka_security)

Thanks,
Raja.


On Fri, Mar 28, 2014 at 10:06 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Hi Vijay,

 The document you pointed out has our initial thoughts on Kafka security.
 This work is still in design and discussion phase, no code has been written
 as such and we hope to pick it up in a couple months. However, if you have
 thoughts on how it should work and/or would like to contribute patches, we
 would be happy to collaborate with you.

 Thanks,
 Neha


 On Fri, Mar 28, 2014 at 4:05 PM, Vijay Ramachandran 
 vramachand...@apple.com
  wrote:

  Hi All,
 
  I was googling around for info on securing kafka. The best document I
  could find was
 https://cwiki.apache.org/confluence/display/KAFKA/Security,
  which is kind of old. It is not clear if any steps were taken after
 this
  doc was put together. Looking at the features / bug fixes in kafka also
  does not paint a clear picture. Hence this set of questions :
 
  Is there a way to make kafka authenticate a producer sending messages /
  consumer reading messages ?
  Is there a way to make kafka authenticate itself to the ZooKeeper
 ensemble
  ?
 
  Any info will be deeply appreciated
 
  Thanks
 
  Vijay




-- 
Thanks,
Raja.


Re: Spring Integration Kafka support

2014-01-13 Thread Rajasekar Elango
Hi Preetham,

We are able to successfully use spring-integration-kafka with non-zero
broker ids without any issues. Could you provide more details on exactly what
problem/error you are getting?

Also, you can try the examples under samples
(https://github.com/spring-projects/spring-integration-extensions/tree/master/samples/kafka)
to get started quickly.

Thanks,
Raja.



On Fri, Jan 10, 2014 at 4:08 PM, Premchandra, Preetham Kukillaya 
preethampremchan...@fico.com wrote:

 Hi,
 I was doing a poc using
 https://github.com/SpringSource/spring-integration-extensions/tree/master/spring-integration-kafka.
 I figured that code is expecting the brokerid=0 and ideally this will not
 be the case if multiple brokers are connecting to the same zookeeper.

 Regards
 Preetham

 This email and any files transmitted with it are confidential, proprietary
 and intended solely for the individual or entity to whom they are
 addressed. If you have received this email in error please delete it
 immediately.




-- 
Thanks,
Raja.


Re: Producer SSL?

2013-11-15 Thread Rajasekar Elango
Hi Jonathan,

We forked kafka to add the SSL feature. It is not part of the official kafka
release.

Sent from my iPhone

On Nov 15, 2013, at 12:32 PM, Jonathan Hodges hodg...@gmail.com wrote:

 Hi,

 While searching the user group messages I found the following thread -
 http://grokbase.com/t/kafka/users/138vqq1x07/getting-leadernotavailableexception-in-console-producer-after-increasing-partitions-from-4-to-16.
 It shows the following stack trace with 0.8.

 [2013-08-27 08:29:30,372] INFO Fetching metadata from broker
 id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1
 topic(s) Set(test-41) (kafka.client.ClientUtils$)
 [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/
 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
 [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/
 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
 [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for
 producing (kafka.producer.SyncProducer)
 [2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true
 (kafka.producer.SyncProducer)
 [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled
 (kafka.producer.SyncProducerConfig)

 Is there a 'secure' boolean property on the broker that allows for SSL?  I
 didn't see it on http://kafka.apache.org/08/configuration.html but maybe I
 missed it?

 Thanks,
 Jonathan


Mirrormaker consumer looping to offset out of range and reset offset errors

2013-10-09 Thread Rajasekar Elango
We are seeing that the mirrormaker consumer has started looping through offset out
of range and reset offset errors for some of the partitions (2 out of 8
partitions). The ConsumerOffsetChecker reported a very high lag for these 2
partitions. It looks like this problem started after a consumer rebalance.
Here are the log lines:

2013-10-06 06:09:59,993
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2526006629 for partition [FunnelProto,1] out of range; reset
offset to 2526006629
2013-10-06 06:09:59,993
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2363213504 for partition [FunnelProto,3] out of range; reset
offset to 2363213504
2013-10-06 06:09:59,993
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2146256007 for partition [jmx,0] out of range; reset offset
to 2146256007
2013-10-06 06:09:59,992
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2239688 for partition [tower_timing_metrics,3] out of range;
reset offset to 2239688
2013-10-06 06:09:59,889
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 1234239 for partition [agent,0] out of range; reset offset
to 1234239
2013-10-06 06:09:59,889
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2526006629 for partition [FunnelProto,1] out of range; reset
offset to 2526006629
2013-10-06 06:09:59,889
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2363213504 for partition [FunnelProto,3] out of range; reset
offset to 2363213504


Also, as you can see, it's resetting the offset to the same value, so it keeps looping
through these offset resets again and again. After we restarted our
mirrormaker process, it started consuming from the beginning of the topic for all
partitions (we started receiving messages 7 days ) and it caught up in a couple
of hours.

We have a couple of questions:

1) What might have caused this to end up in this bad state?
2) We had the offset out of range problem only for 2 out of 8 partitions, but
it started to consume from the beginning for all partitions in the topic after we
restarted mirrormaker. How did a problem with 2 partitions affect all other
partitions?
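
For reference, the reset behavior on an out-of-range offset is controlled by the
consumer's auto.offset.reset property. A minimal sketch of the relevant
consumer.properties entries (values here are illustrative placeholders, not our
actual config):

# consumer.properties (sketch only)
# smallest = jump back to the earliest available offset, largest = jump to the latest
auto.offset.reset=smallest
# group and zookeeper settings shown with placeholder values
group.id=mirrormakerProd
zookeeper.connect=zk-host:2181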


-- 
Thanks,
Raja.


Re: Kafka consumer - Mbean for max lag

2013-09-24 Thread Rajasekar Elango
Thanks Neha, Looks like this mbean was added recently. The version we are
running is from early June and it doesn't have this Mbean.

Thanks,
Raja.


On Mon, Sep 23, 2013 at 9:15 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 On the consumer side, look for
 kafka.consumer:name=([-.\w]+)-MaxLag,type=ConsumerFetcherManager.
 Updated the website to reflect that.

 Thanks,
 Neha


 On Mon, Sep 23, 2013 at 12:48 PM, Rajasekar Elango
 rela...@salesforce.comwrote:

  In kafka documentation for
  monitoringhttp://kafka.apache.org/documentation.html#operations.
   I see we should be looking at max in messages among all partitions..
 All
  I can see is mbeans
  *kafka.server.FetcherLagMetrics.*ConsumerFetcherThread* and it's value
 is
  pretty much 0. Is this the correct Mbean ? If not, Can you tell me which
  MBean provides that info?
 
  Thanks in advance.
  --
  Thanks,
  Raja.
 




-- 
Thanks,
Raja.


Re: Leader doesn't get assigned for new topics

2013-09-18 Thread Rajasekar Elango
From the output of the StateChangeLogMerger tool, I see only this error
repeated:

[2013-09-18 14:16:48,358] ERROR [KafkaApi-1] Error while fetching metadata
for partition [FunnelProto,0] (kafka.server.KafkaApis)

On the state-change.log itself, I see this error:

[2013-09-18 14:22:48,954] ERROR Conditional update of path
/brokers/topics/test-1379439240191/partitions/2/state with data {
controller_epoch:10, isr:[ 1, 5, 4 ], leader:1, leader_epoch:4,
version:1 } and expected version 8 fai
led due to org.apache.zookeeper.KeeperException$BadVersionException:
KeeperErrorCode = BadVersion for
/brokers/topics/test-1379439240191/partitions/2/state (kafka.utils.ZkUtils$)

Do you know the reason for the above error? Also, this problem seems to be
intermittent; it started working now without any changes. I will continue
to monitor.
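
For anyone following along, a rough sketch of the merge invocation (option names
are taken from the replication tools wiki page linked below and may differ by
version; check --help before relying on them):

# sketch: merge state change logs for a single topic/partition
bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger \
  --logs /path/to/state-change.log --topic FunnelProto --partitions 0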

Thanks,
Raja.


On Tue, Sep 17, 2013 at 7:59 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 Raja,

 Could you run the StateChangeLogMerger tool and give it one topic-partition
 that has the above mentioned problem. This tool is documented here -

 https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-7.StateChangeLogMergerTool
 .

 Let me know if you run into any issues while using it.

 Thanks,
 Neha


 On Tue, Sep 17, 2013 at 12:27 PM, Rajasekar Elango
 rela...@salesforce.comwrote:

  Neha/Jun,
 
  The same problem started happening again although now our zookeeper
 cluster
  is configured correctly. The produce always failed with
  LeaderNotAvailableException and list topics shows topic is created with
  leader none. In the controller and stage-change log, I am seeing lot of
  these failures..
 
 
  [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
  correlation id 622369865 from client  on partition [FunnelProto,6] failed
  due to Partition [FunnelProto,6] doesn't exist on 2
  (kafka.server.KafkaApis)
  [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
  correlation id 622369865 from client  on partition [internal_metrics,3]
  failed due to Partition [internal_metrics,3] doesn't exist on 2
  (kafka.server.KafkaApis)
  [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
  correlation id 622369865 from client  on partition [FunnelProto,0] failed
  due to Partition [FunnelProto,0] doesn't exist on 2
  (kafka.server.KafkaApis)
  [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
  correlation id 622369865 from client  on partition [jmx,3] failed due to
  Partition [jmx,3] doesn't exist on 2 (kafka.server.KafkaApis)
  [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
  correlation id 622369865 from client  on partition [FunnelProto,5] failed
  due to Partition [FunnelProto,5] doesn't exist on 2
  (kafka.server.KafkaApis)
 
 
  When I ran listTopics command for one of above topic, all partitions are
  under replicated (we have replication factor set to 3). Any clues on what
  could be issue and how can we get it back to working?
 
  Thanks,
  Raja.
 
 
 
  On Fri, Sep 13, 2013 at 6:26 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Ah ok. Thanks for sharing that.
  
  
  
   On Fri, Sep 13, 2013 at 2:50 PM, Rajasekar Elango 
  rela...@salesforce.com
   wrote:
  
We have 3 zookeeper node in the cluster with a hardware load
 balancer .
In
one of the zookeeper, we did not configure ensemble correctly
 (server.n
property in zoo.cfg) . So it ended up as like 2 nodes in one cluster,
  one
node in other cluster. The load balancer is randomly hitting one of 2
zookeepers in two different cluster.
   
Thanks,
Raja.
   
   
On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede 
  neha.narkh...@gmail.com
wrote:
   
 Just curious to know, what was the misconfiguration?


 On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango
 rela...@salesforce.comwrote:

  Thanks Neha and Jun, It turned out to be miss configuration in
 our
  zookeeper cluster. After correcting it everything looks good.
 
  Thanks,
  Raja.
 
 
  On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao jun...@gmail.com
  wrote:
 
   Any error in the controller and the state-change log? Are
 brokers
2,3,4
   alive?
  
   Thanks,
  
   Jun
  
  
   On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango 
  rela...@salesforce.com
   wrote:
  
We are seeing a problem that we we try to send messages to
 new
topic
 it
fails kafka.common.LeaderNotAvailableException. But usually
  this
  problem
will be transient and if we re-send messages to same topic
 will
work.
  But
now we tried rending message to same topic several time, but
   still
  fails
with same error:
   
In the server log I see ] Auto creation of topic test-sjl2
  with 8
partitions and replication factor 3 is successful!. But
   listTopics
   command
shows leader

Re: Leader doesn't get assigned for new topics

2013-09-17 Thread Rajasekar Elango
Neha/Jun,

The same problem started happening again, although now our zookeeper cluster
is configured correctly. Produce requests always fail with
LeaderNotAvailableException, and the list topics command shows the topic created with
leader none. In the controller and state-change logs, I am seeing a lot of
these failures:


[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [FunnelProto,6] failed
due to Partition [FunnelProto,6] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [internal_metrics,3]
failed due to Partition [internal_metrics,3] doesn't exist on 2
(kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [FunnelProto,0] failed
due to Partition [FunnelProto,0] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [jmx,3] failed due to
Partition [jmx,3] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [FunnelProto,5] failed
due to Partition [FunnelProto,5] doesn't exist on 2 (kafka.server.KafkaApis)


When I ran the listTopics command for one of the above topics, all partitions were
under-replicated (we have the replication factor set to 3). Any clues on what
the issue could be and how we can get it back to a working state?
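
A sketch of the checks I mean, assuming the 0.8 list-topic tool (hostname is a
placeholder; verify option names with --help):

# show all partitions of the topic, then just the under-replicated ones
bin/kafka-list-topic.sh --zookeeper zk-host:2181 --topic FunnelProto
bin/kafka-list-topic.sh --zookeeper zk-host:2181 --under-replicated-partitions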

Thanks,
Raja.



On Fri, Sep 13, 2013 at 6:26 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 Ah ok. Thanks for sharing that.



 On Fri, Sep 13, 2013 at 2:50 PM, Rajasekar Elango rela...@salesforce.com
 wrote:

  We have 3 zookeeper node in the cluster with a hardware load balancer .
  In
  one of the zookeeper, we did not configure ensemble correctly (server.n
  property in zoo.cfg) . So it ended up as like 2 nodes in one cluster, one
  node in other cluster. The load balancer is randomly hitting one of 2
  zookeepers in two different cluster.
 
  Thanks,
  Raja.
 
 
  On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Just curious to know, what was the misconfiguration?
  
  
   On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango
   rela...@salesforce.comwrote:
  
Thanks Neha and Jun, It turned out to be miss configuration in our
zookeeper cluster. After correcting it everything looks good.
   
Thanks,
Raja.
   
   
On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao jun...@gmail.com wrote:
   
 Any error in the controller and the state-change log? Are brokers
  2,3,4
 alive?

 Thanks,

 Jun


 On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango 
rela...@salesforce.com
 wrote:

  We are seeing a problem that we we try to send messages to new
  topic
   it
  fails kafka.common.LeaderNotAvailableException. But usually this
problem
  will be transient and if we re-send messages to same topic will
  work.
But
  now we tried rending message to same topic several time, but
 still
fails
  with same error:
 
  In the server log I see ] Auto creation of topic test-sjl2 with 8
  partitions and replication factor 3 is successful!. But
 listTopics
 command
  shows leader none like below:
 
  topic: test-sjl2partition: 0leader: nonereplicas:
   2,4,3
  isr:
  topic: test-sjl2partition: 1leader: nonereplicas:
   3,2,4
  isr:
  topic: test-sjl2partition: 2leader: nonereplicas:
   4,3,2
  isr:
  topic: test-sjl2partition: 3leader: nonereplicas:
   2,3,4
  isr:
  topic: test-sjl2partition: 4leader: nonereplicas:
   3,4,2
  isr:
  topic: test-sjl2partition: 5leader: nonereplicas:
   4,2,3
  isr:
  topic: test-sjl2partition: 6leader: nonereplicas:
   2,4,3
  isr:
  topic: test-sjl2partition: 7leader: nonereplicas:
   3,2,4
  isr:
 
  I also see following NotLeaderForPatritionExcetion and
ZookeeperExcetion
 in
  logs
 
  kafka.common.NotLeaderForPartitionException
  at
sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown
  Source)
  at
 
 

   
  
 
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
  at
 java.lang.reflect.Constructor.newInstance(Constructor.java:513)
  at java.lang.Class.newInstance0(Class.java:355)
  at java.lang.Class.newInstance(Class.java:308)
  at
kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
  at
 
 

   
  
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun

Re: Leader doesn't get assigned for new topics

2013-09-13 Thread Rajasekar Elango
Thanks Neha and Jun. It turned out to be a misconfiguration in our
zookeeper cluster. After correcting it, everything looks good.

Thanks,
Raja.


On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao jun...@gmail.com wrote:

 Any error in the controller and the state-change log? Are brokers 2,3,4
 alive?

 Thanks,

 Jun


 On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango rela...@salesforce.com
 wrote:

  We are seeing a problem that we we try to send messages to new topic it
  fails kafka.common.LeaderNotAvailableException. But usually this problem
  will be transient and if we re-send messages to same topic will work. But
  now we tried rending message to same topic several time, but still fails
  with same error:
 
  In the server log I see ] Auto creation of topic test-sjl2 with 8
  partitions and replication factor 3 is successful!. But listTopics
 command
  shows leader none like below:
 
   topic: test-sjl2    partition: 0    leader: none    replicas: 2,4,3    isr:
   topic: test-sjl2    partition: 1    leader: none    replicas: 3,2,4    isr:
   topic: test-sjl2    partition: 2    leader: none    replicas: 4,3,2    isr:
   topic: test-sjl2    partition: 3    leader: none    replicas: 2,3,4    isr:
   topic: test-sjl2    partition: 4    leader: none    replicas: 3,4,2    isr:
   topic: test-sjl2    partition: 5    leader: none    replicas: 4,2,3    isr:
   topic: test-sjl2    partition: 6    leader: none    replicas: 2,4,3    isr:
   topic: test-sjl2    partition: 7    leader: none    replicas: 3,2,4    isr:
 
  I also see following NotLeaderForPatritionExcetion and ZookeeperExcetion
 in
  logs
 
  kafka.common.NotLeaderForPartitionException
  at sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown
  Source)
  at
 
 
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
  at
 java.lang.reflect.Constructor.newInstance(Constructor.java:513)
  at java.lang.Class.newInstance0(Class.java:355)
  at java.lang.Class.newInstance(Class.java:308)
  at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
  at
 
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
  at
 
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
  at kafka.utils.Logging$class.warn(Logging.scala:88)
  at
 kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
  at
 
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157)
  at
 
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
  at
  scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
  at
  scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
  at
 
 
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
  at
  kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
  at
 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
  2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR
  (kafka.utils.ZkUtils$)  - Conditional update of path
  /brokers/topics/FunnelProto/partitions/4/state with data {
  controller_epoch:3, isr:[ 2, 5 ], leader:2, leader_epoch:2,
  version:1 } and expected version 14 failed due to
  org.apache.zookeeper.KeeperException$BadVersionException:
 KeeperErrorCode =
  BadVersion for /brokers/topics/FunnelProto/partitions/4/state
  2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR
  (kafka.utils.ZkUtils$)  - Conditional update of path
  /brokers/topics/FunnelProto/partitions/4/state with data {
  controller_epoch:3, isr:[ 2, 5 ], leader:2, leader_epoch:2,
  version:1 } and expected version 14 failed due to
  org.apache.zookeeper.KeeperException$BadVersionException:
 KeeperErrorCode =
  BadVersion for /brokers/topics/FunnelProto/partitions/4/state
 
 
  Any clues on what could be problem.. ?
 
  Any for your help.
 
  --
  Thanks,
  Raja.
 




-- 
Thanks,
Raja.


Re: Leader doesn't get assigned for new topics

2013-09-13 Thread Rajasekar Elango
We have 3 zookeeper nodes in the cluster behind a hardware load balancer. In
one of the zookeepers, we did not configure the ensemble correctly (the server.n
property in zoo.cfg), so it ended up as 2 nodes in one cluster and one
node in another cluster. The load balancer was randomly hitting one of the 2
zookeepers in the two different clusters.
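
For reference, a minimal sketch of the ensemble section that should be identical
in zoo.cfg on all three nodes (hostnames are placeholders):

# zoo.cfg (sketch) - the same server.N lines must appear on every node in the ensemble
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888
# and each node's myid file must contain its own N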

Thanks,
Raja.


On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 Just curious to know, what was the misconfiguration?


 On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango
 rela...@salesforce.comwrote:

  Thanks Neha and Jun, It turned out to be miss configuration in our
  zookeeper cluster. After correcting it everything looks good.
 
  Thanks,
  Raja.
 
 
  On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao jun...@gmail.com wrote:
 
   Any error in the controller and the state-change log? Are brokers 2,3,4
   alive?
  
   Thanks,
  
   Jun
  
  
   On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango 
  rela...@salesforce.com
   wrote:
  
We are seeing a problem that we we try to send messages to new topic
 it
fails kafka.common.LeaderNotAvailableException. But usually this
  problem
will be transient and if we re-send messages to same topic will work.
  But
now we tried rending message to same topic several time, but still
  fails
with same error:
   
In the server log I see ] Auto creation of topic test-sjl2 with 8
partitions and replication factor 3 is successful!. But listTopics
   command
shows leader none like below:
   
topic: test-sjl2partition: 0leader: nonereplicas:
 2,4,3
isr:
topic: test-sjl2partition: 1leader: nonereplicas:
 3,2,4
isr:
topic: test-sjl2partition: 2leader: nonereplicas:
 4,3,2
isr:
topic: test-sjl2partition: 3leader: nonereplicas:
 2,3,4
isr:
topic: test-sjl2partition: 4leader: nonereplicas:
 3,4,2
isr:
topic: test-sjl2partition: 5leader: nonereplicas:
 4,2,3
isr:
topic: test-sjl2partition: 6leader: nonereplicas:
 2,4,3
isr:
topic: test-sjl2partition: 7leader: nonereplicas:
 3,2,4
isr:
   
I also see following NotLeaderForPatritionExcetion and
  ZookeeperExcetion
   in
logs
   
kafka.common.NotLeaderForPartitionException
at
  sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown
Source)
at
   
   
  
 
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at
   java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at java.lang.Class.newInstance0(Class.java:355)
at java.lang.Class.newInstance(Class.java:308)
at
  kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
at
   
   
  
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
at
   
   
  
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
at kafka.utils.Logging$class.warn(Logging.scala:88)
at
   kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
at
   
   
  
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157)
at
   
   
  
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
at
   
 scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
at
   
  scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
at
   
   
  
 
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
at
   
  kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
at
   kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR
(kafka.utils.ZkUtils$)  - Conditional update of path
/brokers/topics/FunnelProto/partitions/4/state with data {
controller_epoch:3, isr:[ 2, 5 ], leader:2, leader_epoch:2,
version:1 } and expected version 14 failed due to
org.apache.zookeeper.KeeperException$BadVersionException:
   KeeperErrorCode =
BadVersion for /brokers/topics/FunnelProto/partitions/4/state
2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR
(kafka.utils.ZkUtils$)  - Conditional update of path
/brokers/topics/FunnelProto/partitions/4/state with data {
controller_epoch:3, isr:[ 2, 5 ], leader:2, leader_epoch:2,
version:1 } and expected version 14 failed due to
org.apache.zookeeper.KeeperException$BadVersionException:
   KeeperErrorCode =
BadVersion for /brokers/topics/FunnelProto/partitions/4/state
   
   
Any clues on what could

Re: Mirror maker doesn't replicate new topics

2013-09-10 Thread Rajasekar Elango
Hi Guozhang ,

1) When I say I send messages to a new topic - yes, I am sending new
messages to the source cluster via the console producer.
2) The log message Handling 0 events doesn't output the topic name, but I
would believe it's for both old and new topics, because no other app is
sending messages to the source cluster other than me trying to test using the
console producer.

Thanks,
Raja.


On Tue, Sep 10, 2013 at 1:03 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Raja,

 When you say I send messages to new topic I guess you mean that you send
 messages to the source cluster right? It may be due to the fact that
 producers of mirror make have not catched up with the mirror maker
 consumer.

 When you say I always see Handling 0 events do you mean that you see this
 for both messages for the new topic and for the old topics, or it only
 shows this log for new topic?

 Guozhang


 On Tue, Sep 10, 2013 at 7:47 AM, Rajasekar Elango rela...@salesforce.com
 wrote:

  Thanks Guozhang,
 
  1, 2, 3 all are true. We are using default value 200 for
 batch.num.messages
  and 5000ms queue.buffering.max.ms. I believe it should batch either if
  batch.num.messages is reached or queue.buffering.max.ms is reached.
 
  I see log message 5000ms elapsed , Queue time reached. Sending.on
  regular interval. But when I send messages to new topic, I always see
  Handling 0 events and it doesn't produce to target cluster. But when I
  resend it second time, I see Handling x events and starts producing.
 Any
  clues on how to debug further?
 
  Thanks,
 
  Raja.
 
 
  On Mon, Sep 9, 2013 at 6:02 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi Raja,
  
   So just to summarize the scenario:
  
   1) The consumer of mirror maker is successfully consuming all
 partitions
  of
   the newly created topic.
   2) The producer of mirror maker is not producing the new messages
   immediately when the topic is created (observed from
 ProducerSendThread's
   log).
   3) The producer of mirror maker will start producing the new messages
  when
   more messages are sent to the source cluster.
  
   If 1) is true then KAFKA-1030 is excluded, since the consumer
  successfully
   recognize all the partitions and start consuming.
  
   If both 2) and 3) is true, I would wonder if the batch size of the
 mirror
   maker producer is large and hence will not send until enough messages
 are
   accumulated at the producer queue.
  
   Guozhang
  
  
   On Mon, Sep 9, 2013 at 2:36 PM, Rajasekar Elango 
 rela...@salesforce.com
   wrote:
  
yes, the data exists in source cluster, but not in target cluster. I
   can't
replicate this problem in dev environment and it happens only in prod
environment. I turned on debug logging, but not able to identify  the
problem. Basically, whenever I send data to new topic, I don't see
 any
   log
messages from ProducerSendThread in mirrormaker log so they are not
produced to target cluster. If I send more messages to same topic,
 the
producer send thread kicks off and replicates the messages. But
  whatever
messages send first time gets lost. How can I trouble shoot this
  problem
further? Even this could be due to know issue
https://issues.apache.org/jira/browse/KAFKA-1030, how can I confirm
   that?
Is there config tweaking I can make to workaround this..?
ConsumerOffsetChecks helps to track consumers. Its there any other
 tool
   we
can use to track producers in mirrormaker. ?
   
Thanks in advance for help.
   
Thanks,
Raja.
   
   
   
   
On Fri, Sep 6, 2013 at 3:50 AM, Swapnil Ghike sgh...@linkedin.com
   wrote:
   
 Hi Rajasekar,

 You said that ConsumerOffsetChecker shows that new topics are
successfully
 consumed and the lag is 0. If that's the case, can you verify that
   there
 is data on the source cluster for these new topics? If there is no
  data
at
 the source, MirrorMaker will only assign consumer streams to the
 new
 topic, but the lag will be 0.

 This could otherwise be related to
 https://issues.apache.org/jira/browse/KAFKA-1030.

 Swapnil



 On 9/5/13 8:38 PM, Guozhang Wang wangg...@gmail.com wrote:

 Could you let me know the process of reproducing this issue?
 
 Guozhang
 
 
 On Thu, Sep 5, 2013 at 5:04 PM, Rajasekar Elango
 rela...@salesforce.comwrote:
 
  Yes guozhang
 
  Sent from my iPhone
 
  On Sep 5, 2013, at 7:53 PM, Guozhang Wang wangg...@gmail.com
   wrote:
 
   Hi Rajasekar,
  
   Is auto.create.topics.enable set to true in your target
 cluster?
  
   Guozhang
  
  
   On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango
 rela...@salesforce.com
  wrote:
  
   We having issues that mirormaker not longer replicate newly
   created
  topics.
   It continues to replicate data for existing topics and but
 new
topics
   doesn't get created

Re: Mirror maker doesn't replicate new topics

2013-09-09 Thread Rajasekar Elango
Yes, the data exists in the source cluster, but not in the target cluster. I can't
reproduce this problem in the dev environment; it happens only in the prod
environment. I turned on debug logging, but was not able to identify the
problem. Basically, whenever I send data to a new topic, I don't see any log
messages from ProducerSendThread in the mirrormaker log, so the messages are not
produced to the target cluster. If I send more messages to the same topic, the
producer send thread kicks off and replicates the messages. But whatever
messages were sent the first time get lost. How can I troubleshoot this problem
further? If this could be due to the known issue
https://issues.apache.org/jira/browse/KAFKA-1030, how can I confirm that?
Is there any config tweaking I can make to work around this?
ConsumerOffsetChecker helps to track consumers. Is there any other tool we
can use to track the producers in mirrormaker?
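
This is the kind of consumer-side check I mean - a sketch of the offset checker
invocation (group name and zookeeper host are placeholders, and option names may
vary slightly by version):

# per-partition owner, offset and lag for the mirrormaker consumer group
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zkconnect zk-host:2181 --group mirrormakerProd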

Thanks in advance for help.

Thanks,
Raja.




On Fri, Sep 6, 2013 at 3:50 AM, Swapnil Ghike sgh...@linkedin.com wrote:

 Hi Rajasekar,

 You said that ConsumerOffsetChecker shows that new topics are successfully
 consumed and the lag is 0. If that's the case, can you verify that there
 is data on the source cluster for these new topics? If there is no data at
 the source, MirrorMaker will only assign consumer streams to the new
 topic, but the lag will be 0.

 This could otherwise be related to
 https://issues.apache.org/jira/browse/KAFKA-1030.

 Swapnil



 On 9/5/13 8:38 PM, Guozhang Wang wangg...@gmail.com wrote:

 Could you let me know the process of reproducing this issue?
 
 Guozhang
 
 
 On Thu, Sep 5, 2013 at 5:04 PM, Rajasekar Elango
 rela...@salesforce.comwrote:
 
  Yes guozhang
 
  Sent from my iPhone
 
  On Sep 5, 2013, at 7:53 PM, Guozhang Wang wangg...@gmail.com wrote:
 
   Hi Rajasekar,
  
   Is auto.create.topics.enable set to true in your target cluster?
  
   Guozhang
  
  
   On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango
 rela...@salesforce.com
  wrote:
  
   We having issues that mirormaker not longer replicate newly created
  topics.
   It continues to replicate data for existing topics and but new topics
   doesn't get created on target cluster. ConsumerOffsetTracker shows
 that
  new
   topics are successfully consumed and Lag is 0. But those topics
 doesn't
  get
   created in target cluster. I also don't see mbeans for this new topic
  under
   kafka.producer.ProducerTopicMetrics.topic namemetric. In logs I see
   warning for NotLeaderForPatition. but don't see major error. What
 else
  can
   we look to troubleshoot this further.
  
   --
   Thanks,
   Raja.
  
  
  
   --
   -- Guozhang
 
 
 
 
 --
 -- Guozhang




-- 
Thanks,
Raja.


Re: Kafka Monitoring

2013-09-05 Thread Rajasekar Elango
Thanks a lot Jun. This is very helpful.

Thanks,
Raja.


On Thu, Sep 5, 2013 at 1:12 AM, Jun Rao jun...@gmail.com wrote:

 Updated the doc at http://kafka.apache.org/documentation.html#monitoring

 Hopefully that answers your questions.

 Thanks,

 Jun


 On Tue, Sep 3, 2013 at 11:16 PM, Vadim Keylis vkeylis2...@gmail.com
 wrote:

  Good evening. I have read through section of monitoring. I tried to map
  each section to corresponding JMX attribute. I will appreciate if you
  answer a few questions bellow.
 
  Thanks so much in advance,
  Vadim
 
  What this JMX
  kafka.controller:type=KafkaController,name=ActiveControllerCount
 for?
 
  The rate of data in and out of the cluster and the number of messages
  written
 Which jmx attributes should I monitor? Since I should alert on this
 What
  are acceptable changes? What are not?
  The log flush rate and the time taken to flush the log
  kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
  Which attribute I should be watching and what acceptable deviation change
  before I should alert
  The number of partitions that have replicas that are down or have
  fallen behind and are underreplicated.
 Is this the JMX
  kafka.cluster:type=Partition,name=buypets-0-UnderReplicated that
 will
  show replicas that are down?
 
  Unclean leader elections. This shouldn't happen.
 
 
 
  
 kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec.
  I assume that should always be 0 and if its not 0 we have problem.
  Number of partitions each node is the leader for.
 Which JMX attribute(s) monitors this?
  Leader elections: we track each time this happens and how long it
 took:
 
 
 
 kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
  Any changes to the ISR
  Which JMX attribute I should monitor for this? Should I alert on
 this?
  What are reasonable changes? Which are not?
  The number of produce requests waiting on replication to report back
 Which JMX attribute I should monitor for this? Should I alert on this?
  What are reasonable changes? Which are not?
  The number of fetch requests waiting on data to arrive
 Which JMX attribute I should monitor for this? Should I alert on this?
  What are reasonable changes? Which are not?
 




-- 
Thanks,
Raja.


Mirror maker doesn't replicate new topics

2013-09-05 Thread Rajasekar Elango
We are having an issue where mirrormaker no longer replicates newly created topics.
It continues to replicate data for existing topics, but new topics
don't get created on the target cluster. ConsumerOffsetTracker shows that the new
topics are successfully consumed and the lag is 0, but those topics don't get
created in the target cluster. I also don't see mbeans for the new topic under
the kafka.producer.ProducerTopicMetrics.<topic name> metric. In the logs I see
warnings for NotLeaderForPartition, but I don't see any major errors. What else can
we look at to troubleshoot this further?

-- 
Thanks,
Raja.


Re: Mirror maker doesn't replicate new topics

2013-09-05 Thread Rajasekar Elango
Yes guozhang

Sent from my iPhone

On Sep 5, 2013, at 7:53 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Rajasekar,

 Is auto.create.topics.enable set to true in your target cluster?

 Guozhang


 On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango 
 rela...@salesforce.comwrote:

 We having issues that mirormaker not longer replicate newly created topics.
 It continues to replicate data for existing topics and but new topics
 doesn't get created on target cluster. ConsumerOffsetTracker shows that new
 topics are successfully consumed and Lag is 0. But those topics doesn't get
 created in target cluster. I also don't see mbeans for this new topic under
 kafka.producer.ProducerTopicMetrics.topic namemetric. In logs I see
 warning for NotLeaderForPatition. but don't see major error. What else can
 we look to troubleshoot this further.

 --
 Thanks,
 Raja.



 --
 -- Guozhang


Re: Number of file handles increases indefinitely in producer if broker host is unresolvable

2013-09-04 Thread Rajasekar Elango
I can easily reproduce this with the console producer. If I run the console
producer with the right hostname and the broker is not running, the console
producer will exit after three tries. But if I run the console producer with an
unresolvable broker hostname, it throws the exception below and continues to wait for
user input; every time I enter a new message, it opens a socket, and the file handle
count keeps increasing.
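
A sketch of the reproduction (hostnames and port are placeholders; the second
hostname just needs to be one that does not resolve):

# broker down but hostname resolvable: producer gives up after its retries
bin/kafka-console-producer.sh --broker-list localhost:6667 --topic test
# hostname unresolvable: every message typed opens another socket that is never closed
bin/kafka-console-producer.sh --broker-list no-such-host:6667 --topic test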

Here is Exception in producer

ERROR fetching topic metadata for topics [Set(test-1378245487417)] from
broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed
(kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(test-1378245487417)] from broker
[ArrayBuffer(id:0,host:localhost1,port:6667)] failed
at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
at kafka.utils.Utils$.swallow(Utils.scala:186)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:30)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:487)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:59)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:151)
at
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:166)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:37)
... 12 more



On Tue, Sep 3, 2013 at 9:29 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 Interesting. What errors/exceptions do you see in the producer logs?

 Thanks,
 Neha


 On Tue, Sep 3, 2013 at 3:28 PM, Rajasekar Elango rela...@salesforce.com
 wrote:

  We found a issue that if broker host is un resolvable, the number of file
  handle keep increasing for every message we produce and eventually it
 uses
  up all available files handles in operating system. If broker itself is
 not
  running and broker host name is resolvable, open file handles count stays
  flat.
 
  lsof output shows number of these open file handles continue to grow for
  every message we produce.
 
   java  19631relango   81u sock0,6  0t0
   196966526 can't identify protocol
 
  Is this a bug is producer API..? What is best way to self protect our
 self
  ?
 
  --
  Thanks,
  Raja.
 




-- 
Thanks,
Raja.


Re: Kafka-0.8.0-beta1-src Has ObjectName starting with Double Quotes

2013-09-04 Thread Rajasekar Elango
We have the same problem; it doesn't work with the RESTful JMX console
jminix (https://code.google.com/p/jminix/).
Is it possible to change kafka to expose mbeans without quotes?
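
A sketch of the quote-escaping approach mentioned below, using JmxTool (the mbean
name and JMX url here are illustrative only, and option names may differ by
version):

# quote the whole object name so the embedded double quotes survive the shell
bin/kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi \
  --object-name '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec"'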

Thanks,
Raja.


On Wed, Sep 4, 2013 at 10:30 AM, Neha Narkhede neha.narkh...@gmail.comwrote:

 I had the same problem and could not use jmxterm to inspect jmx beans due
 to this issue. But we tried escaping the quotes and it works with our
 internal monitoring system as well as JmxTool that ships with Kafka.

 Thanks,
 Neha
 On Sep 4, 2013 7:16 AM, Monika Garg monika.g...@impetus.co.in wrote:

  Hi,
 
  Kafka-0.8.0-beta1-src is having doublequotes in the start of Objects name
  obtained from jConsole.Due to this I am not able to use jmxTrans to
 monitor
  my kafka-0.8.0 cluster.
 
  Please help in solving the issue.
 
 
 
  Regards
  Monika Garg
  Associate Software Engineer
  Impetus Infotech (India) Pvt. Ltd.
  D-40, Sector-59, Noida - 201307, UP
  (O) +91-120-4363300 x 2858
  (M) +91-8588075977
  www.impetus.comhttp://www.impetus.com/
 
 
  
 
 
 
 
 
 
  NOTE: This message may contain information that is confidential,
  proprietary, privileged or otherwise protected by law. The message is
  intended solely for the named addressee. If received in error, please
  destroy and notify the sender. Any use of this email is prohibited when
  received in error. Impetus does not represent, warrant and/or guarantee,
  that the integrity of this communication has been maintained nor that the
  communication is free of errors, virus, interception or interference.
 




-- 
Thanks,
Raja.


Re: Number of file handles increases indefinitely in producer if broker host is unresolvable

2013-09-04 Thread Rajasekar Elango
Sure. Thanks Neha.  Created Issue:
https://issues.apache.org/jira/browse/KAFKA-1041




On Wed, Sep 4, 2013 at 10:32 AM, Neha Narkhede neha.narkh...@gmail.comwrote:

 Ideally if the producer runs into any error, it should close the previous
 socket and open a new one. Seems like that is not happening here. I will
 take a closer look at this today. Do you mind filing a bug?

 Thanks,
 Neha
 On Sep 4, 2013 7:23 AM, Rajasekar Elango rela...@salesforce.com wrote:

  I can easily reproduce this with console producer,  If I run console
  producer with right hostname and if broker is not running, the console
  producer will exit after three tries. But If I run console producer with
  unresolvable broker, it throws below exception and continues to wait for
  user input, every time I enter new message, it opens socket and file
 handle
  count keeps increasing..
 
  Here is Exception in producer
 
  ERROR fetching topic metadata for topics [Set(test-1378245487417)] from
  broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed
  (kafka.utils.Utils$)
  kafka.common.KafkaException: fetching topic metadata for topics
  [Set(test-1378245487417)] from broker
  [ArrayBuffer(id:0,host:localhost1,port:6667)] failed
  at
  kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
  at
 
 kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
  at
 
 
 kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
  at kafka.utils.Utils$.swallow(Utils.scala:186)
  at kafka.utils.Logging$class.swallowError(Logging.scala:105)
  at kafka.utils.Utils$.swallowError(Utils.scala:45)
  at
 
 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
  at
 
 
 kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
  at
 
 
 kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
  at
 
 
 kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
  at scala.collection.immutable.Stream.foreach(Stream.scala:526)
  at
 
 
 kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
  at
  kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
  Caused by: java.nio.channels.UnresolvedAddressException
  at sun.nio.ch.Net.checkAddress(Net.java:30)
  at
 sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:487)
  at
 kafka.network.BlockingChannel.connect(BlockingChannel.scala:59)
  at kafka.producer.SyncProducer.connect(SyncProducer.scala:151)
  at
  kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:166)
  at
 
 
 kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
  at
  kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:37)
  ... 12 more
 
 
 
  On Tue, Sep 3, 2013 at 9:29 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Interesting. What errors/exceptions do you see in the producer logs?
  
   Thanks,
   Neha
  
  
   On Tue, Sep 3, 2013 at 3:28 PM, Rajasekar Elango 
 rela...@salesforce.com
   wrote:
  
We found a issue that if broker host is un resolvable, the number of
  file
handle keep increasing for every message we produce and eventually it
   uses
up all available files handles in operating system. If broker itself
 is
   not
running and broker host name is resolvable, open file handles count
  stays
flat.
   
lsof output shows number of these open file handles continue to grow
  for
every message we produce.
   
 java  19631relango   81u sock0,6
  0t0
 196966526 can't identify protocol
   
Is this a bug is producer API..? What is best way to self protect our
   self
?
   
--
Thanks,
Raja.
   
  
 
 
 
  --
  Thanks,
  Raja.
 




-- 
Thanks,
Raja.


Re: Mirrormaker stopped consuming

2013-09-03 Thread Rajasekar Elango
Thanks Neha,

I did not take a thread dump before restarting; I will get one when it happens
again. We are using a 16 GB JVM heap. Do you have a recommendation on
JVM GC options?
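
When it recurs, a sketch of how the dumps could be captured - a few snapshots a
few seconds apart so the spinning code path is visible (the pid is a placeholder):

# take 3 thread dumps of the mirrormaker JVM, 5 seconds apart
for i in 1 2 3; do jstack -l <mirrormaker-pid> > /tmp/mm-threads-$i.txt; sleep 5; done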

Thanks,
Raja.


On Tue, Sep 3, 2013 at 12:26 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 2013-09-01 05:59:27,792 [main-EventThread] INFO
  (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed (Disconnected)
 2013-09-01 05:59:27,692 [main-SendThread(
 mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
  (org.apache.zookeeper.
 ClientCnxn)  - Client session timed out, have not
 heard from server in 4002ms for sessionid 0x140c603da5b0032, closing socket
 connection and attempting reconnect

 This indicates that your mirror maker and/or your zookeeper cluster is
 GCing for long periods of time. I have observed that if client session
 timed out happens too many times, the client tends to lose zookeeper
 watches. This is a potential bug in zookeeper. If this happens, your mirror
 maker instance might not rebalance correctly and will start losing data.

 You mentioned consumption/production stopped on your mirror maker, could
 you please take a thread dump and point us to it? Meanwhile, you might want
 to fix the GC pauses.

 Thanks,
 Neha


 On Tue, Sep 3, 2013 at 8:59 AM, Rajasekar Elango rela...@salesforce.com
 wrote:

  We found that mirrormaker stopped consuming and producing over the week
 end
  (09/01). Just seeing Client session timed out messages in mirrormaker
  log. I restarted to it today 09/03 to resume processing. Here is the logs
  line in reverse order.
 
 
  2013-09-03 14:20:40,918
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.utils.VerifiableProperties)  - Verifying properties
  2013-09-03 14:20:40,877
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
  [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
  begin rebalancing consumer
  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try
 #1
  2013-09-03 14:20:38,877
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
  [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
  Committing all offsets after clearing the fetcher queues
  2013-09-03 14:20:38,877
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
  [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
  Cleared the data chunks in all the consumer message iterators
  2013-09-03 14:20:38,877
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
  [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
  Cleared all relevant queues for this fetcher
  2013-09-03 14:20:38,877
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ConsumerFetcherManager)  -
  [ConsumerFetcherManager-1378218012760] All connections stopped
  2013-09-03 14:20:38,877
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ConsumerFetcherManager)  -
  [ConsumerFetcherManager-1378218012760] Stopping all fetchers
  2013-09-03 14:20:38,877
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ConsumerFetcherManager)  -
  [ConsumerFetcherManager-1378218012760] Stopping leader finder thread
  2013-09-03 14:20:38,877
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
  [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
  Rebalancing attempt failed. Clearing the cache before the next
 rebalancing
  operation is triggered
  2013-09-03 14:20:38,876
 
 
 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
  INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
  [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
 end
  rebalancing consumer
  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try
 #0
  2013-09-01 05:59:29,069 [main-SendThread(
  mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
   (org.apache.zookeeper.ClientCnxn)  - Socket connection established to
  mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181, initiating session
  2013-09-01 05:59:29,069 [main-SendThread(
  mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
   (org.apache.zookeeper.ClientCnxn)  - Opening socket connection to server
  mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181
  2013-09-01 05:59:27,792 [main-EventThread] INFO
   (org.I0Itec.zkclient.ZkClient)  - zookeeper state

Number of file handles increases indefinitely in producer if broker host is unresolvable

2013-09-03 Thread Rajasekar Elango
We found an issue where, if the broker host is unresolvable, the number of file
handles keeps increasing for every message we produce, and eventually it uses
up all available file handles in the operating system. If the broker itself is not
running but the broker host name is resolvable, the open file handle count stays
flat.

lsof output shows that the number of these open file handles continues to grow for
every message we produce.

 java  19631relango   81u sock0,6  0t0
 196966526 can't identify protocol

Is this a bug in the producer API? What is the best way to protect ourselves?
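
A sketch of one way to watch the handle count grow (the pid is a placeholder):

# count the orphaned sockets held by the producer JVM every few seconds
while true; do lsof -p <producer-pid> | grep -c "can't identify protocol"; sleep 5; done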

-- 
Thanks,
Raja.


Re: Num of streams for consumers using TopicFilter.

2013-08-30 Thread Rajasekar Elango
Yeah. The actual bottleneck is the number of topics that match the
topic filter; the number of streams is going to be shared between all topics it's
consuming from. I thought about the following ideas to work around this. (I am
basically referring to the mirrormaker consumer in these examples.)

Option 1) Instead of running one mirrormaker process with topic filter
.+, we can start multiple mirrormaker processes, each with a topic filter matching
a subset of topics (e.g. mirrormaker1 = whitelist topic1.*, mirrormaker2
= whitelist topic2.*, etc.), as sketched below.

But this adds some operational overhead to start and manage multiple
processes on the host.
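
A sketch of option 1, with placeholder config files and illustrative topic
prefixes (option names may vary slightly by version):

# one mirrormaker process per topic prefix instead of a single '.+' whitelist
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config mirror-consumer.properties --producer.config mirror-producer.properties \
  --whitelist 'topic1.*' --num.streams 8 &
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config mirror-consumer.properties --producer.config mirror-producer.properties \
  --whitelist 'topic2.*' --num.streams 8 &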

Option 2) Modify the mirrormaker code to support a list of whitelist filters and
have it create message streams for each filter
(call createMessageStreamsByFilter for each filter).

What would be your recommendation? If adding this feature to mirrormaker is
worthwhile for kafka, we can do option 2.

Thanks,
Raja.




On Fri, Aug 30, 2013 at 10:34 AM, Jun Rao jun...@gmail.com wrote:

 Right, but if you set #partitions in each topic to 16, you can use a total
 of 16 streams.

 Thanks,

 Jun


 On Thu, Aug 29, 2013 at 9:08 PM, Rajasekar Elango rela...@salesforce.com
 wrote:

  With option 1) I can't really use 8 streams in each consumer, If I do
 only
  one consumer seem to be doing all work. So I had to actually use total 8
  streams with 4 for each consumer.
 
 
 
  On Fri, Aug 30, 2013 at 12:01 AM, Jun Rao jun...@gmail.com wrote:
 
   The drawback of 2), as you said is no auto failover. I was suggesting
  that
   you use 16 partitions. Then you can use option 1) with 8 streams in
 each
   consumer.
  
   Thanks,
  
   Jun
  
  
   On Thu, Aug 29, 2013 at 8:51 PM, Rajasekar Elango 
  rela...@salesforce.com
   wrote:
  
Hi Jun,
   
If you read my previous posts, based on current re balancing logic,
 if
  we
consumer from topic filter, consumer actively use all streams. Can
 you
provide your recommendation of option 1 vs option 2 in my previous
  post?
   
Thanks,
Raja.
   
   
On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao jun...@gmail.com wrote:
   
 You can always use more partitions to get more parallelism in the
 consumers.

 Thanks,

 Jun


 On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango
 rela...@salesforce.comwrote:

  So what is best way to load balance multiple consumers consuming
  from
 topic
  filter.
 
  Let's say we have 4 topics with 8 partitions and 2 consumers.
 
  Option 1) To load balance consumers, we can set num.streams=4 so
  that
 both
  consumers split 8 partitions. but can only use half of consumer
streams.
 
  Option 2) Configure mutually exclusive topic filter regex such
  that 2
  topics will match consumer1 and 2 topics will match consumer2.
 Now
  we
can
  set num.streams=8 and fully utilize consumer streams. I believe
  this
will
  improve performance, but if consumer dies, we will not get any
 data
from
  the topic used by that consumer.
 
  What would be your recommendation?
 
  Thanks,
  Raja.
 
 
  On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede 
neha.narkh...@gmail.com
  wrote:
 
2) When I started mirrormaker with num.streams=16, looks
 like
  16
   consumer
   threads were created, but only 8 are showing up as active as
  owner
   in
   consumer offset tracker and all topics/partitions are
 distributed
  between 8
   consumer threads.
  
   This is because currently the consumer rebalancing process of
assigning
   partitions to consumer streams is at a per topic level. Unless
  you
have
  at
   least one topic with 16 partitions, the remaining 8 threads
 will
   not
do
  any
   work. This is not ideal and we want to look into a better
   rebalancing
   algorithm. Though it is a big change and we prefer doing it as
  part
of
  the
   consumer client rewrite.
  
   Thanks,
   Neha
  
  
   On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango 
  rela...@salesforce.com
   wrote:
  
So my understanding is num of active streams that a consumer
  can
  utilize
   is
number of partitions in topic. This is fine if we consumer
 from
  specific
topic. But if we consumer from TopicFilter, I thought
 consumer
should
   able
to utilize (number of topics that match filter * number of
partitions
  in
topic) . But looks like number of streams that consumer can
 use
   is
   limited
by just number if partitions in topic although it's consuming
   from
   multiple
topic.
   
Here what I observed with 1 mirrormaker consuming from
  whitelist
 '.+'.
   
The white list matches 5 topics and each topic has 8
  partitions.
   I
 used
consumer offset checker to look at owner of each

Num of streams for consumers using TopicFilter.

2013-08-29 Thread Rajasekar Elango
So my understanding is that the number of active streams a consumer can utilize is
the number of partitions in a topic. This is fine if we consume from a specific
topic. But if we consume from a TopicFilter, I thought the consumer should be able
to utilize (number of topics that match the filter * number of partitions in a
topic). But it looks like the number of streams that a consumer can use is limited
to just the number of partitions in a topic, even though it's consuming from multiple
topics.

Here is what I observed with 1 mirrormaker consuming from whitelist '.+'.

The whitelist matches 5 topics and each topic has 8 partitions. I used
the consumer offset checker to look at the owner of each topic/partition.

1) When I started mirrormaker with num.streams=8, all topics/partitions were
distributed between the 8 consumer threads.

2) When I started mirrormaker with num.streams=16, it looks like 16 consumer
threads were created, but only 8 showed up as active owners in the
consumer offset tracker, and all topics/partitions were distributed between those 8
consumer threads.

So this could be a bottleneck for consumers: although we partitioned the topic,
if we are consuming from a topic filter it can't utilize much of the parallelism
from the number of streams. Am I missing something? Is there a way to make
consumers/mirrormakers utilize a larger number of active streams?


-- 
Thanks,
Raja.


Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-29 Thread Rajasekar Elango
Created JIRA  https://issues.apache.org/jira/browse/KAFKA-1035 and
attached patch to it. Please review.


On Wed, Aug 28, 2013 at 1:11 PM, Guozhang Wang wangg...@gmail.com wrote:

 I think this patch can be made in trunk. You can mark it as 0.8.1

 Guozhang

 On Wednesday, August 28, 2013, Rajasekar Elango rela...@salesforce.com
 wrote:
  Guozhang ,
 
  *The documentation says I need to work off of trunk. Can you confirm If I
  should be working in trunk or different branch.*
  *
  *
  *Thanks,*
  *Raja.*
 
 
  On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
  Cool! You can follow the process of creating a JIRA here:
 
  http://kafka.apache.org/contributing.html
 
  And submit patch here:
 
  https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow
 
  It will be great if you can also add an entry for this issue in FAQ
 since I
  think this is a common question:
 
  https://cwiki.apache.org/confluence/display/KAFKA/FAQ
 
  Guozhang
 
 
  On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango 
 rela...@salesforce.com
  wrote:
 
   Thanks Guozhang, Changing max retry to 5 worked. Since I am changing
   console producer code, I can also submit patch adding both
   message.send.max.retries
   and retry.backoff.ms to console producer. Can you let me know process
  for
   submitting patch?
  
   Thanks,
   Raja.
  
  
   On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang wangg...@gmail.com
  wrote:
  
Hello Rajasekar,
   
The remove fetcher log entry is normal under addition of partitions,
   since
they indicate that some leader changes have happened so brokers are
   closing
the fetchers to the old leaders.
   
I just realized that the console Producer does not have the
message.send.max.retries options yet. Could you file a JIRA for this
  and
   I
will followup to add this option? As for now you can hard modify the
default value from 3 to a larger number.
   
Guozhang
   
   
On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
rela...@salesforce.comwrote:
   
 Thanks Neha  Guozhang,

 When I ran StateChangeLogMerger, I am seeing this message repeated
 16
times
 for each partition:

 [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1]
Removing
 fetcher for partition [test-60,13]
  (kafka.server.ReplicaFetcherManager)
 [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created
 log
   for
 partition [test-60,13] in

  /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
 (kafka.log.LogManager)

 I am also seeing .log and .index files created for this topic in
 data
dir.
 Also list topic command shows leaders, replicas and isrs for all
 partitions. Do you still think increasing num of retries would
 help
  or
   is
 it some other issue..? Also console Producer doesn't seem to  have
   option
 to set num of retries. Is there a way to configure num of retries
 for
 console producer ?

 Thanks,
 Raja.


 On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede 
   neha.narkh...@gmail.com
 wrote:

  As Guozhang said, your producer might give up sooner than the
  leader
  election completes for the new topic. To confirm if your
 producer
   gave
up
  too soon, you can run the state--
  Thanks,
  Raja.
 

 --
 -- Guozhang




-- 
Thanks,
Raja.


Re: Num of streams for consumers using TopicFilter.

2013-08-29 Thread Rajasekar Elango
Hi Jun,

If you read my previous posts: based on the current rebalancing logic, if we
consume from a topic filter, the consumer can't actively use all streams. Can
you provide your recommendation on option 1 vs option 2 from my previous post?

Thanks,
Raja.


On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao jun...@gmail.com wrote:

 You can always use more partitions to get more parallelism in the
 consumers.

 Thanks,

 Jun


 On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango
 rela...@salesforce.comwrote:

  So what is best way to load balance multiple consumers consuming from
 topic
  filter.
 
  Let's say we have 4 topics with 8 partitions and 2 consumers.
 
  Option 1) To load balance consumers, we can set num.streams=4 so that
 both
  consumers split 8 partitions. but can only use half of consumer streams.
 
  Option 2) Configure mutually exclusive topic filter regex such that 2
  topics will match consumer1 and 2 topics will match consumer2. Now we can
  set num.streams=8 and fully utilize consumer streams. I believe this will
  improve performance, but if consumer dies, we will not get any data from
  the topic used by that consumer.
 
  What would be your recommendation?
 
  Thanks,
  Raja.
 
 
  On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
2) When I started mirrormaker with num.streams=16, looks like 16
   consumer
   threads were created, but only 8 are showing up as active as owner in
   consumer offset tracker and all topics/partitions are distributed
  between 8
   consumer threads.
  
   This is because currently the consumer rebalancing process of assigning
   partitions to consumer streams is at a per topic level. Unless you have
  at
   least one topic with 16 partitions, the remaining 8 threads will not do
  any
   work. This is not ideal and we want to look into a better rebalancing
   algorithm. Though it is a big change and we prefer doing it as part of
  the
   consumer client rewrite.
  
   Thanks,
   Neha
  
  
   On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango 
  rela...@salesforce.com
   wrote:
  
So my understanding is num of active streams that a consumer can
  utilize
   is
number of partitions in topic. This is fine if we consumer from
  specific
topic. But if we consumer from TopicFilter, I thought consumer should
   able
to utilize (number of topics that match filter * number of partitions
  in
topic) . But looks like number of streams that consumer can use is
   limited
by just number if partitions in topic although it's consuming from
   multiple
topic.
   
Here what I observed with 1 mirrormaker consuming from whitelist
 '.+'.
   
The white list matches 5 topics and each topic has 8 partitions. I
 used
consumer offset checker to look at owner of each/topic partition.
   
1) When I started mirrormaker with num.streams=8, all
 topics/partitions
   are
distributed between 8 consumer threads.
   
2) When I started mirrormaker with num.streams=16, looks like 16
  consumer
threads were created, but only 8 are showing up as active as owner in
consumer offset tracker and all topics/partitions are distributed
   between 8
consumer threads.
   
So this could be bottleneck for consumers as although we partitioned
   topic,
if we are consuming from topic filter it can't utilize much of
   parallelism
with num of streams. Am i missing something, is there a way to make
cosumers/mirrormakers to utilize more number of active streams?
   
   
--
Thanks,
Raja.
   
  
 
 
 
  --
  Thanks,
  Raja.
 




-- 
Thanks,
Raja.


Re: Num of streams for consumers using TopicFilter.

2013-08-29 Thread Rajasekar Elango
With option 1) I can't really use 8 streams in each consumer; if I do, only
one consumer seems to be doing all the work. So I had to use a total of 8
streams, with 4 for each consumer.
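For comparison, here is a rough sketch of what option 2 from the earlier
message could look like with the 0.8 high-level consumer API: two consumers
with mutually exclusive whitelist regexes so each can keep all of its streams
busy. Hosts and group ids are assumptions, the topic names are only borrowed
from the mirror maker threads for illustration, and, as noted above, there is
no automatic failover between the two consumers:

import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;

public class ExclusiveWhitelistSketch {

    // Two consumers (in practice, two mirror maker instances) with mutually
    // exclusive whitelists; separate groups keep the sketch simple.
    private static ConsumerConnector connect(String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "source-zk:2181"); // illustrative host
        props.put("group.id", groupId);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }

    public static void main(String[] args) {
        ConsumerConnector consumer1 = connect("mirrormakerProd-a");
        ConsumerConnector consumer2 = connect("mirrormakerProd-b");

        // Each consumer owns two of the four topics and can keep 8 streams busy,
        // instead of the two of them splitting 8 active streams between them.
        List<KafkaStream<byte[], byte[]>> streams1 =
                consumer1.createMessageStreamsByFilter(new Whitelist("FunnelProto|agent"), 8);
        List<KafkaStream<byte[], byte[]>> streams2 =
                consumer2.createMessageStreamsByFilter(new Whitelist("internal_metrics|jmx"), 8);

        System.out.println(streams1.size() + " + " + streams2.size() + " streams");
    }
}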



On Fri, Aug 30, 2013 at 12:01 AM, Jun Rao jun...@gmail.com wrote:

 The drawback of 2), as you said is no auto failover. I was suggesting that
 you use 16 partitions. Then you can use option 1) with 8 streams in each
 consumer.

 Thanks,

 Jun


 On Thu, Aug 29, 2013 at 8:51 PM, Rajasekar Elango rela...@salesforce.com
 wrote:

  Hi Jun,
 
  If you read my previous posts, based on current re balancing logic, if we
  consumer from topic filter, consumer actively use all streams. Can you
  provide your recommendation of option 1 vs option 2 in my previous post?
 
  Thanks,
  Raja.
 
 
  On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao jun...@gmail.com wrote:
 
   You can always use more partitions to get more parallelism in the
   consumers.
  
   Thanks,
  
   Jun
  
  
   On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango
   rela...@salesforce.comwrote:
  
So what is best way to load balance multiple consumers consuming from
   topic
filter.
   
Let's say we have 4 topics with 8 partitions and 2 consumers.
   
Option 1) To load balance consumers, we can set num.streams=4 so that
   both
consumers split 8 partitions. but can only use half of consumer
  streams.
   
Option 2) Configure mutually exclusive topic filter regex such that 2
topics will match consumer1 and 2 topics will match consumer2. Now we
  can
set num.streams=8 and fully utilize consumer streams. I believe this
  will
improve performance, but if consumer dies, we will not get any data
  from
the topic used by that consumer.
   
What would be your recommendation?
   
Thanks,
Raja.
   
   
On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede 
  neha.narkh...@gmail.com
wrote:
   
  2) When I started mirrormaker with num.streams=16, looks like 16
 consumer
 threads were created, but only 8 are showing up as active as owner
 in
 consumer offset tracker and all topics/partitions are distributed
between 8
 consumer threads.

 This is because currently the consumer rebalancing process of
  assigning
 partitions to consumer streams is at a per topic level. Unless you
  have
at
 least one topic with 16 partitions, the remaining 8 threads will
 not
  do
any
 work. This is not ideal and we want to look into a better
 rebalancing
 algorithm. Though it is a big change and we prefer doing it as part
  of
the
 consumer client rewrite.

 Thanks,
 Neha


 On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango 
rela...@salesforce.com
 wrote:

  So my understanding is num of active streams that a consumer can
utilize
 is
  number of partitions in topic. This is fine if we consumer from
specific
  topic. But if we consumer from TopicFilter, I thought consumer
  should
 able
  to utilize (number of topics that match filter * number of
  partitions
in
  topic) . But looks like number of streams that consumer can use
 is
 limited
  by just number if partitions in topic although it's consuming
 from
 multiple
  topic.
 
  Here what I observed with 1 mirrormaker consuming from whitelist
   '.+'.
 
  The white list matches 5 topics and each topic has 8 partitions.
 I
   used
  consumer offset checker to look at owner of each/topic partition.
 
  1) When I started mirrormaker with num.streams=8, all
   topics/partitions
 are
  distributed between 8 consumer threads.
 
  2) When I started mirrormaker with num.streams=16, looks like 16
consumer
  threads were created, but only 8 are showing up as active as
 owner
  in
  consumer offset tracker and all topics/partitions are distributed
 between 8
  consumer threads.
 
  So this could be bottleneck for consumers as although we
  partitioned
 topic,
  if we are consuming from topic filter it can't utilize much of
 parallelism
  with num of streams. Am i missing something, is there a way to
 make
  cosumers/mirrormakers to utilize more number of active streams?
 
 
  --
  Thanks,
  Raja.
 

   
   
   
--
Thanks,
Raja.
   
  
 
 
 
  --
  Thanks,
  Raja.
 




-- 
Thanks,
Raja.


Re: Changing the number of partitions after a topic is created

2013-08-28 Thread Rajasekar Elango
Hi Jun,

It's been some time since your last post; is this patch available now? Also,
we are doing the following manual steps to update an existing topic to use new
partitions.

1) stop all zookeepers
2) stop all kafka brokers
3) clean the data dirs of zookeepers and kafka
4) start zookeepers and kafka.

This involves both downtime and losing data. Is there a better way to update
existing topics to use new partitions until the patch is available?

-- 
Thanks,
Raja.


Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-28 Thread Rajasekar Elango
Thanks. This is a small fix to ConsoleProducer.scala only; I will use the 0.8
branch.

Thanks,
Raja.


On Wed, Aug 28, 2013 at 12:49 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 Rajasekar,

 We are trying to minimize the number of patches in 0.8 to critical bug
 fixes or broken tooling. If the patch involves significant code changes, we
 would encourage taking it on trunk. If you want to just fix the console
 producer to take the retry argument, I would think it is small enough to
 consider taking it on 0.8 branch since it affects the usability of the
 console producer.

 Thanks,
 Neha


 On Wed, Aug 28, 2013 at 8:36 AM, Rajasekar Elango rela...@salesforce.com
 wrote:

  Guozhang ,
 
  *The documentation says I need to work off of trunk. Can you confirm If I
  should be working in trunk or different branch.*
  *
  *
  *Thanks,*
  *Raja.*
 
 
  On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Cool! You can follow the process of creating a JIRA here:
  
   http://kafka.apache.org/contributing.html
  
   And submit patch here:
  
   https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow
  
   It will be great if you can also add an entry for this issue in FAQ
  since I
   think this is a common question:
  
   https://cwiki.apache.org/confluence/display/KAFKA/FAQ
  
   Guozhang
  
  
   On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango 
  rela...@salesforce.com
   wrote:
  
Thanks Guozhang, Changing max retry to 5 worked. Since I am changing
console producer code, I can also submit patch adding both
message.send.max.retries
and retry.backoff.ms to console producer. Can you let me know
 process
   for
submitting patch?
   
Thanks,
Raja.
   
   
On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang wangg...@gmail.com
   wrote:
   
 Hello Rajasekar,

 The remove fetcher log entry is normal under addition of
 partitions,
since
 they indicate that some leader changes have happened so brokers are
closing
 the fetchers to the old leaders.

 I just realized that the console Producer does not have the
 message.send.max.retries options yet. Could you file a JIRA for
 this
   and
I
 will followup to add this option? As for now you can hard modify
 the
 default value from 3 to a larger number.

 Guozhang


 On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
 rela...@salesforce.comwrote:

  Thanks Neha  Guozhang,
 
  When I ran StateChangeLogMerger, I am seeing this message
 repeated
  16
 times
  for each partition:
 
  [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker
 1]
 Removing
  fetcher for partition [test-60,13]
   (kafka.server.ReplicaFetcherManager)
  [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created
  log
for
  partition [test-60,13] in
 
   /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
  (kafka.log.LogManager)
 
  I am also seeing .log and .index files created for this topic in
  data
 dir.
  Also list topic command shows leaders, replicas and isrs for all
  partitions. Do you still think increasing num of retries would
 help
   or
is
  it some other issue..? Also console Producer doesn't seem to
  have
option
  to set num of retries. Is there a way to configure num of retries
  for
  console producer ?
 
  Thanks,
  Raja.
 
 
  On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede 
neha.narkh...@gmail.com
  wrote:
 
   As Guozhang said, your producer might give up sooner than the
   leader
   election completes for the new topic. To confirm if your
 producer
gave
 up
   too soon, you can run the state change log merge tool for this
   topic
 and
   see when the leader election finished for all partitions
  
   ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
 --logs
  location
   to all state change logs --topic topic
  
   Note that this tool requires you to give the state change logs
  for
all
   brokers in the cluster.
  
  
   Thanks,
   Neha
  
  
   On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang 
  wangg...@gmail.com
   
  wrote:
  
Hello Rajasekar,
   
In 0.8 producers keep a cache of the partition -
   leader_broker_id
 map
which is used to determine to which brokers should the
 messages
   be
  sent.
After new partitions are added, the cache on the producer has
  not
   populated
yet hence it will throw this exception. The producer will
 then
   try
to
refresh its cache by asking the brokers who are the leaders
 of
these
  new
partitions that I do not know of before. The brokers at the
 beginning
   also
do not know this information, and will only get this
  information
from

Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-27 Thread Rajasekar Elango
Hello everyone,

We recently increased the number of partitions from 4 to 16, and after that
the console producer mostly fails with LeaderNotAvailableException and exits
after 3 tries:

Here are the last few lines of the console producer log:

No partition metadata for topic test-41 due to
kafka.common.LeaderNotAvailableException}] for topic [test-41]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: test-41
(kafka.producer.async.DefaultEventHandler)
[2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying send.
Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is enabled
(kafka.producer.SyncProducerConfig)
[2013-08-27 08:29:30,372] INFO Fetching metadata from broker
id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1
topic(s) Set(test-41) (kafka.client.ClientUtils$)
[2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/
127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
[2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/
127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
[2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for
producing (kafka.producer.SyncProducer)
[2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true
(kafka.producer.SyncProducer)
[2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled
(kafka.producer.SyncProducerConfig)
[2013-08-27 08:29:30,381] ERROR Failed to send requests for topics test-41
with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:74)
at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168)
at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
[2013-08-27 08:29:30,383] INFO Shutting down producer
(kafka.producer.Producer)
[2013-08-27 08:29:30,384] INFO Closing all sync producers
(kafka.producer.ProducerPool)


Also, this happens only for new topics (we have auto.create.topic set to
true). If I retry sending a message to an existing topic, it works fine. Is
there any tweaking I need to do on the broker or the producer to scale based
on the number of partitions?

-- 
Thanks in advance for help,
Raja.


Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-27 Thread Rajasekar Elango
Thanks Neha & Guozhang,

When I ran StateChangeLogMerger, I am seeing this message repeated 16 times
for each partition:

[2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing
fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager)
[2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for
partition [test-60,13] in
/home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
(kafka.log.LogManager)

I am also seeing .log and .index files created for this topic in the data dir.
The list topic command also shows leaders, replicas, and ISRs for all
partitions. Do you still think increasing the number of retries would help, or
is it some other issue? Also, the console producer doesn't seem to have an
option to set the number of retries. Is there a way to configure the number of
retries for the console producer?

Thanks,
Raja.


On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 As Guozhang said, your producer might give up sooner than the leader
 election completes for the new topic. To confirm if your producer gave up
 too soon, you can run the state change log merge tool for this topic and
 see when the leader election finished for all partitions

 ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs location
 to all state change logs --topic topic

 Note that this tool requires you to give the state change logs for all
 brokers in the cluster.


 Thanks,
 Neha


 On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang wangg...@gmail.com wrote:

  Hello Rajasekar,
 
  In 0.8 producers keep a cache of the partition - leader_broker_id map
  which is used to determine to which brokers should the messages be sent.
  After new partitions are added, the cache on the producer has not
 populated
  yet hence it will throw this exception. The producer will then try to
  refresh its cache by asking the brokers who are the leaders of these new
  partitions that I do not know of before. The brokers at the beginning
 also
  do not know this information, and will only get this information from
  controller which will only propagation the leader information after the
  leader elections have all been finished.
 
  If you set num.retries to 3 then it is possible that producer gives up
 too
  soon before the leader info ever propagated to producers, hence to
  producers also. Could you try to increase producer.num.retries and see if
  the producer can eventually succeed in re-trying?
 
  Guozhang
 
 
  On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango 
 rela...@salesforce.com
  wrote:
 
   Hello everyone,
  
   We recently increased number of partitions from 4 to 16 and after that
   console producer mostly fails with LeaderNotAvailableException and
 exits
   after 3 tries:
  
   Here is last few lines of console producer log:
  
   No partition metadata for topic test-41 due to
   kafka.common.LeaderNotAvailableException}] for topic [test-41]: class
   kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
   [2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic,
   partition due to: Failed to fetch topic metadata for topic: test-41
   (kafka.producer.async.DefaultEventHandler)
   [2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying
 send.
   Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
   [2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is
  enabled
   (kafka.producer.SyncProducerConfig)
   [2013-08-27 08:29:30,372] INFO Fetching metadata from broker
   id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1
   topic(s) Set(test-41) (kafka.client.ClientUtils$)
   [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/
   127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
   [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/
   127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
   [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for
   producing (kafka.producer.SyncProducer)
   [2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true
   (kafka.producer.SyncProducer)
   [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is
  enabled
   (kafka.producer.SyncProducerConfig)
   [2013-08-27 08:29:30,381] ERROR Failed to send requests for topics
  test-41
   with correlation ids in [0,8]
 (kafka.producer.async.DefaultEventHandler)
   kafka.common.FailedToSendMessageException: Failed to send messages
 after
  3
   tries.
   at
  
  
 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
   at kafka.producer.Producer.send(Producer.scala:74)
   at
  kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
   [2013-08-27 08:29:30,383] INFO Shutting down producer
   (kafka.producer.Producer)
   [2013-08-27 08:29:30,384] INFO Closing all sync producers

Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-27 Thread Rajasekar Elango
Thanks Guozhang, changing max retries to 5 worked. Since I am changing the
console producer code, I can also submit a patch adding both
message.send.max.retries and retry.backoff.ms to the console producer. Can you
let me know the process for submitting a patch?
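For anyone landing here before the console producer patch: the same two
settings can be passed to the regular 0.8 producer API directly. A minimal,
hedged sketch, with an assumed broker address and the topic name taken from
this thread:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class RetryingProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:6667"); // assumed; matches the port in the logs above
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("message.send.max.retries", "5"); // retry long enough for leader election of a newly auto-created topic
        props.put("retry.backoff.ms", "200");       // pause between retries so leader metadata can propagate

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test-41", "hello")); // topic name from this thread
        producer.close();
    }
}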

Thanks,
Raja.


On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Rajasekar,

 The remove fetcher log entry is normal under addition of partitions, since
 they indicate that some leader changes have happened so brokers are closing
 the fetchers to the old leaders.

 I just realized that the console Producer does not have the
 message.send.max.retries options yet. Could you file a JIRA for this and I
 will followup to add this option? As for now you can hard modify the
 default value from 3 to a larger number.

 Guozhang


 On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
 rela...@salesforce.comwrote:

  Thanks Neha  Guozhang,
 
  When I ran StateChangeLogMerger, I am seeing this message repeated 16
 times
  for each partition:
 
  [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1]
 Removing
  fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager)
  [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for
  partition [test-60,13] in
  /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
  (kafka.log.LogManager)
 
  I am also seeing .log and .index files created for this topic in data
 dir.
  Also list topic command shows leaders, replicas and isrs for all
  partitions. Do you still think increasing num of retries would help or is
  it some other issue..? Also console Producer doesn't seem to  have option
  to set num of retries. Is there a way to configure num of retries for
  console producer ?
 
  Thanks,
  Raja.
 
 
  On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   As Guozhang said, your producer might give up sooner than the leader
   election completes for the new topic. To confirm if your producer gave
 up
   too soon, you can run the state change log merge tool for this topic
 and
   see when the leader election finished for all partitions
  
   ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs
  location
   to all state change logs --topic topic
  
   Note that this tool requires you to give the state change logs for all
   brokers in the cluster.
  
  
   Thanks,
   Neha
  
  
   On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang wangg...@gmail.com
  wrote:
  
Hello Rajasekar,
   
In 0.8 producers keep a cache of the partition - leader_broker_id
 map
which is used to determine to which brokers should the messages be
  sent.
After new partitions are added, the cache on the producer has not
   populated
yet hence it will throw this exception. The producer will then try to
refresh its cache by asking the brokers who are the leaders of these
  new
partitions that I do not know of before. The brokers at the
 beginning
   also
do not know this information, and will only get this information from
controller which will only propagation the leader information after
 the
leader elections have all been finished.
   
If you set num.retries to 3 then it is possible that producer gives
 up
   too
soon before the leader info ever propagated to producers, hence to
producers also. Could you try to increase producer.num.retries and
 see
  if
the producer can eventually succeed in re-trying?
   
Guozhang
   
   
On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango 
   rela...@salesforce.com
wrote:
   
 Hello everyone,

 We recently increased number of partitions from 4 to 16 and after
  that
 console producer mostly fails with LeaderNotAvailableException and
   exits
 after 3 tries:

 Here is last few lines of console producer log:

 No partition metadata for topic test-41 due to
 kafka.common.LeaderNotAvailableException}] for topic [test-41]:
 class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2013-08-27 08:29:30,271] ERROR Failed to collate messages by
 topic,
 partition due to: Failed to fetch topic metadata for topic: test-41
 (kafka.producer.async.DefaultEventHandler)
 [2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying
   send.
 Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
 [2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is
enabled
 (kafka.producer.SyncProducerConfig)
 [2013-08-27 08:29:30,372] INFO Fetching metadata from broker
 id:0,host:localhost,port:6667,secure:true with correlation id 8
 for 1
 topic(s) Set(test-41) (kafka.client.ClientUtils$)
 [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/
 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
 [2013-08-27 08:29:30,375] INFO finished ssl handshake for
 localhost/
 127.0.0.1:6667

Fwd: Tuning mirror maker performance

2013-08-23 Thread Rajasekar Elango
Thanks Jun,

What troubleshooting steps can we take to identify whether the bottleneck is
with consuming or producing? Does changing anything in the log4j configuration,
or any JMX MBeans, provide insight into it? Does the metadata refresh interval
affect picking up new partitions only for existing topics, or does it also
affect picking up new topics?

Thanks,
Raja.

-- Forwarded message --
From: Jun Rao jun...@gmail.com
Date: Fri, Aug 23, 2013 at 12:08 AM
Subject: Re: Tuning mirror maker performance
To: users@kafka.apache.org users@kafka.apache.org


You have to determine whether the bottleneck is in the consumer or the
producer.

To improve the performance of the former, you can increase the # of total
consumer streams. The # of streams is capped by the total # of partitions. So,
you may need to increase the # of partitions.

To improve the performance of the latter, you can (a) increase the batch
size in async mode and/or (b) run more instances of producers.

Metadata refresh interval is configurable. It's mainly for the producer to
pick up newly available partitions.

Thanks,

Jun
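As a rough illustration of (a), the async batching knobs on the 0.8 producer
look roughly like the sketch below; the broker hosts and values are
assumptions, not tested recommendations:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AsyncBatchingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "target-broker1:9092,target-broker2:9092"); // illustrative hosts
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");        // buffer and send in batches instead of per message
        props.put("batch.num.messages", "400");     // messages per batch (illustrative value)
        props.put("queue.buffering.max.ms", "500"); // longest the producer waits before flushing a batch

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("FunnelProto", "example payload")); // topic from these threads
        producer.close();
    }
}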


On Thu, Aug 22, 2013 at 1:44 PM, Rajasekar Elango rela...@salesforce.com
wrote:

 I am trying to tune mirrormaker configurations based on this doc
 

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes
 
 and
 would like know your recommendations.

 Our configuration: We are doing inter datacenter replication with 5
brokers
 in source and destination DC and 2 mirrormakers doing replication. We have
 about 4 topics with 4 partitions each.
 I have been consumerOffsetChecker to analysis lag based on tuning.


1. num.streams : - We have set num.streams=2 so that 4 partitions will
be shared between 2 mirrormaker. Increasing num.streams more than this
 did
not improve any performance, is this correct?
2. num.producers:- We initially set num.producers = 4 (assuming one
producer thread per topic), then we bumped num.producers = 16, but did
 not
see any improvement in performance..? Is this correct..? How do we
determine optimum value for num.producers ?
3. *socket.buffersize : *We initially had default values for these,
then
I changed socket.send.buffer.bytes on source broker,
socket.receive.buffer.bytes, fetch.message.max.bytes on mirrormaker
consumer properties, socket.receive.buffer.bytes,
socket.request.max.bytes on destination broker all to
1024*1024*1024(1073741824) . This did improve the performance, but I
 could
not get Lag to  100.

Here is how our lag looks like after above changes:

 Group   Topic  Pid Offset
 logSize  Lag Owner
 mirrormakerProd FunnelProto0   554704539
 554717088   12549
 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
 mirrormakerProd FunnelProto1   547370573
 547383136   12563
 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
 mirrormakerProd FunnelProto2   553124930
 553125742   812
 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
 mirrormakerProd FunnelProto3   552990834
 552991650   816
 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
 mirrormakerProd agent  0   35438   35440
 2
 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
 mirrormakerProd agent  1   35447   35448
 1
 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
 mirrormakerProd agent  2   35375   35375
 0
 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
 mirrormakerProd agent  3   35336   35336
 0
 mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
 mirrormakerProd internal_metrics   0   1930852823
  1930917418  64595
 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
 mirrormakerProd internal_metrics   1   1937237324
  1937301841  64517
 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
 mirrormakerProd internal_metrics   2   1945894901
  1945904067  9166
  mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
 mirrormakerProd internal_metrics   3   1946906932
  1946915928  8996
  mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
 mirrormakerProd jmx0   485270038
 485280882   10844
 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
 mirrormakerProd jmx1   486363914
 486374759   10845
 mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
 mirrormakerProd jmx

Re: Differences in size of data replicated by mirror maker

2013-08-23 Thread Rajasekar Elango
Thanks Guozhang, Jun,

Yes, we are doing gzip compression, and that should be the reason for the
difference in disk usage. I had a typo; the size is actually 91G in the source
cluster, so the 25G/91G ratio makes sense for compression.
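For anyone comparing disk usage the same way: the producer-side setting that
makes the mirrored copy smaller is the compression codec. A minimal sketch
with an assumed broker host (the topic name just echoes the one in this
thread):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class GzipProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "target-broker1:9092"); // illustrative host
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("compression.codec", "gzip");      // message sets are written to the target log compressed
        // props.put("compressed.topics", "Topic");  // optionally limit compression to specific topics

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("Topic", "example payload"));
        producer.close();
    }
}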

Thanks,
Raja.


On Thu, Aug 22, 2013 at 7:00 PM, Guozhang Wang wangg...@gmail.com wrote:

 When you state the numbers, are they the same across instances in the
 cluster, meaning that Topic-0 would have 910*5 GB in source cluster and
 25*5 GB in target cluster?

 Another possibility is that MirrorMaker uses compression on the producer
 side, but I would be surprised if the compression rate could be 25/910.

 Guozhang


 On Thu, Aug 22, 2013 at 3:48 PM, Rajasekar Elango rela...@salesforce.com
 wrote:

  Yes, both source and target clusters have 5 brokers in cluster.
 
  Sent from my iPhone
 
  On Aug 22, 2013, at 6:11 PM, Guozhang Wang wangg...@gmail.com wrote:
 
   Hello Rajasekar,
  
   Are the size of the source cluster and target cluster the same?
  
   Guozhang
  
  
   On Thu, Aug 22, 2013 at 2:14 PM, Rajasekar Elango 
  rela...@salesforce.comwrote:
  
   Hi,
  
   We are using mirrormaker to replicate data between two kafka clusters.
  I am
   seeing huge difference in size of log in data dir between the broker
 in
   source cluster vs broker in destination cluster:
  
   For eg: Size of ~/data/Topic-0/ is about 910 G in source broker, but
  only
   its only 25G in destination broker. I see segmented log files (~500 M)
  is
   created for about every 2 or 3 mins in source brokers, but I see
  segmented
   log files is created for about every 25 mins in destination broker.
  
   I verified mirrormaker is doing fine using consumer offset checker,
 not
   much lag, offsets are incrementing. I also verified that
  topics/partitions
   are not under replicated in both source and target cluster. What is
 the
   reason for this difference in disk usage?
  
  
   --
   Thanks,
   Raja.
  
  
  
   --
   -- Guozhang
 



 --
 -- Guozhang




-- 
Thanks,
Raja.


Tuning mirror maker performance

2013-08-22 Thread Rajasekar Elango
I am trying to tune mirrormaker configurations based on this doc
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes
and
would like to know your recommendations.

Our configuration: We are doing inter-datacenter replication with 5 brokers
in the source and destination DCs and 2 mirrormakers doing the replication. We
have about 4 topics with 4 partitions each.
I have been using ConsumerOffsetChecker to analyze lag while tuning.


   1. num.streams: We have set num.streams=2 so that the 4 partitions are
   shared between the 2 mirrormakers. Increasing num.streams beyond this did
   not improve performance; is this correct?
   2. num.producers: We initially set num.producers = 4 (assuming one
   producer thread per topic), then we bumped num.producers to 16, but did not
   see any improvement in performance. Is this correct? How do we determine
   the optimum value for num.producers?
   3. socket.buffersize: We initially had default values for these, then
   I changed socket.send.buffer.bytes on the source brokers;
   socket.receive.buffer.bytes and fetch.message.max.bytes in the mirrormaker
   consumer properties; and socket.receive.buffer.bytes and
   socket.request.max.bytes on the destination brokers, all to
   1024*1024*1024 (1073741824). This did improve performance, but I could
   not get the lag below 100.

   Here is how our lag looks after the above changes:

Group            Topic             Pid  Offset      logSize     Lag    Owner
mirrormakerProd  FunnelProto       0    554704539   554717088   12549  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  FunnelProto       1    547370573   547383136   12563  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  FunnelProto       2    553124930   553125742   812    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  FunnelProto       3    552990834   552991650   816    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  agent             0    35438       35440       2      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  agent             1    35447       35448       1      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  agent             2    35375       35375       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  agent             3    35336       35336       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  internal_metrics  0    1930852823  1930917418  64595  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  internal_metrics  1    1937237324  1937301841  64517  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  internal_metrics  2    1945894901  1945904067  9166   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  internal_metrics  3    1946906932  1946915928  8996   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  jmx               0    485270038   485280882   10844  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  jmx               1    486363914   486374759   10845  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  jmx               2    491783842   491784826   984    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  jmx               3    485675629   485676643   1014   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1

In the mirrormaker logs, I see topic metadata is fetched every 10 minutes and
the producer connections are re-established for producing. Is this normal? If
it's continuously producing, why does it need to reconnect to the destination
brokers?
What else can we tune to bring the lag below 100? This is just a small set of
data we are currently testing; the real production traffic will be much larger.
How can we compute the optimum configuration as data traffic increases?
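For readers following along, here is the consumer side of item 3 above,
sketched as a Java Properties object. Mirror maker itself reads these keys from
the file passed via --consumer.config; the ZooKeeper host is an assumption and
the values only echo what this thread reports trying, they are not
recommendations:

import java.util.Properties;

public class MirrorConsumerSettingsSketch {

    // Keys that would live in mirror maker's consumer.properties file.
    public static Properties consumerSettings() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "source-zk:2181");       // source cluster ZooKeeper (illustrative host)
        props.put("group.id", "mirrormakerProd");
        props.put("socket.receive.buffer.bytes", "1073741824"); // 1 GB, as tried in this thread
        props.put("fetch.message.max.bytes", "1073741824");     // must fit within the broker's socket.request.max.bytes
        return props;
    }

    public static void main(String[] args) {
        consumerSettings().list(System.out); // dump the keys, e.g. to copy into consumer.properties
    }
}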

Thanks for help,

Thanks,
Raja.


Re: Tuning mirror maker performance

2013-08-22 Thread Rajasekar Elango
Hi,

I am trying to tune mirrormaker configurations based on this doc
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes
and
would like to know your recommendations.

Our configuration: We are doing inter-datacenter replication with 5 brokers
in the source and destination DCs and 2 mirrormakers doing the replication. We
have about 4 topics with 4 partitions each.
I have been using ConsumerOffsetChecker to analyze lag while tuning.


   1. num.streams: We have set num.streams=2 so that the 4 partitions are
   shared between the 2 mirrormakers. Increasing num.streams beyond this did
   not improve performance; is this correct?
   2. num.producers: We initially set num.producers = 4 (assuming one
   producer thread per topic), then we bumped num.producers to 16, but did not
   see any improvement in performance. Is this correct? How do we determine
   the optimum value for num.producers?
   3. socket.buffersize: We initially had default values for these, then
   I changed socket.send.buffer.bytes on the source brokers;
   socket.receive.buffer.bytes and fetch.message.max.bytes in the mirrormaker
   consumer properties; and socket.receive.buffer.bytes and
   socket.request.max.bytes on the destination brokers, all to
   1024*1024*1024 (1073741824). This did improve performance, but I could
   not get the lag below 100.

   Here is how our lag looks after the above changes:

Group            Topic             Pid  Offset      logSize     Lag    Owner
mirrormakerProd  FunnelProto       0    554704539   554717088   12549  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  FunnelProto       1    547370573   547383136   12563  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  FunnelProto       2    553124930   553125742   812    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  FunnelProto       3    552990834   552991650   816    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  agent             0    35438       35440       2      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  agent             1    35447       35448       1      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  agent             2    35375       35375       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  agent             3    35336       35336       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  internal_metrics  0    1930852823  1930917418  64595  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  internal_metrics  1    1937237324  1937301841  64517  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  internal_metrics  2    1945894901  1945904067  9166   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  internal_metrics  3    1946906932  1946915928  8996   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  jmx               0    485270038   485280882   10844  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  jmx               1    486363914   486374759   10845  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  jmx               2    491783842   491784826   984    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  jmx               3    485675629   485676643   1014   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1

In the mirrormaker logs, I see topic metadata is fetched every 10 minutes and
the producer connections are re-established for producing. Is this normal? If
it's continuously producing, why does it need to reconnect to the destination
brokers?
What else can we tune to bring the lag below 100? This is just a small set of
data we are currently testing; the real production traffic will be much larger.
How can we compute the optimum configuration as data traffic increases?

Thanks for help,

Thanks,
Raja.


On Thu, Aug 22, 2013 at 4:44 PM, Rajasekar Elango rela...@salesforce.comwrote:

 I am trying to tune mirrormaker configurations based on this doc 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes
  and
 would like know your recommendations.

 Our configuration: We are doing inter datacenter replication with 5
 brokers in source and destination DC and 2 mirrormakers

Differences in size of data replicated by mirror maker

2013-08-22 Thread Rajasekar Elango
Hi,

We are using mirrormaker to replicate data between two kafka clusters. I am
seeing a huge difference in the size of the log in the data dir between a
broker in the source cluster and a broker in the destination cluster:

For example, the size of ~/data/Topic-0/ is about 910 G on a source broker, but
only 25G on a destination broker. I see segmented log files (~500 M) created
about every 2 or 3 minutes on the source brokers, but segmented log files
created only about every 25 minutes on the destination broker.

I verified the mirrormaker is doing fine using the consumer offset checker: not
much lag, and offsets are incrementing. I also verified that topics/partitions
are not under-replicated in either the source or the target cluster. What is
the reason for this difference in disk usage?


-- 
Thanks,
Raja.


Re: Differences in size of data replicated by mirror maker

2013-08-22 Thread Rajasekar Elango
Yes, both the source and target clusters have 5 brokers each.

Sent from my iPhone

On Aug 22, 2013, at 6:11 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Rajasekar,

 Are the size of the source cluster and target cluster the same?

 Guozhang


 On Thu, Aug 22, 2013 at 2:14 PM, Rajasekar Elango 
 rela...@salesforce.comwrote:

 Hi,

 We are using mirrormaker to replicate data between two kafka clusters. I am
 seeing huge difference in size of log in data dir between the broker in
 source cluster vs broker in destination cluster:

 For eg: Size of ~/data/Topic-0/ is about 910 G in source broker, but only
 its only 25G in destination broker. I see segmented log files (~500 M) is
 created for about every 2 or 3 mins in source brokers, but I see segmented
 log files is created for about every 25 mins in destination broker.

 I verified mirrormaker is doing fine using consumer offset checker, not
 much lag, offsets are incrementing. I also verified that topics/partitions
 are not under replicated in both source and target cluster. What is the
 reason for this difference in disk usage?


 --
 Thanks,
 Raja.



 --
 -- Guozhang