vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r436208312



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata 
changelogMetadata) {
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final 
Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-

Review comment:
       Ah, now I see it.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -95,19 +99,65 @@ public static String getTaskProducerClientId(final String 
threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> 
fetchEndOffsets(final Collection<TopicPartition> partitions,
-                                                                             
final Admin adminClient) {
-        final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
+    /**
+     * @throws StreamsException if the consumer throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request 
times out
+     */
+    public static Map<TopicPartition, Long> fetchCommittedOffsets(final 
Set<TopicPartition> partitions,
+                                                                  final 
Consumer<byte[], byte[]> consumer) {
+        if (partitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        final Map<TopicPartition, Long> committedOffsets;
         try {
-            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> 
future =  adminClient.listOffsets(
-                
partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> 
OffsetSpec.latest())))
-                                                                               
         .all();
-            endOffsets = future.get();
+            // those which do not have a committed offset would default to 0
+            committedOffsets = 
consumer.committed(partitions).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() 
== null ? 0L : e.getValue().offset()));
+        } catch (final TimeoutException e) {
+            LOG.warn("The committed offsets request timed out, try increasing 
the consumer client's default.api.timeout.ms", e);
+            throw e;
+        } catch (final KafkaException e) {
+            LOG.warn("The committed offsets request failed.", e);
+            throw new StreamsException(String.format("Failed to retrieve end 
offsets for %s", partitions), e);
+        }
+
+        return committedOffsets;
+    }
 
+    public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> 
fetchEndOffsetsFuture(final Collection<TopicPartition> partitions,
+                                                                               
                 final Admin adminClient) {
+        return adminClient.listOffsets(
+            partitions.stream().collect(Collectors.toMap(Function.identity(), 
tp -> OffsetSpec.latest())))
+            .all();
+    }
+
+    /**
+     * A helper method that wraps the {@code Future#get} call and rethrows any 
thrown exception as a StreamsException
+     * @throws StreamsException if the admin client request throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request 
times out
+     */
+    public static Map<TopicPartition, ListOffsetsResultInfo> 
getEndOffsets(final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> 
endOffsetsFuture) {
+        try {
+            return endOffsetsFuture.get();
+        } catch (final TimeoutException e) {
+            LOG.warn("The listOffsets request timed out, try increasing the 
admin client's default.api.timeout.ms", e);
+            throw e;

Review comment:
       Upon retrospect, I'm not sure if this is possible. The javadoc for 
Future#get indicates that any exception would be wrapped in an 
ExecutionException. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to