[ https://issues.apache.org/jira/browse/KAFKA-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610945#comment-16610945 ]
ASF GitHub Bot commented on KAFKA-7044:
---------------------------------------

hachikuji closed pull request #5627: KAFKA-7044: Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group
URL: https://github.com/apache/kafka/pull/5627

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

{code}
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3ed4a9c1f3f..2fa499c2165 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -70,7 +70,7 @@
               files="MockAdminClient.java"/>

     <suppress checks="JavaNCSS"
-              files="RequestResponseTest.java"/>
+              files="RequestResponseTest.java|FetcherTest.java"/>

     <suppress checks="NPathComplexity"
               files="MemoryRecordsTest|MetricsTest"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc0daa233ab..9fa64462d85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -414,7 +414,7 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 if (value.partitionsToRetry.isEmpty())
                     return result;

-                remainingToSearch.keySet().removeAll(result.fetchedOffsets.keySet());
+                remainingToSearch.keySet().retainAll(value.partitionsToRetry);
             } else if (!future.isRetriable()) {
                 throw future.exception();
             }
@@ -575,7 +575,7 @@ private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamp
             metadata.add(tp.topic());

         Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode =
-                groupListOffsetRequests(partitionResetTimestamps);
+                groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
         for (Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
             Node node = entry.getKey();
             final Map<TopicPartition, ListOffsetRequest.PartitionData> resetTimestamps = entry.getValue();
@@ -624,19 +624,19 @@ public void onFailure(RuntimeException e) {
         for (TopicPartition tp : timestampsToSearch.keySet())
             metadata.add(tp.topic());

+        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
         Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode =
-                groupListOffsetRequests(timestampsToSearch);
+                groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
         if (timestampsToSearchByNode.isEmpty())
             return RequestFuture.failure(new StaleMetadataException());

         final RequestFuture<ListOffsetResult> listOffsetRequestsFuture = new RequestFuture<>();
         final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
-        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
         final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());

         for (Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
             RequestFuture<ListOffsetResult> future =
-                    sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
+                sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);

             future.addListener(new RequestFutureListener<ListOffsetResult>() {
                 @Override
                 public void onSuccess(ListOffsetResult partialResult) {
@@ -663,8 +663,16 @@ public void onFailure(RuntimeException e) {
         return listOffsetRequestsFuture;
     }

+    /**
+     * Groups timestamps to search by node for topic partitions in `timestampsToSearch` that have
+     * leaders available. Topic partitions from `timestampsToSearch` that do not have their leader
+     * available are added to `partitionsToRetry`.
+     * @param timestampsToSearch The mapping from partitions to the target timestamps
+     * @param partitionsToRetry A set of topic partitions that will be extended with partitions
+     *                          that need a metadata update or a reconnect to the leader.
+     */
     private Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> groupListOffsetRequests(
-            Map<TopicPartition, Long> timestampsToSearch) {
+            Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> partitionsToRetry) {
         final Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp = entry.getKey();
@@ -673,9 +681,11 @@ public void onFailure(RuntimeException e) {
                 metadata.add(tp.topic());
                 log.debug("Leader for partition {} is unknown for fetching offset", tp);
                 metadata.requestUpdate();
+                partitionsToRetry.add(tp);
             } else if (info.leader() == null) {
                 log.debug("Leader for partition {} is unavailable for fetching offset", tp);
                 metadata.requestUpdate();
+                partitionsToRetry.add(tp);
             } else if (client.isUnavailable(info.leader())) {
                 client.maybeThrowAuthFailure(info.leader());

@@ -683,7 +693,8 @@ public void onFailure(RuntimeException e) {
                 // try again. No need to request a metadata update since the disconnect will have
                 // done so already.
                 log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
-                        info.leader(), tp);
+                    info.leader(), tp);
+                partitionsToRetry.add(tp);
             } else {
                 Node node = info.leader();
                 Map<TopicPartition, ListOffsetRequest.PartitionData> topicData =
{code}
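Taken together, the Fetcher changes make the retry bookkeeping explicit: groupListOffsetRequests now records every partition without a usable leader in partitionsToRetry, and fetchOffsetsByTimes narrows remainingToSearch with retainAll over that set instead of removeAll over the keys fetched so far. The sketch below (plain Java, not the Kafka source; the partition names and the "answered but yielded no offset" case are invented for illustration) shows where the two calls diverge:

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RetrySetSketch {
    public static void main(String[] args) {
        // Partitions we still need offsets for at the top of the retry loop.
        Set<String> remainingToSearch = new HashSet<>(Arrays.asList("t-0", "t-1", "t2-0"));

        // "t-0" was fetched successfully; "t2-0" was flagged for retry (e.g. its
        // leader is unknown pending a metadata update); assume "t-1" was answered
        // but yielded no offset, so it appears in neither set.
        Set<String> fetchedSoFar = new HashSet<>(Arrays.asList("t-0"));
        Set<String> partitionsToRetry = new HashSet<>(Arrays.asList("t2-0"));

        // Old bookkeeping: remove what was fetched. "t-1" lingers and would be
        // requested again even though there is nothing left to ask for.
        Set<String> viaRemoveAll = new HashSet<>(remainingToSearch);
        viaRemoveAll.removeAll(fetchedSoFar);
        System.out.println(viaRemoveAll);   // [t-1, t2-0] (set order may vary)

        // Patched bookkeeping: keep exactly what was flagged for retry.
        Set<String> viaRetainAll = new HashSet<>(remainingToSearch);
        viaRetainAll.retainAll(partitionsToRetry);
        System.out.println(viaRetainAll);   // [t2-0]
    }
}
{code}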
{code}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d314a4d8c9f..5edffa9dd93 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -108,6 +108,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

@@ -1919,6 +1920,48 @@ public void testGetOffsetsForTimes() {
         testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
     }

+    @Test
+    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
+        final String anotherTopic = "another-topic";
+        final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
+
+        client.reset();
+
+        // Metadata initially has one topic
+        Cluster cluster = TestUtils.clusterWith(3, topicName, 2);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        // The first metadata refresh should contain one topic
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), false);
+        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), cluster.leaderFor(tp0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L), cluster.leaderFor(tp1));
+
+        // Second metadata refresh should contain two topics
+        Map<String, Integer> partitionNumByTopic = new HashMap<>();
+        partitionNumByTopic.put(topicName, 2);
+        partitionNumByTopic.put(anotherTopic, 1);
+        Cluster updatedCluster = TestUtils.clusterWith(3, partitionNumByTopic);
+        client.prepareMetadataUpdate(updatedCluster, Collections.<String>emptySet(), false);
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), cluster.leaderFor(t2p0));
+
+        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
+        timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
+        timestampToSearch.put(tp1, ListOffsetRequest.LATEST_TIMESTAMP);
+        timestampToSearch.put(t2p0, ListOffsetRequest.LATEST_TIMESTAMP);
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+            fetcher.offsetsByTimes(timestampToSearch, time.timer(Long.MAX_VALUE));
+
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + tp0,
+            offsetAndTimestampMap.get(tp0));
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + tp1,
+            offsetAndTimestampMap.get(tp1));
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + t2p0,
+            offsetAndTimestampMap.get(t2p0));
+        assertEquals(11L, offsetAndTimestampMap.get(tp0).offset());
+        assertEquals(32L, offsetAndTimestampMap.get(tp1).offset());
+        assertEquals(54L, offsetAndTimestampMap.get(t2p0).offset());
+    }
+
     @Test(expected = TimeoutException.class)
     public void testBatchedListOffsetsMetadataErrors() {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
{code}
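The new test drives this path through Fetcher.offsetsByTimes. From an application's point of view, the corresponding public entry point is KafkaConsumer.offsetsForTimes(), which with this fix should eventually return an entry for every requested partition, including ones whose leader was unknown when the lookup started, instead of silently omitting them. A usage sketch, assuming placeholder broker address and topic names:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Ask for the earliest offset whose record timestamp is >= the given
            // time, for partitions that may live on different leaders.
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(new TopicPartition("topic", 0), 0L);
            query.put(new TopicPartition("another-topic", 0), 0L);

            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : result.entrySet()) {
                // A null value means no record at or after the timestamp; guard
                // before dereferencing, for the same reason the Scala fix below
                // guards the offsets returned by endOffsets().
                if (e.getValue() != null)
                    System.out.println(e.getKey() + " -> " + e.getValue().offset());
            }
        }
    }
}
{code}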
{code}
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 1d61720bfa9..c0f6797eddd 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -292,12 +292,8 @@ object ConsumerGroupCommand extends Logging {
       }

       getLogEndOffsets(topicPartitions).map {
-        logEndOffsetResult =>
-          logEndOffsetResult._2 match {
-            case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset))
-            case LogOffsetResult.Unknown => getDescribePartitionResult(logEndOffsetResult._1, None)
-            case LogOffsetResult.Ignore => null
-          }
+        case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset))
+        case (topicPartition, _) => getDescribePartitionResult(topicPartition, None)
       }.toArray
     }

@@ -399,16 +395,20 @@ object ConsumerGroupCommand extends Logging {
     private def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
       val offsets = getConsumer.endOffsets(topicPartitions.asJava)
       topicPartitions.map { topicPartition =>
-        val logEndOffset = offsets.get(topicPartition)
-        topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+        Option(offsets.get(topicPartition)) match {
+          case Some(logEndOffset) => topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+          case _ => topicPartition -> LogOffsetResult.Unknown
+        }
       }.toMap
     }

     private def getLogStartOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
       val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
       topicPartitions.map { topicPartition =>
-        val logStartOffset = offsets.get(topicPartition)
-        topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+        Option(offsets.get(topicPartition)) match {
+          case Some(logStartOffset) => topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+          case _ => topicPartition -> LogOffsetResult.Unknown
+        }
       }.toMap
     }
{code}
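The Scala changes guard the actual NullPointerException from the report below: Consumer.endOffsets() and beginningOffsets() return a java.util.Map whose get() yields null for a partition with no known offset, and the old code unboxed that java.lang.Long unconditionally, which is exactly the scala.Predef$.Long2long frame in the stack trace. A standalone sketch of the failure mode and the equivalent null guard (plain Java rather than the Scala tool code; the map contents are invented):

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class UnboxingNpeSketch {
    public static void main(String[] args) {
        // Stand-in for the map returned by Consumer.endOffsets(): one partition
        // has an offset, the other is deliberately missing.
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);

        // Broken: offsets.get("topic-1") is null, and auto-unboxing a null Long
        // into a primitive long throws NullPointerException, just as the implicit
        // scala.Predef.Long2long conversion did in the old ConsumerGroupCommand.
        // long end = offsets.get("topic-1");   // would throw NPE

        // Fixed: treat a missing offset as "unknown", mirroring the patch's
        // Option(offsets.get(topicPartition)) match { ... } in Scala.
        for (String tp : Arrays.asList("topic-0", "topic-1")) {
            Optional<Long> end = Optional.ofNullable(offsets.get(tp));
            System.out.println(tp + " -> " + end.map(Object::toString).orElse("unknown"));
        }
    }
}
{code}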
----------------------------------------------------------------

> kafka-consumer-groups.sh NullPointerException describing round robin or
> sticky assignors
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7044
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7044
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 1.1.0
>        Environment: CentOS 7.4, Oracle JDK 1.8
>            Reporter: Jeff Field
>            Assignee: Vahid Hashemian
>            Priority: Major
>             Fix For: 2.1.0
>
> We've recently moved to using the round robin assignor for one of our
> consumer groups, and started testing the sticky assignor. In both cases,
> using Kafka 1.1.0 we get a null pointer exception *unless* the group being
> described is rebalancing:
> {code:java}
> kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group groupname-for-consumer
> Error: Executing consumer group command failed due to null
> [2018-06-12 01:32:34,179] DEBUG Exception in consumer group command (kafka.admin.ConsumerGroupCommand$)
> java.lang.NullPointerException
>         at scala.Predef$.Long2long(Predef.scala:363)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.List.foreach(List.scala:392)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.immutable.List.map(List.scala:296)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610)
>         at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328)
>         at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565)
>         at scala.collection.immutable.List.flatMap(List.scala:338)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558)
>         at scala.Option.map(Option.scala:146)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558)
>         at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271)
>         at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544)
>         at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77)
>         at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> [2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics)
> {code}