vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460287991



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
 
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
       Man, this is confusing. I think I see what you're getting at, but it 
seems pretty strange to have to go to all that trouble to extract the consumer 
config so that we can apply a shorter timeout on each poll, but then loop 
around until the originally configured client timeout passes.
   
   Can you explain how the outcome is different than just calling 
`globalConsumer.poll(requestTimeoutMs)`?
   
   It also seems strange to extract a consumer timeout configuration that 
specifically does not apply to `poll` and apply it to `poll`. This seems like 
it would violate users' expectations when they set that configuration value.
   
   Why wouldn't we instead apply the non-progress timeout (`taskTimeoutMs`), 
since it seems like that's exactly what it's for? I.e., it seems like 
`globalConsumer.poll(pollTime + taskTimeoutMs)` might be the most appropriate 
choice here?
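   
   To make that alternative concrete, here is a minimal sketch, assuming `pollTime` is a `java.time.Duration` and `taskTimeoutMs` is a `long` as elsewhere in this class (illustrative only, not the PR's code):
   ```java
   import java.time.Duration;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   
   // Hypothetical helper, not part of the PR: fold the non-progress timeout
   // (taskTimeoutMs) into a single poll() call, instead of looping with a
   // shorter per-poll timeout derived from the consumer's request.timeout.ms.
   final class PollWithTaskTimeout {
       static ConsumerRecords<byte[], byte[]> pollOnce(final Consumer<byte[], byte[]> globalConsumer,
                                                       final Duration pollTime,
                                                       final long taskTimeoutMs) {
           return globalConsumer.poll(pollTime.plusMillis(taskTimeoutMs));
       }
   }
   ```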

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
 
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        Map<TopicPartition, Long> highWatermarks = null;
 
-        int attempts = 0;
-        while (highWatermarks == null) {
-            try {
-                highWatermarks = globalConsumer.endOffsets(topicPartitions);
-            } catch (final TimeoutException retryableException) {
-                if (++attempts > retries) {
-                    log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.",
-                        store.name(),
-                        retries,
-                        retryableException);
-                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
-                            "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
-                        retryableException);
-                }
-                log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
-                    topicPartitions,
-                    retryBackoffMs,
-                    attempts,
-                    retries,
-                    retryableException);
-                Utils.sleep(retryBackoffMs);
-            }
-        }
+        final Map<TopicPartition, Long> highWatermarks = new HashMap<>();
+        retryUntilSuccessOrThrowOnTaskTimeout(
+            () -> highWatermarks.putAll(globalConsumer.endOffsets(topicPartitions)),

Review comment:
       No need to change anything, just a note: It's mildly concerning to see 
side-effecting operations inside a retry loop, since there's no guarantee that 
failed attempts won't leave any garbage in the system, for example leaving a 
couple of things in highWatermarks on each attempt. I've read through the code, 
and I don't think this could actually happen right now, but it could after 
refactoring.
   
   Since we're really just retrying simple API calls, something like this would 
be safer:
   ```java
       private <R> R retryUntilSuccessOrThrowOnTaskTimeout(final Supplier<R> supplier,
                                                           final String errorMessage) {
           long deadlineMs = NO_DEADLINE;
   
           do {
               try {
                   return supplier.get();
               } catch (final TimeoutException retryableException) {
                   if (taskTimeoutMs == 0L) {
                       throw new StreamsException(
                           String.format(
                               "Retrying is disabled. You can enable it by 
setting `%s` to a value larger than zero.",
                               StreamsConfig.TASK_TIMEOUT_MS_CONFIG
                           ),
                           retryableException
                       );
                   }
   
                   deadlineMs = maybeUpdateDeadlineOrThrow(deadlineMs);
   
                   log.debug(errorMessage, retryableException);
               }
           } while (true);
       }
   ```
   
   Which would support:
   ```java
           final Map<TopicPartition, Long> highWatermarks = new HashMap<>(
               retryUntilSuccessOrThrowOnTaskTimeout(
                   () -> globalConsumer.endOffsets(topicPartitions),
                   String.format(
                       "Failed to get offsets for partitions %s. The broker may 
be transiently unavailable at the moment.",
                       topicPartitions
                   )
               )
           );
   ```
   
   Incidentally, this seems to be exactly the same as:
   ```java
   
           final Map<TopicPartition, Long> highWatermarks = new HashMap<>(
            globalConsumer.endOffsets(topicPartitions, Duration.ofMillis(taskTimeoutMs))
           );
   ```
   
   right? That might be preferable because we're not implementing any backoff here, whereas when the client retries internally, it can do exponential backoff, etc. (I think).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                             restoreRecords.add(recordConverter.convert(record));
                         }
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    try {
+                        offset = globalConsumer.position(topicPartition);
+                    } catch (final TimeoutException error) {
+                        // the `globalConsumer.position()` call should never block, because we know that we did
+                        // a successful `position()` call above for the requested partition and thus the consumer
+                        // should have a valid local position that it can return immediately

Review comment:
       It seems like what we're doing here is syncing our `offset` variable 
with the internal position cursor of the client. Although @mjsax is claiming 
the client never needs to update its position from the consumer group, the 
position would advance as a consequence of polling records earlier in this 
loop. Right?
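   
   To illustrate that point with a standalone sketch (not the PR code; the topic name, partition, and broker address below are made up): after `poll()` returns records, the consumer's local position cursor has already advanced, so the subsequent `position()` call can normally be answered from that cursor.
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   
   public class PositionAfterPollExample {
       public static void main(final String[] args) {
           final Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
           props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
   
           try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
               final TopicPartition tp = new TopicPartition("some-global-store-changelog", 0);
               consumer.assign(Collections.singletonList(tp));
               consumer.seekToBeginning(Collections.singletonList(tp));
   
               // The first position() call may need a broker round trip to resolve
               // "beginning" into a concrete offset.
               final long before = consumer.position(tp);
   
               // poll() fetches records and advances the local position cursor.
               final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
   
               // This position() call reflects the records just polled and is served
               // from the already-advanced local cursor.
               final long after = consumer.position(tp);
   
               System.out.printf("polled %d records, position moved from %d to %d%n",
                   records.count(), before, after);
           }
       }
   }
   ```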

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                             restoreRecords.add(recordConverter.convert(record));
                         }
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    try {
+                        offset = globalConsumer.position(topicPartition);
+                    } catch (final TimeoutException error) {
+                        // the `globalConsumer.position()` call should never block, because we know that we did
+                        // a successful `position()` call above for the requested partition and thus the consumer
+                        // should have a valid local position that it can return immediately

Review comment:
       Hey @mjsax, it sounds like you have an opinion about what would be 
unexpected internal behavior of the client, based on your knowledge of how it 
is currently implemented. There doesn't seem to be anything in the contract of 
the client that says we should rely on this implementation detail, so it seems 
inappropriate to build in a fatal exception for this case.
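   
   For what it's worth, here's a sketch of a non-fatal alternative, assuming the same `retryUntilSuccessOrThrowOnTaskTimeout` helper and `AtomicLong` pattern used earlier in this method (just an illustration of the direction, not a concrete proposal):
   ```java
   // Hypothetical alternative: treat the TimeoutException like any other
   // transient failure and go back through the retry helper, rather than
   // converting it into a fatal exception.
   final AtomicLong positionHolder = new AtomicLong();
   retryUntilSuccessOrThrowOnTaskTimeout(
       () -> positionHolder.set(globalConsumer.position(topicPartition)),
       String.format(
           "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
           topicPartition
       )
   );
   offset = positionHolder.get();
   ```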

##########
File path: streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
##########
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Map;
-
-/**
- * A {@link StreamsConfig} that does not log its configuration on construction.
- *
- * This producer cleaner output for unit tests using the {@code test-utils},
- * since logging the config is not really valuable in this context.
- */
-public class QuietStreamsConfig extends StreamsConfig {

Review comment:
       I'm guessing it's because there are other "quiet" configs there. On the other hand, the "quiet *Streams* config" isn't really a "*client* util"...



