Attached is the patch. If someone could review it, that would be very helpful.
Thanks, sahil On 23 December 2017 at 13:11, sahil aggarwal <sahil.ag...@gmail.com> wrote: > my bad, it's consumer.endOffsets in ConsumerGroupCommand.scala. > > https://github.com/apache/kafka/blob/ef97ed7ee5cb883a30245ea4f77ef99d4db15373/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L467 > > On 23 December 2017 at 13:07, Ted Yu <yuzhih...@gmail.com> wrote: > >> Sahil: >> I did a quick search in the 0.11.0 branch and trunk for getEndOffsets but >> didn't find any occurrence. >> >> Mind giving us the location (and class) where getEndOffsets is called? >> >> Thanks >> >> On Fri, Dec 22, 2017 at 11:29 PM, sahil aggarwal <sahil.ag...@gmail.com> >> wrote: >> >> > Fixed it with a code change in ConsumerGroupCommand.scala. Is it possible to >> > push it upstream for 0.10.*? >> > >> > It seems to be fixed in 0.11.*, where it uses getEndOffsets(), which has >> > requestTimeoutMs instead of Long.MAX_VALUE. >> > >> > On 23 December 2017 at 02:46, Matthias J. Sax <matth...@confluent.io> >> > wrote: >> > >> > > Your observation is correct. KafkaConsumer.position() is a blocking >> > > call. It's a known issue that there is no configurable timeout value. >> > > >> > > I am not aware of any workaround. >> > > >> > > >> > > -Matthias >> > > >> > > On 12/21/17 6:05 AM, sahil aggarwal wrote: >> > > > Hi, >> > > > >> > > > Facing an issue where *kafka-consumer-groups.sh --describe* gets stuck >> if >> > > one >> > > > of the partitions is unavailable, i.e. has no leader. Going through some >> code, I >> > > > found that it does the following to get the log end offset: >> > > > >> > > > * Create consumer >> > > > * For each partition >> > > > * assign partition >> > > > * seek to end >> > > > * get position >> > > > >> > > > The issue is that KafkaConsumer.position() uses Fetcher.retrieveOffsetsByTimes() >> > > > internally, which is called with timeout Long.MAX_VALUE, and it gets >> > stuck >> > > in >> > > > a loop there. >> > > > >> > > > >> > > > Any pointers?
>> > > > >> > > > >> > > > *Version*: 0.10.0.1 >> > > > >> > > > >> > > > Thanks, >> > > > Sahil >> > > > >> > > >> > > >> > >> > >
Patch description: make `kafka-consumer-groups.sh --describe` (KafkaConsumerGroupService) stop
hanging when a partition has no leader. It does two things:
(a) it looks up the partition leader in ZooKeeper first, and
(b) it runs seekToEnd/position() inside a Future bounded by a new --consumer-position-timeout
option (milliseconds, default 500).

Review notes; resolve these before proposing upstream:

NOTE(review): Await.result does not cancel the Future on timeout.
  - The timed-out task keeps blocking in consumer.position() on a thread of
    ExecutionContext.Implicits.global.
  - The Future body is not wrapped in scala.concurrent.blocking, so the pool does not spawn
    compensating threads.
  - Once every pool thread is stuck on an unavailable partition, later Futures cannot start.
    Healthy partitions then time out too and are reported as Unknown.
  - Consider a dedicated executor, and/or calling KafkaConsumer.wakeup() on timeout to abort the
    blocked call. Confirm that wakeup() interrupts position() in 0.10.0.1.

NOTE(review): `lastFailed` is set to true on failure, but nothing in this patch resets it.
  - After the first failure, getConsumer() creates a brand-new consumer on every call.
  - Nothing in the visible patch closes the consumers that get replaced.
  - KafkaConsumer is documented as not thread-safe, so an abandoned consumer cannot simply be closed
    while the timed-out Future may still be using it.
  - Reset the flag when a fresh consumer is created, and plan how the old one is disposed of.

NOTE(review): checkArgs changes CLI behavior.
  - It now makes --zookeeper mandatory in every mode and removes the
    "--zookeeper is not valid with --new-consumer" check, so new-consumer users must also pass a
    ZooKeeper address.
  - The ZkUtils instance created in KafkaConsumerGroupService is not closed anywhere in the visible
    patch.
  - Leader information could instead come from Kafka cluster metadata
    (e.g. consumer.partitionsFor(topic)), which avoids the new ZooKeeper dependency.

