This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 28211c0  [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource
28211c0 is described below

commit 28211c0dec1dfb706c5d9583fd757cdd4de38ce8
Author: Fabian Paul <fabianp...@ververica.com>
AuthorDate: Wed Sep 22 01:14:15 2021 +0200

    [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource

    Before this commit, the enumerator signalled the leftover source readers
    without a partition to finish. As a result, checkpointing was no longer
    possible, because it is only supported while all tasks are running or
    FLIP-147 is enabled.

    This closes #17330
---
 .../flink/connector/kafka/source/KafkaSource.java  |  4 ++-
 .../source/enumerator/KafkaSourceEnumerator.java   | 10 ++++--
 .../source/enumerator/KafkaEnumeratorTest.java     |  2 ++
 .../connectors/kafka/KafkaConsumerTestBase.java    |  4 ++-
 .../kafka/testutils/ValidatingExactlyOnceSink.java | 38 ++++++++++++++++------
 5 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index d1219c0..30c4dd2 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -175,7 +175,8 @@ public class KafkaSource<OUT>
                 startingOffsetsInitializer,
                 stoppingOffsetsInitializer,
                 props,
-                enumContext);
+                enumContext,
+                boundedness);
     }
 
     @Override
@@ -189,6 +190,7 @@ public class KafkaSource<OUT>
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
+                boundedness,
                 checkpoint.assignedPartitions());
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 9c69f38..1af52c2 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source.enumerator;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
@@ -64,6 +65,7 @@ public class KafkaSourceEnumerator
     private final Properties properties;
     private final long partitionDiscoveryIntervalMs;
     private final SplitEnumeratorContext<KafkaPartitionSplit> context;
+    private final Boundedness boundedness;
 
     // The internal states of the enumerator.
     /**
@@ -97,13 +99,15 @@ public class KafkaSourceEnumerator
             OffsetsInitializer startingOffsetInitializer,
             OffsetsInitializer stoppingOffsetInitializer,
             Properties properties,
-            SplitEnumeratorContext<KafkaPartitionSplit> context) {
+            SplitEnumeratorContext<KafkaPartitionSplit> context,
+            Boundedness boundedness) {
         this(
                 subscriber,
                 startingOffsetInitializer,
                 stoppingOffsetInitializer,
                 properties,
                 context,
+                boundedness,
                 Collections.emptySet());
     }
 
@@ -113,12 +117,14 @@
             OffsetsInitializer stoppingOffsetInitializer,
             Properties properties,
             SplitEnumeratorContext<KafkaPartitionSplit> context,
+            Boundedness boundedness,
             Set<TopicPartition> assignedPartitions) {
         this.subscriber = subscriber;
         this.startingOffsetInitializer = startingOffsetInitializer;
         this.stoppingOffsetInitializer = stoppingOffsetInitializer;
         this.properties = properties;
         this.context = context;
+        this.boundedness = boundedness;
 
         this.discoveredPartitions = new HashSet<>();
         this.assignedPartitions = new HashSet<>(assignedPartitions);
@@ -296,7 +302,7 @@ public class KafkaSourceEnumerator
 
         // If periodically partition discovery is disabled and the initializing discovery has done,
         // signal NoMoreSplitsEvent to pending readers
-        if (noMoreNewPartitionSplits) {
+        if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
             LOG.debug(
                     "No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}"
                             + " in consumer group {}.",
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index a46ac2d..edb4c81 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.source.enumerator;
 
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
@@ -432,6 +433,7 @@ public class KafkaEnumeratorTest {
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
+                Boundedness.CONTINUOUS_UNBOUNDED,
                 assignedPartitions);
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 93e0cfe..a76534f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1104,8 +1104,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
         getStream(env, topic, schema, props)
                 .map(new PartitionValidatingMapper(numPartitions, 1))
+                // Job only fails after a checkpoint is taken and the necessary number of elements
+                // is seen
                 .map(new FailingIdentityMapper<Integer>(failAfterElements))
-                .addSink(new ValidatingExactlyOnceSink(totalElements))
+                .addSink(new ValidatingExactlyOnceSink(totalElements, true))
                 .setParallelism(1);
 
         FailingIdentityMapper.failedBefore = false;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 41a2d86..f00be21 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -32,20 +33,26 @@ import java.util.List;
 
 /** A {@link RichSinkFunction} that verifies that no duplicate records are generated. */
 public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
-        implements ListCheckpointed<Tuple2<Integer, BitSet>> {
+        implements ListCheckpointed<Tuple2<Integer, BitSet>>, CheckpointListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
 
     private static final long serialVersionUID = 1748426382527469932L;
 
     private final int numElementsTotal;
+    private final boolean waitForFinalCheckpoint;
 
     private BitSet duplicateChecker = new BitSet(); // this is checkpointed
     private int numElements; // this is checkpointed
 
     public ValidatingExactlyOnceSink(int numElementsTotal) {
+        this(numElementsTotal, false);
+    }
+
+    public ValidatingExactlyOnceSink(int numElementsTotal, boolean waitForFinalCheckpoint) {
         this.numElementsTotal = numElementsTotal;
+        this.waitForFinalCheckpoint = waitForFinalCheckpoint;
     }
 
     @Override
@@ -56,15 +63,8 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
             throw new Exception("Received a duplicate: " + value);
         }
         duplicateChecker.set(value);
-        if (numElements == numElementsTotal) {
-            // validate
-            if (duplicateChecker.cardinality() != numElementsTotal) {
-                throw new Exception("Duplicate checker has wrong cardinality");
-            } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
-                throw new Exception("Received sparse sequence");
-            } else {
-                throw new SuccessException();
-            }
+        if (!waitForFinalCheckpoint) {
+            checkFinish();
         }
     }
 
@@ -87,4 +87,22 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
         this.numElements = s.f0;
         this.duplicateChecker = s.f1;
     }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        checkFinish();
+    }
+
+    private void checkFinish() throws Exception {
+        if (numElements == numElementsTotal) {
+            // validate
+            if (duplicateChecker.cardinality() != numElementsTotal) {
+                throw new Exception("Duplicate checker has wrong cardinality");
+            } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+                throw new Exception("Received sparse sequence");
+            } else {
+                throw new SuccessException();
+            }
+        }
+    }
 }
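
[Editor's note] The user-visible effect of this change is easiest to see in a job whose
parallelism exceeds the topic's partition count. The sketch below is not part of the commit;
it is a minimal, hypothetical job using the public KafkaSource builder API, where the topic
name, partition count, bootstrap servers, and checkpoint interval are all made-up values.
Without stopping offsets the source is CONTINUOUS_UNBOUNDED, so after this fix the enumerator
no longer sends NoMoreSplitsEvent to readers that received no partition: the surplus readers
stay idle but RUNNING, and periodic checkpoints keep succeeding.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IdleReaderCheckpointingExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Periodic checkpoints must keep succeeding even with idle readers.
            env.enableCheckpointing(10_000L);

            // Hypothetical topic with 4 partitions; no stopping offsets are set,
            // so the source runs in CONTINUOUS_UNBOUNDED mode.
            KafkaSource<String> source =
                    KafkaSource.<String>builder()
                            .setBootstrapServers("localhost:9092")
                            .setTopics("example-topic")
                            .setGroupId("example-group")
                            .setStartingOffsets(OffsetsInitializer.earliest())
                            .setValueOnlyDeserializer(new SimpleStringSchema())
                            .build();

            // Parallelism 8 > 4 partitions: 4 readers own splits, 4 stay idle.
            // Before this commit the idle readers were signalled to finish,
            // which broke checkpointing; now they remain RUNNING.
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                    .setParallelism(8)
                    .print();

            env.execute("idle-reader-checkpointing-example");
        }
    }

For a BOUNDED source the old behaviour is kept: once all splits are assigned, leftover
readers still receive NoMoreSplitsEvent and finish, which is what the added
boundedness == Boundedness.BOUNDED guard in KafkaSourceEnumerator implements.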