This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 28211c0  [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource
28211c0 is described below

commit 28211c0dec1dfb706c5d9583fd757cdd4de38ce8
Author: Fabian Paul <fabianp...@ververica.com>
AuthorDate: Wed Sep 22 01:14:15 2021 +0200

    [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource
    
    Before this commit, the enumerator signalled the leftover source readers
    without a partition to finish. As a consequence, checkpointing was no
    longer possible, because it is only supported while all tasks are running
    or FLIP-147 is enabled.
    
    This closes #17330
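    
    For illustration, a minimal sketch of a job that runs into this scenario:
    a CONTINUOUS_UNBOUNDED KafkaSource whose parallelism exceeds the topic's
    partition count, with checkpointing enabled. The broker address, topic
    name, group id, and counts below are hypothetical.
    
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        
        public class IdleReaderExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(4); // hypothetical: more subtasks than partitions
                env.enableCheckpointing(10_000L);
        
                KafkaSource<String> source =
                        KafkaSource.<String>builder()
                                .setBootstrapServers("localhost:9092") // hypothetical broker
                                .setTopics("two-partition-topic") // hypothetical topic
                                .setGroupId("idle-reader-demo")
                                .setStartingOffsets(OffsetsInitializer.earliest())
                                .setValueOnlyDeserializer(new SimpleStringSchema())
                                .build(); // CONTINUOUS_UNBOUNDED unless setBounded() is used
        
                // With this fix, the two readers without a partition stay idle
                // instead of finishing, so periodic checkpoints keep succeeding.
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                        .print();
                env.execute("idle-reader-example");
            }
        }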
---
 .../flink/connector/kafka/source/KafkaSource.java  |  4 ++-
 .../source/enumerator/KafkaSourceEnumerator.java   | 10 ++++--
 .../source/enumerator/KafkaEnumeratorTest.java     |  2 ++
 .../connectors/kafka/KafkaConsumerTestBase.java    |  4 ++-
 .../kafka/testutils/ValidatingExactlyOnceSink.java | 38 ++++++++++++++++------
 5 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index d1219c0..30c4dd2 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -175,7 +175,8 @@ public class KafkaSource<OUT>
                 startingOffsetsInitializer,
                 stoppingOffsetsInitializer,
                 props,
-                enumContext);
+                enumContext,
+                boundedness);
     }
 
     @Override
@@ -189,6 +190,7 @@ public class KafkaSource<OUT>
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
+                boundedness,
                 checkpoint.assignedPartitions());
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 9c69f38..1af52c2 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source.enumerator;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
@@ -64,6 +65,7 @@ public class KafkaSourceEnumerator
     private final Properties properties;
     private final long partitionDiscoveryIntervalMs;
     private final SplitEnumeratorContext<KafkaPartitionSplit> context;
+    private final Boundedness boundedness;
 
     // The internal states of the enumerator.
     /**
@@ -97,13 +99,15 @@ public class KafkaSourceEnumerator
             OffsetsInitializer startingOffsetInitializer,
             OffsetsInitializer stoppingOffsetInitializer,
             Properties properties,
-            SplitEnumeratorContext<KafkaPartitionSplit> context) {
+            SplitEnumeratorContext<KafkaPartitionSplit> context,
+            Boundedness boundedness) {
         this(
                 subscriber,
                 startingOffsetInitializer,
                 stoppingOffsetInitializer,
                 properties,
                 context,
+                boundedness,
                 Collections.emptySet());
     }
 
@@ -113,12 +117,14 @@ public class KafkaSourceEnumerator
             OffsetsInitializer stoppingOffsetInitializer,
             Properties properties,
             SplitEnumeratorContext<KafkaPartitionSplit> context,
+            Boundedness boundedness,
             Set<TopicPartition> assignedPartitions) {
         this.subscriber = subscriber;
         this.startingOffsetInitializer = startingOffsetInitializer;
         this.stoppingOffsetInitializer = stoppingOffsetInitializer;
         this.properties = properties;
         this.context = context;
+        this.boundedness = boundedness;
 
         this.discoveredPartitions = new HashSet<>();
         this.assignedPartitions = new HashSet<>(assignedPartitions);
@@ -296,7 +302,7 @@ public class KafkaSourceEnumerator
 
         // If periodically partition discovery is disabled and the initializing discovery has done,
         // signal NoMoreSplitsEvent to pending readers
-        if (noMoreNewPartitionSplits) {
+        if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
             LOG.debug(
                     "No more KafkaPartitionSplits to assign. Sending 
NoMoreSplitsEvent to reader {}"
                             + " in consumer group {}.",
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index a46ac2d..edb4c81 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.source.enumerator;
 
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
@@ -432,6 +433,7 @@ public class KafkaEnumeratorTest {
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
+                Boundedness.CONTINUOUS_UNBOUNDED,
                 assignedPartitions);
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 93e0cfe..a76534f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1104,8 +1104,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
         getStream(env, topic, schema, props)
                 .map(new PartitionValidatingMapper(numPartitions, 1))
+                // Job only fails after a checkpoint is taken and the necessary number of elements
+                // is seen
                 .map(new FailingIdentityMapper<Integer>(failAfterElements))
-                .addSink(new ValidatingExactlyOnceSink(totalElements))
+                .addSink(new ValidatingExactlyOnceSink(totalElements, true))
                 .setParallelism(1);
 
         FailingIdentityMapper.failedBefore = false;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 41a2d86..f00be21 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -32,20 +33,26 @@ import java.util.List;
 
 /** A {@link RichSinkFunction} that verifies that no duplicate records are generated. */
 public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
-        implements ListCheckpointed<Tuple2<Integer, BitSet>> {
+        implements ListCheckpointed<Tuple2<Integer, BitSet>>, CheckpointListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
 
     private static final long serialVersionUID = 1748426382527469932L;
 
     private final int numElementsTotal;
+    private final boolean waitForFinalCheckpoint;
 
     private BitSet duplicateChecker = new BitSet(); // this is checkpointed
 
     private int numElements; // this is checkpointed
 
     public ValidatingExactlyOnceSink(int numElementsTotal) {
+        this(numElementsTotal, false);
+    }
+
+    public ValidatingExactlyOnceSink(int numElementsTotal, boolean waitForFinalCheckpoint) {
         this.numElementsTotal = numElementsTotal;
+        this.waitForFinalCheckpoint = waitForFinalCheckpoint;
     }
 
     @Override
@@ -56,15 +63,8 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
             throw new Exception("Received a duplicate: " + value);
         }
         duplicateChecker.set(value);
-        if (numElements == numElementsTotal) {
-            // validate
-            if (duplicateChecker.cardinality() != numElementsTotal) {
-                throw new Exception("Duplicate checker has wrong cardinality");
-            } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
-                throw new Exception("Received sparse sequence");
-            } else {
-                throw new SuccessException();
-            }
+        if (!waitForFinalCheckpoint) {
+            checkFinish();
         }
     }
 
@@ -87,4 +87,22 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
         this.numElements = s.f0;
         this.duplicateChecker = s.f1;
     }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        checkFinish();
+    }
+
+    private void checkFinish() throws Exception {
+        if (numElements == numElementsTotal) {
+            // validate
+            if (duplicateChecker.cardinality() != numElementsTotal) {
+                throw new Exception("Duplicate checker has wrong cardinality");
+            } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+                throw new Exception("Received sparse sequence");
+            } else {
+                throw new SuccessException();
+            }
+        }
+    }
 }
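
A hedged usage sketch of the reworked sink: with the second constructor
argument set to true, checkFinish() is only reached from
notifyCheckpointComplete(), so the test can only succeed after a completed
checkpoint has covered all elements. The stream and totalElements below are
assumed to exist in the surrounding test.

    stream
            // Throws SuccessException from notifyCheckpointComplete() once
            // numElements == numElementsTotal, i.e. only after a checkpoint.
            .addSink(new ValidatingExactlyOnceSink(totalElements, true))
            .setParallelism(1);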
