becketqin commented on a change in pull request #14239:
URL: https://github.com/apache/flink/pull/14239#discussion_r531331794
##
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
##
@@ -170,6 +178,70 @@ public void testSnapshotAndRestore() throws Exception {
}
+ @Test
+ public void testCallableInterruptedDuringShutdownDoNotFailJob() throws
InterruptedException {
Review comment:
Very good suggestion! This should works quite well.
I was actually struggling on this test a little bit. The tricky part is that
`testingContext.close()` also closes the two executors. So it is not guaranteed
that the error handling logic is actually tested every time the test runs. The
`ManuallyTriggeredScheduledExecutorService` works well in this case because
`triggerAll()` will still fire even after the executor has shutdown. It is
probably not the generally correct behavior, but really helps in this
particular test :)
##
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##
@@ -321,4 +328,37 @@ private void registerReader() {
ListState getReaderState() {
return readerState;
}
+
+ // --- private class --
+
+ private static class CountingDataOutput implements DataOutput {
Review comment:
Sure, I updated the patch to move this to the
`SourceOperatorStreamTask`. It was actually my first choice as well. The
reasons I eventually put it in the `SourceOperator` is that the
`CountingOutput` are create in `AbstractStreamOperator` and
`AbstractStreamOperatorV2`, which is at the operator level.
##
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##
@@ -131,7 +131,14 @@ public void start() {
LOG.info("Starting the KafkaSourceEnumerator for
consumer group {} " +
"without periodic partition discovery.",
consumerGroupId);
context.callAsync(
-
this::discoverAndInitializePartitionSplit,
+ () -> {
+ try {
+ return
discoverAndInitializePartitionSplit();
+ } finally {
+ // Close the admin
client early because we won't use it anymore.
+ adminClient.close();
Review comment:
The admin client is created in the `start()` method, and closed in two
different ways.
1. If the partition discovery is performed periodically, the admin client is
closed when the enumerator is closed. In this case, the
`KafkaSourceEnumerator.close()` method closes the admin client. That is also
why it is an instance variable.
2. If the partition discovery is not performed periodically but just once at
the beginning, the admin client is closed in the `SourceCoordinatorContext`
worker thread after it completes the partition discovery. So we don't have the
admin client running while not being used.
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org