mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460341186
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(
Review comment:
Well, the requestTimeout is not really "random": if we send a fetch
request, it's the maximum time until the consumer would abort the request and
retry. _If_ poll() threw a TimeoutException, this would be the point at which
we'd get a timeout, assuming there are no retries. To be fair, maybe using the
consumer's `default.api.timeout.ms` config would be better than using
requestTimeout, though? The point is that picking a consumer config seems to
be the best thing we can do for this case, instead of a hard-coded number (or
introducing a new config for this specific corner case).
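
To make the "pick a consumer config instead of a hard-coded number" idea
concrete, here is a minimal sketch. It assumes the consumer config is available
as a plain map. The class `EmptyPollTimeout` and its method are hypothetical
names for illustration and are not part of this PR. Only the two config keys
and their documented consumer defaults (30 s and 60 s) come from Kafka itself.

```java
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka Streams or this PR.
final class EmptyPollTimeout {
    // Real consumer config keys with their documented default values.
    static final String REQUEST_TIMEOUT_MS = "request.timeout.ms";
    static final String DEFAULT_API_TIMEOUT_MS = "default.api.timeout.ms";
    static final long REQUEST_TIMEOUT_MS_DEFAULT = 30_000L;
    static final long DEFAULT_API_TIMEOUT_MS_DEFAULT = 60_000L;

    private EmptyPollTimeout() { }

    /**
     * Derives the "how long may poll() return nothing" bound from an existing
     * consumer config rather than from a hard-coded number or a new config.
     */
    static long fromConsumerConfig(final Map<String, ?> consumerConfig,
                                   final boolean useDefaultApiTimeout) {
        final String key = useDefaultApiTimeout ? DEFAULT_API_TIMEOUT_MS : REQUEST_TIMEOUT_MS;
        final long fallback = useDefaultApiTimeout
            ? DEFAULT_API_TIMEOUT_MS_DEFAULT
            : REQUEST_TIMEOUT_MS_DEFAULT;

        final Object value = consumerConfig.get(key);
        if (value == null) {
            return fallback; // user did not override the config
        }
        // Config values may arrive as Integer, Long, or String.
        return Long.parseLong(value.toString().trim());
    }
}
```

Either key works with this shape; which of the two is the better bound is
exactly the open question above.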
> pollTime may not even appropriate for the global thread
I think it is (even if differently). At least for regular processing: (1) we
interleave polling and flushing, and (2) when we stop the client, we want it
to be responsive and not blocked in poll() for 5 minutes (the taskTimeoutMs
default). (Which of course implies that one cannot really "intercept" the
startup phase atm -- something we might want to address, but maybe not in
this PR.)
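
The reason a short pollTime and a long overall timeout can coexist is sketched
below: each poll() returns quickly, and a separate deadline tracks how long
polls have been empty. This is only an illustration under assumptions. The
class name, the `-1` sentinel for `NO_DEADLINE`, and the use of
`IllegalStateException` are all hypothetical, and this is not the PR's
`maybeUpdateDeadlineOrThrow` implementation.

```java
// Hypothetical sketch; names, sentinel value, and exception type are assumptions.
final class EmptyPollDeadline {
    static final long NO_DEADLINE = -1L; // assumed sentinel, not taken from the PR

    private EmptyPollDeadline() { }

    /**
     * Call once after every poll(). Returns the deadline to carry into the next
     * loop iteration, or throws once polls have stayed empty past the deadline.
     */
    static long update(final boolean pollWasEmpty,
                       final long deadlineMs,
                       final long nowMs,
                       final long timeoutMs) {
        if (!pollWasEmpty) {
            return NO_DEADLINE; // progress was made, so the clock resets
        }
        if (deadlineMs == NO_DEADLINE) {
            return nowMs + timeoutMs; // first empty poll starts the clock
        }
        if (nowMs >= deadlineMs) {
            throw new IllegalStateException(
                "No records received for " + timeoutMs + " ms; giving up.");
        }
        return deadlineMs; // still empty, but within the allowed window
    }
}
```

Because the loop regains control after every short poll(), it can also check
for a shutdown request between polls instead of blocking for the full timeout.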
> Clearly, we need to set some kind of lower bound on it, though, but it's
not clear that it needs to be configurable.
I think the worst thing we can do is to make it non-configurable. :)