vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460287991


##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
```diff
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);

             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;

+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(
```

Review comment:
Man, this is confusing. I think I see what you're getting at, but it seems pretty strange to go to all this trouble to extract the consumer config so that we can apply a shorter timeout on each poll, and then loop around until the originally configured client timeout passes. Can you explain how the outcome differs from just calling `globalConsumer.poll(requestTimeoutMs)`?

It also seems strange to extract a consumer timeout configuration that specifically does not apply to `poll` and then apply it to `poll`; that would violate users' expectations when they set that configuration value. Why wouldn't we instead apply the non-progress timeout (`taskTimeoutMs`), since that seems to be exactly what it's for? I.e., it seems like `globalConsumer.poll(pollTime + taskTimeoutMs)` might be the most appropriate choice here.
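To make the comparison concrete, here is a rough sketch of what that single-call alternative could look like. It assumes the fields visible in the diff (`globalConsumer`, `pollTime` as a `Duration`, `taskTimeoutMs`); the empty-result handling is illustrative only, not the PR's actual behavior:

```java
// Sketch only: let a single poll() wait out the non-progress timeout,
// instead of a short-poll loop with a manually tracked deadline.
// `globalConsumer`, `pollTime`, and `taskTimeoutMs` are the fields from
// the diff above; the exception below is a stand-in for real handling.
final ConsumerRecords<byte[], byte[]> records =
    globalConsumer.poll(pollTime.plusMillis(taskTimeoutMs));
if (records.isEmpty()) {
    // Nothing arrived even after the extra taskTimeoutMs of waiting, so
    // give up immediately rather than looping around for another attempt.
    throw new StreamsException(
        String.format(
            "Global store restoration made no progress within task.timeout.ms (%d ms).",
            taskTimeoutMs
        )
    );
}
```

One caveat with this shape: a single long poll blocks without any chance to re-check deadlines or shutdown flags in between, which may be a reason to keep the shorter per-poll timeout plus an explicit deadline instead.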
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
```diff
@@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        Map<TopicPartition, Long> highWatermarks = null;
-        int attempts = 0;
-        while (highWatermarks == null) {
-            try {
-                highWatermarks = globalConsumer.endOffsets(topicPartitions);
-            } catch (final TimeoutException retryableException) {
-                if (++attempts > retries) {
-                    log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.",
-                        store.name(),
-                        retries,
-                        retryableException);
-                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
-                        retryableException);
-                }
-                log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
-                    topicPartitions,
-                    retryBackoffMs,
-                    attempts,
-                    retries,
-                    retryableException);
-                Utils.sleep(retryBackoffMs);
-            }
-        }
+        final Map<TopicPartition, Long> highWatermarks = new HashMap<>();
+        retryUntilSuccessOrThrowOnTaskTimeout(
+            () -> highWatermarks.putAll(globalConsumer.endOffsets(topicPartitions)),
```

Review comment:
No need to change anything, just a note: it's mildly concerning to see side-effecting operations inside a retry loop, since there's no guarantee that failed attempts won't leave garbage in the system — for example, leaving a couple of entries in `highWatermarks` on each attempt. I've read through the code, and I don't think this can actually happen right now, but it could after refactoring. Since we're really just retrying simple API calls, something like this would be safer:

```java
private <R> R retryUntilSuccessOrThrowOnTaskTimeout(final Supplier<R> supplier,
                                                    final String errorMessage) {
    long deadlineMs = NO_DEADLINE;
    do {
        try {
            return supplier.get();
        } catch (final TimeoutException retryableException) {
            if (taskTimeoutMs == 0L) {
                throw new StreamsException(
                    String.format(
                        "Retrying is disabled. You can enable it by setting `%s` to a value larger than zero.",
                        StreamsConfig.TASK_TIMEOUT_MS_CONFIG
                    ),
                    retryableException
                );
            }
            deadlineMs = maybeUpdateDeadlineOrThrow(deadlineMs);
            log.debug(errorMessage, retryableException);
        }
    } while (true);
}
```

Which would support:

```java
final Map<TopicPartition, Long> highWatermarks = new HashMap<>(
    retryUntilSuccessOrThrowOnTaskTimeout(
        () -> globalConsumer.endOffsets(topicPartitions),
        String.format(
            "Failed to get offsets for partitions %s. The broker may be transiently unavailable at the moment.",
            topicPartitions
        )
    )
);
```

Incidentally, this seems to be exactly the same as:

```java
final Map<TopicPartition, Long> highWatermarks = new HashMap<>(
    globalConsumer.endOffsets(topicPartitions, Duration.ofMillis(taskTimeoutMs))
);
```

right? That might even be preferable, because we're not implementing any backoff here, whereas the client, retrying internally, can do exponential backoff, etc. (I think).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
```diff
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                         restoreRecords.add(recordConverter.convert(record));
                     }
                 }
-                offset = globalConsumer.position(topicPartition);
+                try {
+                    offset = globalConsumer.position(topicPartition);
+                } catch (final TimeoutException error) {
+                    // the `globalConsumer.position()` call should never block, because we know that we did
+                    // a successful `position()` call above for the requested partition and thus the consumer
+                    // should have a valid local position that it can return immediately
```

Review comment:
It seems like what we're doing here is syncing our `offset` variable with the client's internal position cursor. Although @mjsax is claiming the client never needs to update its position from the consumer group, the position would still advance as a consequence of polling records earlier in this loop. Right?
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
```diff
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                         restoreRecords.add(recordConverter.convert(record));
                     }
                 }
-                offset = globalConsumer.position(topicPartition);
+                try {
+                    offset = globalConsumer.position(topicPartition);
+                } catch (final TimeoutException error) {
+                    // the `globalConsumer.position()` call should never block, because we know that we did
+                    // a successful `position()` call above for the requested partition and thus the consumer
+                    // should have a valid local position that it can return immediately
```

Review comment:
Hey @mjsax, it sounds like you have an opinion about what would be unexpected internal behavior of the client, based on your knowledge of how it is currently implemented. There doesn't seem to be anything in the client's contract that says we should rely on this implementation detail, so it seems inappropriate to build in a fatal exception for this case.

##########
File path: streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
##########
```diff
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Map;
-
-/**
- * A {@link StreamsConfig} that does not log its configuration on construction.
- *
- * This producer cleaner output for unit tests using the {@code test-utils},
- * since logging the config is not really valuable in this context.
- */
-public class QuietStreamsConfig extends StreamsConfig {
```

Review comment:
I'm guessing it's because there are other "quiet" configs there. On the other hand, the "quiet *Streams* config" isn't really a "*client* util"...

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org