vamossagar12 commented on code in PR #14372:
URL: https://github.com/apache/kafka/pull/14372#discussion_r1670154691


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -2156,6 +2157,42 @@ public void testAlterOffsetsSourceConnector(boolean 
enableTopicCreation) throws
         verifyKafkaClusterId();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGetSourceConnectorOffsetsFetchError(boolean 
enableTopicCreation) {
+        setup(enableTopicCreation);
+        mockKafkaClusterId();
+
+        ConnectorOffsetBackingStore offsetStore = 
mock(ConnectorOffsetBackingStore.class);
+        CloseableOffsetStorageReader offsetReader = 
mock(CloseableOffsetStorageReader.class);
+
+        Set<Map<String, Object>> connectorPartitions =
+            Collections.singleton(Collections.singletonMap("partitionKey", 
"partitionValue"));
+
+        
when(executorService.submit(any(Runnable.class))).thenAnswer(invocation -> {
+            invocation.getArgument(0, Runnable.class).run();
+            return null;
+        });
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, executorService,
+            allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        
when(offsetStore.connectorPartitions(CONNECTOR_ID)).thenReturn(connectorPartitions);
+        doAnswer(invocation -> {
+            throw new ExecutionException(new 
SaslAuthenticationException("error"));
+        }).when(offsetReader).offsets(connectorPartitions);

Review Comment:
   The reason I am throwing `ExecutionException` here is that when the read to 
offsets end fails, the associated callback is set with the error. That error is 
usually wrapped in an `ExecutionException` when eventually we do a get on the 
callback 
[here](https://github.com/apache/kafka/blob/67e68596329df1d990310f02767a47512a44d568/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java#L135).
   
   In the case of OffsetStorageReadImpl::offsets as well, something similar 
happens 
[here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java#L102)
 where we wait on the future callback object which is what is submitted to the 
   `readToEnd` operation 
[here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L281)
 and that gets handled eventually in the WorkThread.
   
   Yes `ConnectException` is still thrown but that would wrap this exception 
imo. In the ticket , I had shared a stacktrace, and this is how it looks like :
   
   ```
   [2022-11-20 09:00:59,307] ERROR Unexpected exception in Thread[KafkaBasedLog 
Work Thread - connect-offsets,5,main] 
(org.apache.kafka.connect.util.KafkaBasedLog:440)org.apache.kafka.connect.errors.ConnectException:
 Error while getting end offsets for topic 'connect-offsets' on brokers at XXX
         at 
org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:695)      
         at 
org.apache.kafka.connect.util.KafkaBasedLog.readEndOffsets(KafkaBasedLog.java:371)
      
         at 
org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:332)
      
         at 
org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:75)   
   
         at 
org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:406)
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
         at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
         at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
         at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
         at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
         at 
org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:672)
         ... 4 more
   ```
   which is inline with what I tried to explain above. Note that, even in this 
test, the next line being asserted is
   ```
   assertEquals(ConnectException.class, e.getCause().getClass());
   ```
   
   because I am invoking 
[sourceConnectorOffsets](https://github.com/apache/kafka/blob/67e68596329df1d990310f02767a47512a44d568/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1264)
 which does this wrapping. 
   
   Let me know if that explanation makes sense.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to