Re: Consumer group describe issue

2018-02-22 Thread Bill Bejeck
Can someone add Sahil to the contributor list?

Thanks,
Bill


Re: Consumer group describe issue

2018-02-22 Thread sahil aggarwal
Bill,

I have created the JIRA: https://issues.apache.org/jira/browse/KAFKA-6581
but I am not able to assign it to myself.

Can you please help?


Thanks,
Sahil


Re: Consumer group describe issue

2017-12-27 Thread sahil aggarwal
@TedYu
From 0.10.0

@Bill
Thanks for the pointer. Will follow the steps mentioned in the doc.


Re: Consumer group describe issue

2017-12-27 Thread Ted Yu
Which branch was the patch generated from?
When I tried to apply the patch:

6 out of 7 hunks FAILED -- saving rejects to file
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala.rej

FYI


Re: Consumer group describe issue

2017-12-27 Thread Bill Bejeck
Sahil,

Thanks for the patch.

To have your patch reviewed, please look over the contribution
guidelines outlined here: https://kafka.apache.org/contributing

Thanks,
Bill


Re: Consumer group describe issue

2017-12-25 Thread sahil aggarwal
Attached the patch. It would be very helpful if someone could review it.



Thanks,
sahil

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 0386404..707d70d 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -18,6 +18,7 @@
 package kafka.admin
 
 import java.util.Properties
+import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask}
 
 import joptsimple.{OptionParser, OptionSpec}
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
@@ -35,10 +36,16 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 
+import scala.concurrent._
+import ExecutionContext.Implicits.global
 import scala.collection.JavaConverters._
 import scala.collection.{Set, mutable}
+import scala.concurrent.{Await, Future, TimeoutException}
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
 
 object ConsumerGroupCommand {
+  val DEFAULT_CONSUMER_POSITION_TIMEOUT = 500
 
   def main(args: Array[String]) {
     val opts = new ConsumerGroupCommandOptions(args)
@@ -302,6 +309,15 @@ object ConsumerGroupCommand {
 
   class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
 
+    private val zkUtils = {
+      val zkUrl = opts.options.valueOf(opts.zkConnectOpt)
+      ZkUtils(zkUrl, 3, 3, JaasUtils.isZkSecurityEnabled)
+    }
+
+    private val consumerPositionTimeoutOpt = opts.options.valueOf(opts.consumerPositionTimeoutMsOpt)
+
+    private var lastFailed = false;
+
     private val adminClient = createAdminClient()
 
     // `consumer` is only needed for `describe`, so we instantiate it lazily
@@ -332,12 +348,33 @@ object ConsumerGroupCommand {
     }
 
     protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
-      val consumer = getConsumer()
-      val topicPartition = new TopicPartition(topic, partition)
-      consumer.assign(List(topicPartition).asJava)
-      consumer.seekToEnd(List(topicPartition).asJava)
-      val logEndOffset = consumer.position(topicPartition)
-      LogEndOffsetResult.LogEndOffset(logEndOffset)
+      zkUtils.getLeaderForPartition(topic, partition) match {
+        case Some(-1) => LogEndOffsetResult.Unknown
+        case Some(brokerId) =>
+          var consumer = getConsumer()
+          val topicPartition = new TopicPartition(topic, partition)
+          // it can become unavailable after the check above
+          val future = Future {
+            consumer.assign(List(topicPartition).asJava)
+            consumer.seekToEnd(List(topicPartition).asJava)
+            val logEndOffset = 

Re: Consumer group describe issue

2017-12-22 Thread sahil aggarwal
My bad, it's consumer.endOffsets in ConsumerGroupCommand.scala.

https://github.com/apache/kafka/blob/ef97ed7ee5cb883a30245ea4f77ef99d4db15373/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L467


Re: Consumer group describe issue

2017-12-22 Thread Ted Yu
Sahil:
I did a quick search in the 0.11.0 branch and trunk for getEndOffsets but
didn't find any occurrence.

Mind giving us the location (and class) where getEndOffsets is called?

Thanks


Re: Consumer group describe issue

2017-12-22 Thread sahil aggarwal
I fixed it with a code change in ConsumerGroupCommand.scala. Would it be
possible to push it upstream for 0.10.*?

It seems to be fixed in 0.11.*, where it uses getEndOffsets(), which is
bounded by requestTimeoutMs instead of Long.MAX_VALUE.
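
To illustrate why the timeout value matters, here is a minimal sketch of a
deadline-bounded retry loop. It is not Kafka code: the class, the retryUntil
helper, and the Supplier standing in for an offset lookup are all hypothetical
names made up for this example. With a finite timeout the loop gives up; with
Long.MAX_VALUE the deadline is effectively never reached.

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeadlineLoopSketch {

    // Hypothetical helper: polls `attempt` until it yields a value or
    // `timeoutMs` has elapsed, sleeping `backoffMs` between attempts.
    static OptionalLong retryUntil(Supplier<OptionalLong> attempt, long timeoutMs, long backoffMs) {
        long start = System.nanoTime();
        // toNanos saturates at Long.MAX_VALUE, so a Long.MAX_VALUE timeout
        // yields a deadline that is never reached in practice.
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            OptionalLong result = attempt.get();
            if (result.isPresent()) {
                return result;
            }
            if (System.nanoTime() - start >= timeoutNanos) {
                return OptionalLong.empty(); // deadline passed, give up
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return OptionalLong.empty();
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a partition without a leader: the lookup never succeeds.
        Supplier<OptionalLong> noLeader = OptionalLong::empty;
        System.out.println(retryUntil(noLeader, 200, 50)); // prints "OptionalLong.empty"

        // Stand-in for a partition that becomes available on the third attempt.
        AtomicInteger calls = new AtomicInteger();
        Supplier<OptionalLong> recovers =
            () -> calls.incrementAndGet() < 3 ? OptionalLong.empty() : OptionalLong.of(100L);
        System.out.println(retryUntil(recovers, 200, 10)); // prints "OptionalLong[100]"
    }
}
```

Passing Long.MAX_VALUE as timeoutMs with the noLeader supplier would never
return, which matches the hang described in this thread.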


Re: Consumer group describe issue

2017-12-22 Thread Matthias J. Sax
Your observation is correct. KafkaConsumer.position() is a blocking
call. It's a known issue that there is no configurable timeout value.

I am not aware of any workaround.


-Matthias


Consumer group describe issue

2017-12-21 Thread sahil aggarwal
Hi,

I am facing an issue where *kafka-consumer-groups.sh --describe* gets stuck
if one of the partitions is unavailable, i.e. has no leader. Going through
the code, I found that it does the following to get the log end offset:

* Create consumer
* For each partition
   * assign partition
   * seek to end
   * get position

The issue is that KafkaConsumer.position() uses Fetcher.retrieveOffsetsByTimes()
internally, which is called with a timeout of Long.MAX_VALUE, so it gets stuck
in a loop there.
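
One way to keep a caller from hanging on such a call is to run it on a worker
thread and wait for the result with a timeout. The sketch below shows only
that general technique with the JDK's ExecutorService. It does not use the
Kafka client: the class, the callWithTimeout helper, and the two Callable
stand-ins are hypothetical, and the 500 ms value is taken from
DEFAULT_CONSUMER_POSITION_TIMEOUT in the patch attached to this thread.

```java
import java.util.OptionalLong;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedPositionSketch {

    // Hypothetical helper: runs a blocking lookup on a daemon worker thread
    // and returns an empty result if it fails or takes longer than timeoutMs.
    static OptionalLong callWithTimeout(Callable<Long> lookup, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a stuck lookup must not keep the JVM alive
            return t;
        });
        Future<Long> future = executor.submit(lookup);
        try {
            return OptionalLong.of(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true); // interrupt the worker if it is still running
            return OptionalLong.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return OptionalLong.empty();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a partition with a leader: the lookup returns promptly.
        Callable<Long> healthy = () -> 42L;
        // Stand-in for a leaderless partition: the lookup blocks indefinitely.
        Callable<Long> stuck = () -> {
            Thread.sleep(Long.MAX_VALUE);
            return -1L;
        };
        System.out.println(callWithTimeout(healthy, 500)); // prints "OptionalLong[42]"
        System.out.println(callWithTimeout(stuck, 500));   // prints "OptionalLong.empty"
    }
}
```

Note that cancelling the future only helps if the blocked call responds to
interruption, and KafkaConsumer is documented as not safe for multi-threaded
access, so a real fix would need more care than this sketch.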


Any pointers?


*Version*: 0.10.0.1


Thanks,
Sahil