viktorsomogyi commented on code in PR #18142:
URL: https://github.com/apache/kafka/pull/18142#discussion_r3249357041
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java:
##########
@@ -86,14 +86,18 @@ public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<Strin
Map<ByteBuffer, ByteBuffer> raw;
try {
Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
+
+ // Note: this call can block for long time waiting for data flush to complete (`KafkaProducer.flush()`).
+ offsetReadFuture = backingStore.get(serializedToOriginal.keySet());
Review Comment:
(Of course, with the error message moved to a constant to avoid duplication.)
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java:
##########
@@ -86,14 +86,18 @@ public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<Strin
Map<ByteBuffer, ByteBuffer> raw;
try {
Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
+
+ // Note: this call can block for long time waiting for data flush to complete (`KafkaProducer.flush()`).
+ offsetReadFuture = backingStore.get(serializedToOriginal.keySet());
Review Comment:
I'm wondering whether we need a behavior change here. After all, the point is
to fix the race condition, but with this change we also call
`backingStore.get()` regardless of the value of `closed`, which may impose some
extra work. Do you think it would make sense to duplicate the `if` condition
below so that we fail fast?
```suggestion
if (closed.get()) {
    throw new ConnectException(
        "Offset reader is closed. This is likely because the task has already been "
            + "scheduled to stop but has taken longer than the graceful shutdown "
            + "period to do so.");
}
// Note: this call can block for a long time waiting for data flush to complete (`KafkaProducer.flush()`).
offsetReadFuture = backingStore.get(serializedToOriginal.keySet());
```
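For illustration, here is a minimal, self-contained sketch of the fail-fast idea combined with a shared message constant. It is not the actual `OffsetStorageReaderImpl` code: `FailFastReaderSketch`, `CLOSED_MESSAGE`, `startRead()`, and the `Supplier` standing in for `backingStore.get(...)` are all hypothetical names, and `IllegalStateException` is used in place of `ConnectException` so the example compiles without Kafka on the classpath.

```java
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical stand-in for the reader, used only to illustrate the pattern.
class FailFastReaderSketch {
    // Single constant so both checks share one message (avoids duplication).
    static final String CLOSED_MESSAGE =
        "Offset reader is closed. This is likely because the task has already been "
            + "scheduled to stop but has taken longer than the graceful shutdown "
            + "period to do so.";

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Object lock = new Object();
    // Assumption: stands in for the potentially blocking backingStore.get(...) call.
    private final Supplier<Future<String>> backingStoreGet;

    FailFastReaderSketch(Supplier<Future<String>> backingStoreGet) {
        this.backingStoreGet = backingStoreGet;
    }

    Future<String> startRead() {
        // Fail fast: skip the potentially blocking call if already closed.
        if (closed.get()) {
            throw new IllegalStateException(CLOSED_MESSAGE);
        }
        // Potentially long-running call, made outside the lock.
        Future<String> future = backingStoreGet.get();
        synchronized (lock) {
            // Re-check: close() may have been called while the call above was in progress.
            if (closed.get()) {
                throw new IllegalStateException(CLOSED_MESSAGE);
            }
        }
        return future;
    }

    void close() {
        synchronized (lock) {
            closed.set(true);
        }
    }
}
```

The first check only saves work when the reader is already closed; the second check is still needed because `close()` can run while the backing-store call is in flight.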