I've attached the patch. It would be very helpful if someone could review it.


Thanks,
sahil

On 23 December 2017 at 13:11, sahil aggarwal <sahil.ag...@gmail.com> wrote:

> My bad, it's consumer.endOffsets in ConsumerGroupCommand.scala.
>
> https://github.com/apache/kafka/blob/ef97ed7ee5cb883a30245ea4f77ef99d4db15373/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L467
>
> On 23 December 2017 at 13:07, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Sahil:
>> I did a quick search in the 0.11.0 branch and trunk for getEndOffsets but
>> didn't find any occurrences.
>>
>> Mind giving us the location (and class) where getEndOffsets is called?
>>
>> Thanks
>>
>> On Fri, Dec 22, 2017 at 11:29 PM, sahil aggarwal <sahil.ag...@gmail.com>
>> wrote:
>>
>> > I fixed it with a code change in ConsumerGroupCommand.scala. Is it possible
>> > to push it upstream for 0.10.*?
>> >
>> > It seems to be fixed in 0.11.*, where it uses getEndOffsets(), which uses
>> > requestTimeoutMs instead of Long.MAX_VALUE.
>> >
>> > On 23 December 2017 at 02:46, Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> > > Your observation is correct. KafkaConsumer.position() is a blocking
>> > > call. It's a known issue that there is no configurable timeout value.
>> > >
>> > > I am not aware of any workaround.
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 12/21/17 6:05 AM, sahil aggarwal wrote:
>> > > > Hi,
>> > > >
>> > > > I'm facing an issue where *kafka-consumer-groups.sh --describe* gets
>> > > > stuck if one of the partitions is unavailable, i.e. has no leader.
>> > > > Going through the code, I found that it does the following to get the
>> > > > log end offset:
>> > > >
>> > > > * Create consumer
>> > > > * For each partition
>> > > >    * assign partition
>> > > >    * seek to end
>> > > >    * get position
>> > > >
>> > > > The issue is that KafkaConsumer.position() uses
>> > > > Fetcher.retrieveOffsetsByTimes() internally, which is called with a
>> > > > timeout of Long.MAX_VALUE, so it gets stuck in a loop there.
>> > > >
>> > > >
>> > > > Any pointers?
>> > > >
>> > > >
>> > > > *Version*: 0.10.0.1
>> > > >
>> > > >
>> > > > Thanks,
>> > > > Sahil
>> > > >
>> > >
>> > >
>> >
>>
>
>
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 0386404..707d70d 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -18,6 +18,7 @@
 package kafka.admin
 
 import java.util.Properties
+import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask}
 
 import joptsimple.{OptionParser, OptionSpec}
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
@@ -35,10 +36,16 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 
+import scala.concurrent._
+import ExecutionContext.Implicits.global
 import scala.collection.JavaConverters._
 import scala.collection.{Set, mutable}
+import scala.concurrent.{Await, Future, TimeoutException}
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
 
 object ConsumerGroupCommand {
+  val DEFAULT_CONSUMER_POSITION_TIMEOUT = 500
 
   def main(args: Array[String]) {
     val opts = new ConsumerGroupCommandOptions(args)
@@ -302,6 +309,15 @@ object ConsumerGroupCommand {
 
   class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
 
+    private val zkUtils = {
+      val zkUrl = opts.options.valueOf(opts.zkConnectOpt)
+      ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled)
+    }
+
+    private val consumerPositionTimeoutOpt = opts.options.valueOf(opts.consumerPositionTimeoutMsOpt)
+
+    private var lastFailed = false;
+
     private val adminClient = createAdminClient()
 
     // `consumer` is only needed for `describe`, so we instantiate it lazily
@@ -332,12 +348,33 @@ object ConsumerGroupCommand {
     }
 
     protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
-      val consumer = getConsumer()
-      val topicPartition = new TopicPartition(topic, partition)
-      consumer.assign(List(topicPartition).asJava)
-      consumer.seekToEnd(List(topicPartition).asJava)
-      val logEndOffset = consumer.position(topicPartition)
-      LogEndOffsetResult.LogEndOffset(logEndOffset)
+      zkUtils.getLeaderForPartition(topic, partition) match {
+        case Some(-1) => LogEndOffsetResult.Unknown
+        case Some(brokerId) =>
+          var consumer = getConsumer()
+          val topicPartition = new TopicPartition(topic, partition)
+          // it can become unavailable after the check above
+          val future = Future {
+            consumer.assign(List(topicPartition).asJava)
+            consumer.seekToEnd(List(topicPartition).asJava)
+            val logEndOffset = consumer.position(topicPartition)
+            LogEndOffsetResult.LogEndOffset(logEndOffset)
+          }
+
+          try {
+            Await.result(future, Duration.create(consumerPositionTimeoutOpt, "millisecond"))
+          } catch  {
+            case t : TimeoutException =>
+              lastFailed = true
+              LogEndOffsetResult.Unknown
+            case t : Exception =>
+              lastFailed = true
+              LogEndOffsetResult.Ignore
+          }
+        case None =>
+          println(s"No broker for partition ${new TopicPartition(topic, partition)}")
+          LogEndOffsetResult.Ignore
+      }
     }
 
     def close() {
@@ -352,7 +389,7 @@ object ConsumerGroupCommand {
     }
 
     private def getConsumer() = {
-      if (consumer == null)
+      if (consumer == null || lastFailed)
         consumer = createNewConsumer()
       consumer
     }
@@ -376,6 +413,7 @@ object ConsumerGroupCommand {
 
   sealed trait LogEndOffsetResult
 
+
   object LogEndOffsetResult {
     case class LogEndOffset(value: Long) extends LogEndOffsetResult
     case object Unknown extends LogEndOffsetResult
@@ -425,25 +463,30 @@ object ConsumerGroupCommand {
                                   .withRequiredArg
                                   .describedAs("command config property file")
                                   .ofType(classOf[String])
+    var consumerPositionTimeoutOptDoc = "Timeout for consumer.position() in ms"
+    val consumerPositionTimeoutMsOpt = parser.accepts("consumer-position-timeout", consumerPositionTimeoutOptDoc)
+                                              .withRequiredArg()
+                                                .ofType(classOf[Long])
+                                                .defaultsTo(DEFAULT_CONSUMER_POSITION_TIMEOUT)
+
     val options = parser.parse(args : _*)
 
     val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
 
     def checkArgs() {
+
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+
       // check required args
       if (options.has(newConsumerOpt)) {
         CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
 
-        if (options.has(zkConnectOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt")
-
         if (options.has(deleteOpt))
           CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " +
             "there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " +
             "member leaves")
 
       } else {
-        CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
 
         if (options.has(bootstrapServerOpt))
           CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt")

Reply via email to