viktorsomogyi commented on code in PR #18142:
URL: https://github.com/apache/kafka/pull/18142#discussion_r3249357041


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java:
##########
@@ -86,14 +86,18 @@ public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<Strin
         Map<ByteBuffer, ByteBuffer> raw;
         try {
             Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
+
+            // Note: this call can block for long time waiting for data flush to complete (`KafkaProducer.flush()`).
+            offsetReadFuture = backingStore.get(serializedToOriginal.keySet());

Review Comment:
   (Of course, with the error message moved to a constant to avoid duplication.)




Review Comment:
   I'm wondering whether we need a behavior change here. After all, the point is to fix the race condition, but with this change we also call `backingStore.get()` regardless of the value of `closed`, which may impose some extra work. Would it make sense to duplicate the `if` condition below so that we fail fast?
   
   ```suggestion
               if (closed.get()) {
                   throw new ConnectException(
                           "Offset reader is closed. This is likely because the task has already been "
                               + "scheduled to stop but has taken longer than the graceful shutdown "
                               + "period to do so.");
               }

               // Note: this call can block for long time waiting for data flush to complete (`KafkaProducer.flush()`).
               offsetReadFuture = backingStore.get(serializedToOriginal.keySet());
   ```
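   For illustration, here is a minimal, self-contained sketch of the fail-fast check combined with the shared message constant mentioned earlier. It is not the PR's code. Assumptions, all hypothetical: `ReaderClosedException` stands in for Kafka's `ConnectException`; a `Function` stands in for `backingStore.get(...)`; and the second check assumes that the existing `if (closed.get())` condition further down runs under a lock that `close()` also takes, so a close that races with the slow call is still observed.

   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Function;

   public class FailFastReaderSketch {

       // Hypothetical stand-in for org.apache.kafka.connect.errors.ConnectException.
       static class ReaderClosedException extends RuntimeException {
           ReaderClosedException(String message) {
               super(message);
           }
       }

       // One constant, so both checks report the same message without duplication.
       static final String CLOSED_MESSAGE =
               "Offset reader is closed. This is likely because the task has already been "
                       + "scheduled to stop but has taken longer than the graceful shutdown "
                       + "period to do so.";

       private final AtomicBoolean closed = new AtomicBoolean(false);
       private final Set<Future<?>> inFlight = new HashSet<>();
       // Hypothetical stand-in for backingStore.get(keys).
       private final Function<Set<String>, Future<Map<String, String>>> backingStore;
       // Counts backing-store calls so the fail-fast path is observable.
       final AtomicInteger backingStoreCalls = new AtomicInteger();

       FailFastReaderSketch(Function<Set<String>, Future<Map<String, String>>> backingStore) {
           this.backingStore = backingStore;
       }

       Map<String, String> offsets(Set<String> keys) throws InterruptedException, ExecutionException {
           // Fail fast: skip the potentially slow backing-store call if already closed.
           if (closed.get()) {
               throw new ReaderClosedException(CLOSED_MESSAGE);
           }
           backingStoreCalls.incrementAndGet();
           Future<Map<String, String>> future = backingStore.apply(keys);
           // Re-check under the lock shared with close(): close() may have run during the call.
           synchronized (inFlight) {
               if (closed.get()) {
                   future.cancel(true); // best effort; the result is no longer wanted
                   throw new ReaderClosedException(CLOSED_MESSAGE);
               }
               inFlight.add(future);
           }
           try {
               return future.get();
           } finally {
               synchronized (inFlight) {
                   inFlight.remove(future);
               }
           }
       }

       void close() {
           synchronized (inFlight) {
               closed.set(true);
               // Cancel reads that registered before the close.
               for (Future<?> f : inFlight) {
                   f.cancel(true);
               }
               inFlight.clear();
           }
       }
   }
   ```

   With this shape, a reader that is already closed never reaches the backing store, and the second check still covers the race the PR is fixing.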



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.