abbccdda commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r462047068
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ########## @@ -274,30 +252,74 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); + long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); + offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); + offset = retryUntilSuccessOrThrowOnTaskTimeout( + () -> globalConsumer.position(topicPartition), + String.format( + "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", + topicPartition + ) + ); } - long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; - while (offset < highWatermark) { - final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime); + while (offset < highWatermark) { // when we "fix" this loop (KAFKA-7380 / KAFKA-10317) + // we should update the `poll()` timeout below + + // we ignore `poll.ms` config during bootstrapping phase and + // apply `request.timeout.ms` plus `task.timeout.ms` instead + // + // the reason is, that `poll.ms` might be too short to give a fetch request a fair chance + // to actually complete and we don't want to start `task.timeout.ms` too early + // + // we also pass `task.timeout.ms` into `poll()` directly right now as it simplifies our own code: + // if we don't pass it in, we would just track the timeout ourselves and call `poll()` again + // in our own retry loop; by passing the timeout we can reuse the consumer's internal retry loop instead + // + // note that using `request.timeout.ms` provides a conservative upper bound for the timeout; + // this implies that we might start `task.timeout.ms` "delayed" -- however, starting the timeout + // delayed is preferable (as it's more robust) than starting it too early + // + // TODO https://issues.apache.org/jira/browse/KAFKA-10315 + // -> do a more precise timeout handling if `poll` would throw an exception if a fetch request fails + // (instead of letting the consumer retry fetch requests silently) + // + // TODO https://issues.apache.org/jira/browse/KAFKA-10317 and + // https://issues.apache.org/jira/browse/KAFKA-7380 + // -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called + final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout); + if (records.isEmpty()) { + // this will always throw + maybeUpdateDeadlineOrThrow(time.milliseconds()); Review comment: Could we just throw here? ########## File path: docs/streams/developer-guide/config-streams.html ########## @@ -326,13 +321,18 @@ <h4><a class="toc-backref" href="#id5">bootstrap.servers</a><a class="headerlink <tr class="row-even"><td>state.cleanup.delay.ms</td> <td>Low</td> <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td> - <td>600000 milliseconds</td> + <td>600000 milliseconds (10 minutes)</td> </tr> <tr class="row-odd"><td>state.dir</td> <td>High</td> <td colspan="2">Directory location for state stores.</td> <td><code class="docutils literal"><span class="pre">/tmp/kafka-streams</span></code></td> </tr> + <tr class="row-odd"><td>task.timeout.ms</td> + <td>Medium</td> Review comment: @mjsax could we do a screenshot to make sure it looks good on the web-page? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ########## @@ -671,19 +1211,21 @@ private void writeCorruptCheckpoint() throws IOException { } } - private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) { + private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition... topicPartitions) { + consumer.assign(Arrays.asList(topicPartitions)); + final HashMap<TopicPartition, Long> startOffsets = new HashMap<>(); - startOffsets.put(topicPartition, startOffset); final HashMap<TopicPartition, Long> endOffsets = new HashMap<>(); - endOffsets.put(topicPartition, startOffset + numRecords); - consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null))); - consumer.assign(Collections.singletonList(topicPartition)); + for (final TopicPartition topicPartition : topicPartitions) { Review comment: nit: we could just use `Map` for `startOffsets` and `endOffsets` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org