NOTE(review): the leader pre-check and position() can race, as the inline comment admits.
  - The timeout remains the real safety net.
  - The `case t : Exception` branch returns Ignore without reporting the error; consider printing it.
  - `brokerId` and both `t` bindings are unused.

NOTE(review): cleanup.
  - Unused imports: java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask} and
    scala.util.{Failure, Success, Try}.
  - `import scala.concurrent._` duplicates the explicit scala.concurrent import.
  - `var consumer` and `var consumerPositionTimeoutOptDoc` can be `val`.
  - Drop the trailing `;` after `lastFailed = false`.
  - The blank line added after `sealed trait LogEndOffsetResult` is unrelated.
  - The option name has no `-ms` suffix even though the value is in milliseconds.
  - Confirm that 500 ms is a safe default for a healthy but slow broker.

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 0386404..707d70d 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -18,6 +18,7 @@ package kafka.admin import java.util.Properties +import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask} import joptsimple.{OptionParser, OptionSpec} import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo} @@ -35,10 +36,16 @@ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils +import scala.concurrent._ +import ExecutionContext.Implicits.global import scala.collection.JavaConverters._ import scala.collection.{Set, mutable} +import scala.concurrent.{Await, Future, TimeoutException} +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} object ConsumerGroupCommand { + val DEFAULT_CONSUMER_POSITION_TIMEOUT = 500 def main(args: Array[String]) { val opts = new ConsumerGroupCommandOptions(args) @@ -302,6 +309,15 @@ object ConsumerGroupCommand { class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService { + private val zkUtils = { + val zkUrl = opts.options.valueOf(opts.zkConnectOpt) + ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled) + } + + private val consumerPositionTimeoutOpt = opts.options.valueOf(opts.consumerPositionTimeoutMsOpt) + + private var lastFailed = false; + private val adminClient = createAdminClient() // `consumer` is only needed for `describe`, so we instantiate it lazily @@ -332,12 +348,33 @@ object ConsumerGroupCommand { } protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = { - val consumer = getConsumer() - val topicPartition = new TopicPartition(topic, 
partition) - consumer.assign(List(topicPartition).asJava) - consumer.seekToEnd(List(topicPartition).asJava) - val logEndOffset = consumer.position(topicPartition) - LogEndOffsetResult.LogEndOffset(logEndOffset) + zkUtils.getLeaderForPartition(topic, partition) match { + case Some(-1) => LogEndOffsetResult.Unknown + case Some(brokerId) => + var consumer = getConsumer() + val topicPartition = new TopicPartition(topic, partition) + // it can become unavailable after the check above + val future = Future { + consumer.assign(List(topicPartition).asJava) + consumer.seekToEnd(List(topicPartition).asJava) + val logEndOffset = consumer.position(topicPartition) + LogEndOffsetResult.LogEndOffset(logEndOffset) + } + + try { + Await.result(future, Duration.create(consumerPositionTimeoutOpt, "millisecond")) + } catch { + case t : TimeoutException => + lastFailed = true + LogEndOffsetResult.Unknown + case t : Exception => + lastFailed = true + LogEndOffsetResult.Ignore + } + case None => + println(s"No broker for partition ${new TopicPartition(topic, partition)}") + LogEndOffsetResult.Ignore + } } def close() { @@ -352,7 +389,7 @@ object ConsumerGroupCommand { } private def getConsumer() = { - if (consumer == null) + if (consumer == null || lastFailed) consumer = createNewConsumer() consumer } @@ -376,6 +413,7 @@ object ConsumerGroupCommand { sealed trait LogEndOffsetResult + object LogEndOffsetResult { case class LogEndOffset(value: Long) extends LogEndOffsetResult case object Unknown extends LogEndOffsetResult @@ -425,25 +463,30 @@ object ConsumerGroupCommand { .withRequiredArg .describedAs("command config property file") .ofType(classOf[String]) + var consumerPositionTimeoutOptDoc = "Timeout for consumer.position() in ms" + val consumerPositionTimeoutMsOpt = parser.accepts("consumer-position-timeout", consumerPositionTimeoutOptDoc) + .withRequiredArg() + .ofType(classOf[Long]) + .defaultsTo(DEFAULT_CONSUMER_POSITION_TIMEOUT) + val options = parser.parse(args : _*) val 
allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) def checkArgs() { + + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + // check required args if (options.has(newConsumerOpt)) { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) - if (options.has(zkConnectOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt") - if (options.has(deleteOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " + "there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " + "member leaves") } else { - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) if (options.has(bootstrapServerOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt")