Repository: flink
Updated Branches:
  refs/heads/master 069de27df -> f46ca3918
[FLINK-4723] [kafka] Unify committed offsets to Kafka to be next record to process

This closes #2580

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f46ca391
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f46ca391
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f46ca391

Branch: refs/heads/master
Commit: f46ca39188dce1764ee6615eb6697588fdc04a2a
Parents: 069de27
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Sun Oct 2 16:54:57 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Tue Oct 18 11:44:30 2016 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   |   8 +-
 .../connectors/kafka/Kafka010ITCase.java        |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  33 +-
 .../kafka/internals/Kafka08Fetcher.java         |  15 +-
 .../internals/PeriodicOffsetCommitter.java      |   6 +-
 .../kafka/internals/ZookeeperOffsetHandler.java |  19 +-
 .../connectors/kafka/Kafka08ITCase.java         | 178 ++--------
 .../kafka/KafkaTestEnvironmentImpl.java         |  31 ++
 .../kafka/internal/Kafka09Fetcher.java          |   2 +-
 .../connectors/kafka/Kafka09FetcherTest.java    |  10 +-
 .../connectors/kafka/Kafka09ITCase.java         |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  32 +-
 .../kafka/FlinkKafkaConsumerBase.java           |  35 +-
 .../kafka/internals/AbstractFetcher.java        |   9 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  56 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java | 322 ++++++++++++++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  10 +
 .../AbstractFetcherTimestampsTest.java          |   2 +-
 18 files changed, 559 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
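In short: Flink's checkpointed state keeps the offset of the last record each subtask processed, while an offset committed to Kafka or ZooKeeper now uniformly means the next record to process. The following is an illustrative sketch of that convention only, not code from this patch; the class and variable names are invented:

// Illustrative sketch of the offset convention this commit unifies (not part of the patch).
public class OffsetConvention {
    public static void main(String[] args) {
        long lastProcessedOffset = 49;                  // Flink's internal, checkpointed offset
        long offsetToCommit = lastProcessedOffset + 1;  // committed to Kafka/ZooKeeper: 50

        // restoring from an externally committed offset inverts the shift,
        // so consumption resumes exactly at record 50
        long restoredInternalOffset = offsetToCommit - 1;

        System.out.println(offsetToCommit + ", " + restoredInternalOffset); // 50, 49
    }
}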
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 8f0b170..76e3950 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -143,7 +143,7 @@ public class Kafka010FetcherTest {
             @Override
             public void run() {
                 try {
-                    fetcher.commitSpecificOffsetsToKafka(testCommitData);
+                    fetcher.commitInternalOffsetsToKafka(testCommitData);
                 } catch (Throwable t) {
                     commitError.set(t);
                 }
@@ -255,7 +255,7 @@ public class Kafka010FetcherTest {
 
         // ----- trigger the first offset commit -----
 
-        fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+        fetcher.commitInternalOffsetsToKafka(testCommitData1);
         Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
         for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -272,7 +272,7 @@ public class Kafka010FetcherTest {
 
         // ----- trigger the second offset commit -----
 
-        fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+        fetcher.commitInternalOffsetsToKafka(testCommitData2);
         Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
         for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -297,4 +297,4 @@ public class Kafka010FetcherTest {
             throw new Exception("Exception in the fetcher", caughtError);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 28bf6d5..77407ff 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -131,6 +131,24 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
         runEndOfStreamTest();
     }
 
+    // --- offset committing ---
+
+    @Test(timeout = 60000)
+    public void testCommitOffsetsToKafka() throws Exception {
+        runCommitOffsetsToKafka();
+    }
+
+    @Test(timeout = 60000)
+    public void testStartFromKafkaCommitOffsets() throws Exception {
+        runStartFromKafkaCommitOffsets();
+    }
+
+    // TODO: This test will not pass until FLINK-4727 is resolved
+//    @Test(timeout = 60000)
+//    public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+//        runAutoOffsetRetrievalAndCommitToKafka();
+//    }
+
     /**
      * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka
      */
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 78fc1c6..7d12cde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.admin.AdminUtils;
 import kafka.common.KafkaException;
-import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
@@ -35,6 +34,9 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.slf4j.Logger;
@@ -118,6 +120,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
     }
 
     @Override
+    public KafkaOffsetHandler createOffsetHandler(Properties props) {
+        return new KafkaOffsetHandlerImpl(props);
+    }
+
+    @Override
     public void restartBroker(int leaderId) throws Exception {
         brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
     }
@@ -213,11 +220,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
         standardProps.setProperty("bootstrap.servers", brokerConnectionString);
         standardProps.setProperty("group.id", "flink-tests");
-        standardProps.setProperty("auto.commit.enable", "false");
+        standardProps.setProperty("enable.auto.commit", "false");
         standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
         standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
         standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
-        standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+        standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
     }
 
     @Override
@@ -381,4 +388,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
     }
 
+    private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+        private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+        public KafkaOffsetHandlerImpl(Properties props) {
+            offsetClient = new KafkaConsumer<>(props);
+        }
+
+        @Override
+        public Long getCommittedOffset(String topicName, int partition) {
+            OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+            return (committed != null) ? committed.offset() : null;
+        }
+
+        @Override
+        public void close() {
+            offsetClient.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index aee3acc..5861058 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -140,11 +140,13 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
                 }
             }
 
-            Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset);
+            Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
             for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-                Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
-                if (offset != null) {
-                    partition.setOffset(offset);
+                Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
+                if (zkOffset != null) {
+                    // the offset in ZK represents the "next record to process", so we need to subtract it by 1
+                    // to correctly represent our internally checkpointed offsets
+                    partition.setOffset(zkOffset - 1);
                 }
             }
         }
@@ -324,10 +326,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
     // ------------------------------------------------------------------------
 
     @Override
-    public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+    public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
         ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
         if (zkHandler != null) {
-            zkHandler.writeOffsets(offsets);
+            // the ZK handler takes care of incrementing the offsets by 1 before committing
+            zkHandler.prepareAndCommitOffsets(offsets);
         }
 
         // Set committed offsets in topic partition state
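A condensed, self-contained view of the 0.8 restore path shown above; the names are hypothetical stand-ins for the fetcher internals, assuming one partition with a committed ZK offset of 50 and one without any committed offset:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Kafka08Fetcher restore logic above (not code from the patch).
public class RestoreFromZkSketch {
    public static void main(String[] args) {
        // what getCommittedOffsets(...) might return: ZK stores "next record to process"
        Map<String, Long> zkOffsets = new HashMap<>();
        zkOffsets.put("testTopic-0", 50L);
        zkOffsets.put("testTopic-1", null); // no offset committed for this partition yet

        for (Map.Entry<String, Long> entry : zkOffsets.entrySet()) {
            Long zkOffset = entry.getValue();
            if (zkOffset != null) {
                // internal state tracks the LAST processed record, hence the -1;
                // fetching then resumes with record zkOffset, exactly where ZK points
                long internalOffset = zkOffset - 1;
                System.out.println(entry.getKey() + " restored to " + internalOffset);
            }
            // partitions without a committed offset keep their configured start position
        }
    }
}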
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
index 6aaeca9..27d90f2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -62,12 +62,12 @@ public class PeriodicOffsetCommitter extends Thread {
                 Thread.sleep(commitInterval);
 
                 // create copy a deep copy of the current offsets
-                HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(partitionStates.length);
+                HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
                 for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
-                    currentOffsets.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
+                    offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
                 }
 
-                offsetHandler.writeOffsets(currentOffsets);
+                offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
             }
         }
         catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index a1a81ed..8f2ef09 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -77,29 +77,30 @@ public class ZookeeperOffsetHandler {
     // ------------------------------------------------------------------------
 
     /**
-     * Writes given set of offsets for Kafka partitions to ZooKeeper.
+     * Commits offsets for Kafka partitions to ZooKeeper. The given offsets to this method should be the offsets of
+     * the last processed records; this method will take care of incrementing the offsets by 1 before committing them so
+     * that the committed offsets to Zookeeper represent the next record to process.
      *
-     * @param offsetsToWrite The offsets for the partitions to write.
+     * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
      * @throws Exception The method forwards exceptions.
      */
-    public void writeOffsets(Map<KafkaTopicPartition, Long> offsetsToWrite) throws Exception {
-        for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToWrite.entrySet()) {
+    public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
+        for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
             KafkaTopicPartition tp = entry.getKey();
-            long offset = entry.getValue();
 
-            if (offset >= 0) {
-                setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
+            Long lastProcessedOffset = entry.getValue();
+            if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
+                setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
             }
         }
     }
 
     /**
-     *
      * @param partitions The partitions to read offsets for.
      * @return The mapping from partition to offset.
      * @throws Exception This method forwards exceptions.
      */
-    public Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception {
+    public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
         Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
         for (KafkaTopicPartition tp : partitions) {
            Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
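Taken together, the two files above form the 0.8 commit path: the committer thread snapshots the partition states, and the handler shifts each offset by +1 while skipping partitions whose offset was never set. A runnable sketch of that interplay, with all types simplified to plain maps and all names hypothetical:

import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of PeriodicOffsetCommitter + prepareAndCommitOffsets.
public class CommitPathSketch {
    public static void main(String[] args) {
        // snapshot of the partition states: last processed record per partition
        Map<String, Long> offsetsToCommit = new HashMap<>();
        offsetsToCommit.put("testTopic-0", 49L);
        offsetsToCommit.put("testTopic-1", -1L); // e.g. nothing processed yet

        for (Map.Entry<String, Long> entry : offsetsToCommit.entrySet()) {
            Long lastProcessedOffset = entry.getValue();
            if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
                // ZooKeeper ends up holding "next record to process"
                System.out.println("commit " + entry.getKey() + " -> " + (lastProcessedOffset + 1));
            }
            // negative or unset offsets are never written out
        }
    }
}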
- final String topicName = writeSequence("testOffsetInZK", 100, parallelism, 1); - - StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env1.getConfig().disableSysoutLogging(); - env1.enableCheckpointing(50); - env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env1.setParallelism(parallelism); - - StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env2.getConfig().disableSysoutLogging(); - env2.enableCheckpointing(50); - env2.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env2.setParallelism(parallelism); - - readSequence(env1, standardProps, parallelism, topicName, 100, 0); - - CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - - Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0); - Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1); - Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2); - - LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); - - assertTrue(o1 == null || (o1 >= 0 && o1 <= 100)); - assertTrue(o2 == null || (o2 >= 0 && o2 <= 100)); - assertTrue(o3 == null || (o3 >= 0 && o3 <= 100)); - - LOG.info("Manipulating offsets"); - - // set the offset to 50 for the three partitions - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0, 49); - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1, 49); - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2, 49); - - curatorClient.close(); - - // create new env - readSequence(env2, standardProps, parallelism, topicName, 50, 50); - - deleteTestTopic(topicName); - } @Test(timeout = 60000) public void testOffsetAutocommitTest() throws Exception { @@ -275,94 +212,37 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { // ensure that the offset has been committed boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) || - (o2 != null && o2 > 0 && o2 <= 100) || - (o3 != null && o3 > 0 && o3 <= 100); + (o2 != null && o2 > 0 && o2 <= 100) || + (o3 != null && o3 > 0 && o3 <= 100); assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet); deleteTestTopic(topicName); } - /** - * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset - * is committed to Zookeeper, even if some partitions are not read - * - * Test: - * - Create 3 topics - * - write 50 messages into each. - * - Start three consumers with auto.offset.reset='latest' and wait until they committed into ZK. - * - Check if the offsets in ZK are set to 50 for the three partitions - * - * See FLINK-3440 as well - */ - @Test(timeout = 60000) - public void testKafkaOffsetRetrievalToZookeeper() throws Exception { - final int parallelism = 3; + // --- special executions --- - // write a sequence from 0 to 49 to each of the 3 partitions. 
- final String topicName = writeSequence("testKafkaOffsetToZk", 50, parallelism, 1); + @Test(timeout = 60000) + public void testBigRecordJob() throws Exception { + runBigRecordTestTopology(); + } - final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env2.getConfig().disableSysoutLogging(); - env2.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env2.setParallelism(parallelism); - env2.enableCheckpointing(200); + @Test(timeout = 60000) + public void testMultipleTopics() throws Exception { + runProduceConsumeMultipleTopics(); + } - Properties readProps = new Properties(); - readProps.putAll(standardProps); - readProps.setProperty("auto.offset.reset", "latest"); - - DataStream<String> stream = env2.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps)); - stream.addSink(new DiscardingSink<String>()); - - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - final Thread runner = new Thread("runner") { - @Override - public void run() { - try { - env2.execute(); - } - catch (Throwable t) { - if (!(t.getCause() instanceof JobCancellationException)) { - errorRef.set(t); - } - } - } - }; - runner.start(); - - final CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - final Long l49 = 49L; - - final long deadline = 30000 + System.currentTimeMillis(); - do { - Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); - Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); - Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); - - if (l49.equals(o1) && l49.equals(o2) && l49.equals(o3)) { - break; - } - - Thread.sleep(100); - } - while (System.currentTimeMillis() < deadline); - - // cancel the job - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); - - final Throwable t = errorRef.get(); - if (t != null) { - throw new RuntimeException("Job failed with an exception", t); - } + @Test(timeout = 60000) + public void testAllDeletes() throws Exception { + runAllDeletesTest(); + } - // check if offsets are correctly in ZK - Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); - Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); - Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); - Assert.assertEquals(Long.valueOf(49L), o1); - Assert.assertEquals(Long.valueOf(49L), o2); - Assert.assertEquals(Long.valueOf(49L), o3); + @Test(timeout=60000) + public void testEndOfStream() throws Exception { + runEndOfStreamTest(); + } - curatorFramework.close(); + @Test(timeout = 60000) + public void testMetrics() throws Throwable { + runMetricsTest(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java 
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index a0d5002..567d22d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -111,6 +112,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
     }
 
     @Override
+    public KafkaOffsetHandler createOffsetHandler(Properties props) {
+        return new KafkaOffsetHandlerImpl(props);
+    }
+
+    @Override
     public void restartBroker(int leaderId) throws Exception {
         brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
     }
@@ -351,4 +357,29 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
     }
 
+    private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+        private final CuratorFramework offsetClient;
+        private final String groupId;
+
+        public KafkaOffsetHandlerImpl(Properties props) {
+            offsetClient = createCuratorClient();
+            groupId = props.getProperty("group.id");
+        }
+
+        @Override
+        public Long getCommittedOffset(String topicName, int partition) {
+            try {
+                return ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, partition);
+            } catch (Exception e) {
+                throw new RuntimeException("Exception when getting offsets from Zookeeper", e);
+            }
+        }
+
+        @Override
+        public void close() {
+            offsetClient.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 37e40fc..3a3d3de 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -300,7 +300,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
     }
 
     @Override
-    public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+    public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
         KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
         Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
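The hunk above only shows the rename; the body that fills offsetsToCommit is elided in this mail. Per the AbstractFetcher contract later in this diff, the implementation has to add 1 when translating internal offsets into Kafka's OffsetAndMetadata. A sketch under that assumption (the class and helper names are invented), which is also consistent with the expected committed values in Kafka09FetcherTest below being one higher than the internal test data:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical condensation of what commitInternalOffsetsToKafka must do in the 0.9 fetcher.
public class CommitShiftSketch {
    static Map<TopicPartition, OffsetAndMetadata> toKafkaCommits(Map<TopicPartition, Long> internalOffsets) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(internalOffsets.size());
        for (Map.Entry<TopicPartition, Long> entry : internalOffsets.entrySet()) {
            // internal offset = last processed record; committed offset = next record to process
            offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
        }
        return offsetsToCommit;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> internal = new HashMap<>();
        internal.put(new TopicPartition("testTopic", 0), 16L);
        System.out.println(toKafkaCommits(internal)); // commits offset 17 for testTopic-0
    }
}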
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 5a638b2..c5cf0cc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -143,7 +143,7 @@ public class Kafka09FetcherTest {
             @Override
             public void run() {
                 try {
-                    fetcher.commitSpecificOffsetsToKafka(testCommitData);
+                    fetcher.commitInternalOffsetsToKafka(testCommitData);
                 } catch (Throwable t) {
                     commitError.set(t);
                 }
@@ -259,7 +259,7 @@ public class Kafka09FetcherTest {
 
         // ----- trigger the first offset commit -----
 
-        fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+        fetcher.commitInternalOffsetsToKafka(testCommitData1);
         Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
         for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -270,13 +270,13 @@ public class Kafka09FetcherTest {
             }
             else if (partition.topic().equals("another")) {
                 assertEquals(99, partition.partition());
-                assertEquals(18L, entry.getValue().offset());
+                assertEquals(17L, entry.getValue().offset());
             }
         }
 
         // ----- trigger the second offset commit -----
 
-        fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+        fetcher.commitInternalOffsetsToKafka(testCommitData2);
         Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
         for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -287,7 +287,7 @@ public class Kafka09FetcherTest {
             }
             else if (partition.topic().equals("another")) {
                 assertEquals(99, partition.partition());
-                assertEquals(28L, entry.getValue().offset());
+                assertEquals(27L, entry.getValue().offset());
             }
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index b9ec18a..3d347dc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -109,4 +109,22 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
     public void testMetrics() throws Throwable {
         runMetricsTest();
     }
+
+    // --- offset committing ---
+
+    @Test(timeout = 60000)
+    public void testCommitOffsetsToKafka() throws Exception {
+        runCommitOffsetsToKafka();
+    }
+
+    @Test(timeout = 60000)
+    public void testStartFromKafkaCommitOffsets() throws Exception {
+        runStartFromKafkaCommitOffsets();
+    }
+
+    // TODO: This test will not pass until FLINK-4727 is resolved
+//    @Test(timeout = 60000)
+//    public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+//        runAutoOffsetRetrievalAndCommitToKafka();
+//    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9d8fa9a..223dacb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,9 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +106,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
     }
 
     @Override
+    public KafkaOffsetHandler createOffsetHandler(Properties props) {
+        return new KafkaOffsetHandlerImpl(props);
+    }
+
+    @Override
     public void restartBroker(int leaderId) throws Exception {
         brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
     }
@@ -203,11 +211,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
         standardProps.setProperty("bootstrap.servers", brokerConnectionString);
         standardProps.setProperty("group.id", "flink-tests");
-        standardProps.setProperty("auto.commit.enable", "false");
+        standardProps.setProperty("enable.auto.commit", "false");
         standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
         standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
         standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
-        standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+        standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
     }
 
@@ -396,4 +404,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         return prop;
     }
 
+    private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+        private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+        public KafkaOffsetHandlerImpl(Properties props) {
+            offsetClient = new KafkaConsumer<>(props);
+        }
+
+        @Override
+        public Long getCommittedOffset(String topicName, int partition) {
+            OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+            return (committed != null) ? committed.offset() : null;
+        }
+
+        @Override
+        public void close() {
+            offsetClient.close();
+        }
+    }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 05028e6..7d6bd76 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -103,8 +103,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
     //  runtime state (used individually by each parallel subtask)
     // ------------------------------------------------------------------------
 
-    /** Data for pending but uncommitted checkpoints */
-    private final LinkedMap pendingCheckpoints = new LinkedMap();
+    /** Data for pending but uncommitted offsets */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
 
     /** The fetcher implements the connections to the Kafka brokers */
     private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
@@ -347,12 +347,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
             if (restoreToOffset != null) {
                 // the map cannot be asynchronously updated, because only one checkpoint call can happen
                 // on this function at a time: either snapshotState() or notifyCheckpointComplete()
-                pendingCheckpoints.put(checkpointId, restoreToOffset);
-
-                // truncate the map, to prevent infinite growth
-                while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
-                    pendingCheckpoints.remove(0);
-                }
+                pendingOffsetsToCommit.put(checkpointId, restoreToOffset);
 
                 for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
                     listState.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
@@ -367,17 +362,17 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
                 // the map cannot be asynchronously updated, because only one checkpoint call can happen
                 // on this function at a time: either snapshotState() or notifyCheckpointComplete()
-                pendingCheckpoints.put(checkpointId, currentOffsets);
-
-                // truncate the map, to prevent infinite growth
-                while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
-                    pendingCheckpoints.remove(0);
-                }
+                pendingOffsetsToCommit.put(checkpointId, currentOffsets);
 
                 for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                     listState.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                 }
             }
+
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
         }
     }
 
@@ -400,26 +395,26 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
         }
 
         try {
-            final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
             if (posInMap == -1) {
                 LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                 return;
             }
 
             @SuppressWarnings("unchecked")
-            HashMap<KafkaTopicPartition, Long> checkpointOffsets =
-                    (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
+            HashMap<KafkaTopicPartition, Long> offsets =
+                    (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
 
             // remove older checkpoints in map
             for (int i = 0; i < posInMap; i++) {
-                pendingCheckpoints.remove(0);
+                pendingOffsetsToCommit.remove(0);
             }
 
-            if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
+            if (offsets == null || offsets.size() == 0) {
                 LOG.debug("Checkpoint state was empty.");
                 return;
             }
 
-            fetcher.commitSpecificOffsetsToKafka(checkpointOffsets);
+            fetcher.commitInternalOffsetsToKafka(offsets);
         }
         catch (Exception e) {
             if (running) {
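The renamed map drives a small bookkeeping protocol that the hunks above show only in fragments. A self-contained sketch of that protocol, with simplified signatures and a hypothetical class name (the LinkedMap calls mirror the ones in the diff):

import org.apache.commons.collections.map.LinkedMap;

// Standalone sketch of the pendingOffsetsToCommit bookkeeping (simplified types).
public class PendingOffsetsSketch {
    static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    final LinkedMap pendingOffsetsToCommit = new LinkedMap();

    void onSnapshot(long checkpointId, Object offsets) {
        pendingOffsetsToCommit.put(checkpointId, offsets);
        // truncate to prevent unbounded growth when checkpoints are never acknowledged
        while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            pendingOffsetsToCommit.remove(0);
        }
    }

    Object onCheckpointComplete(long checkpointId) {
        int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
        if (posInMap == -1) {
            return null; // unknown (e.g. already subsumed or truncated) checkpoint
        }
        Object offsets = pendingOffsetsToCommit.remove(posInMap);
        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0); // drop older checkpoints, subsumed by this one
        }
        return offsets; // the caller hands these to commitInternalOffsetsToKafka(...)
    }
}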
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 7ee3079..eb01b78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -159,12 +159,15 @@ public abstract class AbstractFetcher<T, KPH> {
 
     /**
      * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
-     * older Kafka versions).
+     * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
+     * the last processed record of each partition. Version-specific implementations of this method
+     * need to hold the contract that the given offsets must be incremented by 1 before
+     * committing them, so that committed offsets to Kafka represent "the next record to process".
      *
-     * @param offsets The offsets to commit to Kafka.
+     * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
      * @throws Exception This method forwards exceptions.
      */
-    public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+    public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
 
     // ------------------------------------------------------------------------
     //  snapshot and restore the state
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 6d2dc70..97220c2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -161,9 +161,17 @@ public class FlinkKafkaConsumerBaseTest {
         assertFalse(listState.get().iterator().hasNext());
     }
 
+    /**
+     * Tests that on snapshots, states and offsets to commit to Kafka are correct
+     */
     @Test
     @SuppressWarnings("unchecked")
     public void testSnapshotState() throws Exception {
+
+        // --------------------------------------------------------------------
+        //   prepare fake states
+        // --------------------------------------------------------------------
+
         final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
         state1.put(new KafkaTopicPartition("abc", 13), 16768L);
         state1.put(new KafkaTopicPartition("def", 7), 987654321L);
@@ -176,13 +184,15 @@ public class FlinkKafkaConsumerBaseTest {
         state3.put(new KafkaTopicPartition("abc", 13), 16780L);
         state3.put(new KafkaTopicPartition("def", 7), 987654377L);
 
+        // --------------------------------------------------------------------
+
         final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
         when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-
-        final LinkedMap pendingCheckpoints = new LinkedMap();
-
-        FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
-        assertEquals(0, pendingCheckpoints.size());
+
+        final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+        FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
+        assertEquals(0, pendingOffsetsToCommit.size());
 
         OperatorStateStore backend = mock(OperatorStateStore.class);
 
@@ -207,8 +217,8 @@ public class FlinkKafkaConsumerBaseTest {
         }
 
         assertEquals(state1, snapshot1);
-        assertEquals(1, pendingCheckpoints.size());
-        assertEquals(state1, pendingCheckpoints.get(138L));
+        assertEquals(1, pendingOffsetsToCommit.size());
+        assertEquals(state1, pendingOffsetsToCommit.get(138L));
 
         // checkpoint 2
         consumer.prepareSnapshot(140L, 140L);
@@ -221,13 +231,13 @@ public class FlinkKafkaConsumerBaseTest {
         }
 
         assertEquals(state2, snapshot2);
-        assertEquals(2, pendingCheckpoints.size());
-        assertEquals(state2, pendingCheckpoints.get(140L));
-
+        assertEquals(2, pendingOffsetsToCommit.size());
+        assertEquals(state2, pendingOffsetsToCommit.get(140L));
+
         // ack checkpoint 1
         consumer.notifyCheckpointComplete(138L);
-        assertEquals(1, pendingCheckpoints.size());
-        assertTrue(pendingCheckpoints.containsKey(140L));
+        assertEquals(1, pendingOffsetsToCommit.size());
+        assertTrue(pendingOffsetsToCommit.containsKey(140L));
 
         // checkpoint 3
         consumer.prepareSnapshot(141L, 141L);
@@ -240,16 +250,16 @@ public class FlinkKafkaConsumerBaseTest {
         }
 
         assertEquals(state3, snapshot3);
-        assertEquals(2, pendingCheckpoints.size());
-        assertEquals(state3, pendingCheckpoints.get(141L));
-
+        assertEquals(2, pendingOffsetsToCommit.size());
+        assertEquals(state3, pendingOffsetsToCommit.get(141L));
+
         // ack checkpoint 3, subsumes number 2
         consumer.notifyCheckpointComplete(141L);
-        assertEquals(0, pendingCheckpoints.size());
+        assertEquals(0, pendingOffsetsToCommit.size());
 
         consumer.notifyCheckpointComplete(666); // invalid checkpoint
-        assertEquals(0, pendingCheckpoints.size());
+        assertEquals(0, pendingOffsetsToCommit.size());
 
         OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
         TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
@@ -260,24 +270,24 @@ public class FlinkKafkaConsumerBaseTest {
             consumer.prepareSnapshot(i, i);
             listState.clear();
         }
-        assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+        assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size());
 
         // commit only the second last
         consumer.notifyCheckpointComplete(598);
-        assertEquals(1, pendingCheckpoints.size());
+        assertEquals(1, pendingOffsetsToCommit.size());
 
         // access invalid checkpoint
         consumer.notifyCheckpointComplete(590);
 
         // and the last
         consumer.notifyCheckpointComplete(599);
-        assertEquals(0, pendingCheckpoints.size());
+        assertEquals(0, pendingOffsetsToCommit.size());
     }
 
     // ------------------------------------------------------------------------
 
     private static <T> FlinkKafkaConsumerBase<T> getConsumer(
-            AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
+            AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
     {
         FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
 
@@ -285,9 +295,9 @@ public class FlinkKafkaConsumerBaseTest {
         fetcherField.setAccessible(true);
         fetcherField.set(consumer, fetcher);
 
-        Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+        Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
         mapField.setAccessible(true);
-        mapField.set(consumer, pendingCheckpoints);
+        mapField.set(consumer, pendingOffsetsToCommit);
 
         Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
         runningField.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 0810a3e..7b06cfd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -97,7 +98,6 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.Date;
@@ -201,6 +201,249 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
             }
         }
     }
+
+    /**
+     * Ensures that the committed offsets to Kafka are the offsets of "the next record to process"
+     */
+    public void runCommitOffsetsToKafka() throws Exception {
+        // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+        final int parallelism = 3;
+        final int recordsInEachPartition = 50;
+
+        final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+        env.getConfig().disableSysoutLogging();
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200);
+
+        DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
+        stream.addSink(new DiscardingSink<String>());
+
+        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        final Thread runner = new Thread("runner") {
+            @Override
+            public void run() {
+                try {
+                    env.execute();
+                }
+                catch (Throwable t) {
+                    if (!(t.getCause() instanceof JobCancellationException)) {
+                        errorRef.set(t);
+                    }
+                }
+            }
+        };
+        runner.start();
+
+        final Long l50 = 50L; // the final committed offset in Kafka should be 50
+        final long deadline = 30000 + System.currentTimeMillis();
+
+        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+        do {
+            Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+            Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+            Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+            if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
+                break;
+            }
+
+            Thread.sleep(100);
+        }
+        while (System.currentTimeMillis() < deadline);
+
+        // cancel the job
+        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+        final Throwable t = errorRef.get();
+        if (t != null) {
+            throw new RuntimeException("Job failed with an exception", t);
+        }
+
+        // final check to see if offsets are correctly in Kafka
+        Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+        Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+        Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+        Assert.assertEquals(Long.valueOf(50L), o1);
+        Assert.assertEquals(Long.valueOf(50L), o2);
+        Assert.assertEquals(Long.valueOf(50L), o3);
+
+        kafkaOffsetHandler.close();
+        deleteTestTopic(topicName);
+    }
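The poll-until-committed loop above reappears almost verbatim in runAutoOffsetRetrievalAndCommitToKafka further down. If one wanted to factor it out, a helper along these lines would do; this is hypothetical and not part of the patch, using only the KafkaOffsetHandler interface that this commit adds:

// Hypothetical helper condensing the polling loops in these tests (not in the patch).
private static boolean waitUntilAllCommitted(
        KafkaTestEnvironment.KafkaOffsetHandler offsetHandler,
        String topicName,
        int numPartitions,
        Long expectedOffset,
        long timeoutMillis) throws InterruptedException {

    final long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
        boolean allCommitted = true;
        for (int partition = 0; partition < numPartitions; partition++) {
            if (!expectedOffset.equals(offsetHandler.getCommittedOffset(topicName, partition))) {
                allCommitted = false;
                break;
            }
        }
        if (allCommitted) {
            return true;
        }
        Thread.sleep(100);
    }
    return false; // let the caller's final assertions report the actual mismatch
}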
+
+    /**
+     * This test first writes a total of 300 records to a test topic, reads the first 150 so that some offsets are
+     * committed to Kafka, and then startup the consumer again to read the remaining records starting from the committed offsets.
+     * The test ensures that whatever offsets were committed to Kafka, the consumer correctly picks them up
+     * and starts at the correct position.
+     */
+    public void runStartFromKafkaCommitOffsets() throws Exception {
+        final int parallelism = 3;
+        final int recordsInEachPartition = 300;
+
+        final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+        Long o1;
+        Long o2;
+        Long o3;
+        int attempt = 0;
+        // make sure that o1, o2, o3 are not all null before proceeding
+        do {
+            attempt++;
+            LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
+
+            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+            env.getConfig().disableSysoutLogging();
+            env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+            env.setParallelism(parallelism);
+            env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
+
+            env
+                .addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+                .map(new ThrottledMapper<String>(50))
+                .map(new MapFunction<String, Object>() {
+                    int count = 0;
+                    @Override
+                    public Object map(String value) throws Exception {
+                        count++;
+                        if (count == 150) {
+                            throw new SuccessException();
+                        }
+                        return null;
+                    }
+                })
+                .addSink(new DiscardingSink<>());
+
+            tryExecute(env, "Read some records to commit offsets to Kafka");
+
+            o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+            o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+            o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+        } while (o1 == null && o2 == null && o3 == null && attempt < 3);
+
+        if (o1 == null && o2 == null && o3 == null) {
+            throw new RuntimeException("No offsets have been committed after 3 attempts");
+        }
+
+        LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
+
+        final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+        env2.getConfig().disableSysoutLogging();
+        env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        env2.setParallelism(parallelism);
+
+        // whatever offsets were committed for each partition, the consumer should pick
+        // them up and start from the correct position so that the remaining records are all read
+        HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
+        partitionsToValuesCountAndStartOffset.put(0, new Tuple2<>(
+            (o1 != null) ? (int) (recordsInEachPartition - o1) : recordsInEachPartition,
+            (o1 != null) ? o1.intValue() : 0
+        ));
+        partitionsToValuesCountAndStartOffset.put(1, new Tuple2<>(
+            (o2 != null) ? (int) (recordsInEachPartition - o2) : recordsInEachPartition,
+            (o2 != null) ? o2.intValue() : 0
+        ));
+        partitionsToValuesCountAndStartOffset.put(2, new Tuple2<>(
+            (o3 != null) ? (int) (recordsInEachPartition - o3) : recordsInEachPartition,
+            (o3 != null) ? o3.intValue() : 0
+        ));
+
+        readSequence(env2, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+
+        kafkaOffsetHandler.close();
+        deleteTestTopic(topicName);
+    }
+
+    /**
+     * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset
+     * is committed to Kafka, even if some partitions are not read.
+     *
+     * Test:
+     * - Create 3 partitions
+     * - write 50 messages into each.
+     * - Start three consumers with auto.offset.reset='latest' and wait until they committed into Kafka.
+     * - Check if the offsets in Kafka are set to 50 for the three partitions
+     *
+     * See FLINK-3440 as well
+     */
+    public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+        // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+        final int parallelism = 3;
+        final int recordsInEachPartition = 50;
+
+        final String topicName = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+        env.getConfig().disableSysoutLogging();
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200);
+
+        Properties readProps = new Properties();
+        readProps.putAll(standardProps);
+        readProps.setProperty("auto.offset.reset", "latest"); // set to reset to latest, so that partitions are initially not read
+
+        DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
+        stream.addSink(new DiscardingSink<String>());
+
+        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        final Thread runner = new Thread("runner") {
+            @Override
+            public void run() {
+                try {
+                    env.execute();
+                }
+                catch (Throwable t) {
+                    if (!(t.getCause() instanceof JobCancellationException)) {
+                        errorRef.set(t);
+                    }
+                }
+            }
+        };
+        runner.start();
+
+        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+        final Long l50 = 50L; // the final committed offset in Kafka should be 50
+        final long deadline = 30000 + System.currentTimeMillis();
+        do {
+            Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+            Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+            Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+            if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
+                break;
+            }
+
+            Thread.sleep(100);
+        }
+        while (System.currentTimeMillis() < deadline);
+
+        // cancel the job
+        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+        final Throwable t = errorRef.get();
+        if (t != null) {
+            throw new RuntimeException("Job failed with an exception", t);
+        }
+
+        // final check to see if offsets are correctly in Kafka
+        Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+        Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+        Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+        Assert.assertEquals(Long.valueOf(50L), o1);
+        Assert.assertEquals(Long.valueOf(50L), o2);
+        Assert.assertEquals(Long.valueOf(50L), o3);
+
+        kafkaOffsetHandler.close();
+        deleteTestTopic(topicName);
+    }
 
     /**
      * Ensure Kafka is working on both producer and consumer side.
@@ -1346,48 +1589,80 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	// Reading writing test data sets
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Runs a job using the provided environment to read a sequence of records from a single Kafka topic.
+	 * The method allows the expected starting offset and total read value count of each partition to be
+	 * specified individually. The job will be considered successful only if all partition read results
+	 * match the start offset and value count criteria.
+	 */
 	protected void readSequence(StreamExecutionEnvironment env, Properties cc,
-								final int sourceParallelism,
 								final String topicName,
-								final int valuesCount, final int startFrom) throws Exception {
+								final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
+		final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size();
 
-		final int finalCount = valuesCount * sourceParallelism;
+		int finalCountTmp = 0;
+		for (Map.Entry<Integer, Tuple2<Integer, Integer>> valuesCountAndStartOffset : partitionsToValuesCountAndStartOffset.entrySet()) {
+			finalCountTmp += valuesCountAndStartOffset.getValue().f0;
+		}
+		final int finalCount = finalCountTmp;
 
 		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
 
 		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
-				new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
+			new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
 
 		// create the consumer
 		cc.putAll(secureProps);
 		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
 
 		DataStream<Tuple2<Integer, Integer>> source = env
-				.addSource(consumer).setParallelism(sourceParallelism)
-				.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
+			.addSource(consumer).setParallelism(sourceParallelism)
+			.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
 
 		// verify data
 		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
 
-			private int[] values = new int[valuesCount];
+			private HashMap<Integer, BitSet> partitionsToValueCheck;
 			private int count = 0;
 
 			@Override
+			public void open(Configuration parameters) throws Exception {
+				partitionsToValueCheck = new HashMap<>();
+				for (Integer partition : partitionsToValuesCountAndStartOffset.keySet()) {
+					partitionsToValueCheck.put(partition, new BitSet());
+				}
+			}
+
+			@Override
 			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
-				values[value.f1 - startFrom]++;
+				int partition = value.f0;
+				int val = value.f1;
+
+				BitSet bitSet = partitionsToValueCheck.get(partition);
+				if (bitSet == null) {
+					throw new RuntimeException("Got a record from an unknown partition");
+				} else {
+					bitSet.set(val - partitionsToValuesCountAndStartOffset.get(partition).f1);
+				}
+
 				count++;
+				LOG.info("Received message {}, total {} messages", value, count);
 
 				// verify if we've seen everything
 				if (count == finalCount) {
-					for (int i = 0; i < values.length; i++) {
-						int v = values[i];
-						if (v != sourceParallelism) {
-							printTopic(topicName, valuesCount, deser);
-							throw new RuntimeException("Expected v to be " + sourceParallelism +
-									", but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+					for (Map.Entry<Integer, BitSet> partitionsToValueCheck : this.partitionsToValueCheck.entrySet()) {
+						BitSet check = partitionsToValueCheck.getValue();
+						int expectedValueCount = partitionsToValuesCountAndStartOffset.get(partitionsToValueCheck.getKey()).f0;
+
+						if (check.cardinality() != expectedValueCount) {
+							throw new RuntimeException("Expected cardinality to be " + expectedValueCount +
+								", but was " + check.cardinality());
+						} else if (check.nextClearBit(0) != expectedValueCount) {
+							throw new RuntimeException("Expected next clear bit to be " + expectedValueCount +
+								", but was " + check.nextClearBit(0));
 						}
 					}
+
+					// test has passed
 					throw new SuccessException();
 				}
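The per-partition verification above rests on two BitSet properties: cardinality() counts the distinct values seen (duplicates set the same bit), and nextClearBit(0) detects holes or out-of-range values. A standalone illustration of the same check:

    import java.util.BitSet;

    public class BitSetCheckExample {

        public static void main(String[] args) {
            final int startOffset = 150;
            final int expectedValueCount = 150;

            // Simulate reading values 150..299; reading a value twice would not
            // raise the cardinality, and a missing value would lower it.
            BitSet check = new BitSet();
            for (int value = startOffset; value < startOffset + expectedValueCount; value++) {
                check.set(value - startOffset);
            }

            // Exactly the expected number of distinct values were seen ...
            if (check.cardinality() != expectedValueCount) {
                throw new RuntimeException("missing values or too few distinct values");
            }
            // ... and they form the contiguous range [0, expectedValueCount).
            if (check.nextClearBit(0) != expectedValueCount) {
                throw new RuntimeException("values outside the expected range");
            }
        }
    }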
@@ -1400,6 +1675,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		LOG.info("Successfully read sequence for verification");
 	}
 
+	/**
+	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, String, Map)} that
+	 * expects reading from the same start offset and the same value count for all partitions of a single Kafka topic.
+	 */
+	protected void readSequence(StreamExecutionEnvironment env, Properties cc,
+								final int sourceParallelism,
+								final String topicName,
+								final int valuesCount, final int startFrom) throws Exception {
+		HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
+		for (int i = 0; i < sourceParallelism; i++) {
+			partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
+		}
+		readSequence(env, cc, topicName, partitionsToValuesCountAndStartOffset);
+	}
+
 	protected String writeSequence(
 			String baseTopicName,
 			final int numElements,
@@ -1430,7 +1720,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		LOG.info("Writing attempt #1");
 
 		// -------- Write the Sequence --------
-		
+
 		createTestTopic(topicName, parallelism, replicationFactor);
 
 		StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index ded1fde..806d342 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.Properties;
 
 /**
@@ -83,6 +85,14 @@ public abstract class KafkaTestEnvironment {
 			KeyedSerializationSchema<T> serSchema, Properties props,
 			KafkaPartitioner<T> partitioner);
 
+	// -- offset handlers
+
+	public interface KafkaOffsetHandler {
+		Long getCommittedOffset(String topicName, int partition);
+		void close();
+	}
+
+	public abstract KafkaOffsetHandler createOffsetHandler(Properties props);
 
 	// -- leader failure simulation
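The concrete handlers come from the per-version test environments that implement createOffsetHandler. As a rough sketch of what such an implementation could look like against the 0.9+ consumer API (illustrative only; assumes the supplied properties carry bootstrap.servers and group.id, and the class name is made up here):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Illustrative only: a handler that asks a plain KafkaConsumer for the
    // committed offset of a (topic, partition), returning null if none exists.
    public class ExampleKafkaOffsetHandler implements KafkaTestEnvironment.KafkaOffsetHandler {

        private final KafkaConsumer<byte[], byte[]> offsetClient;

        public ExampleKafkaOffsetHandler(Properties props) {
            offsetClient = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }

        @Override
        public Long getCommittedOffset(String topicName, int partition) {
            OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
            return (committed != null) ? committed.offset() : null;
        }

        @Override
        public void close() {
            offsetClient.close();
        }
    }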
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 582311f..7db6ba4 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -225,7 +225,7 @@ public class AbstractFetcherTimestampsTest {
 		}
 
 		@Override
-		public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
			throw new UnsupportedOperationException();
 		}
 	}
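The renamed commitInternalOffsetsToKafka takes Flink's internal offsets, which track the last processed record per partition; the value that actually goes to Kafka is that offset plus one, matching the convention the tests above verify (values 0-49 read, offset 50 committed). A minimal sketch of that translation, not the fetcher's actual implementation ("consumer" is an assumed org.apache.kafka.clients.consumer.KafkaConsumer, and "internalOffsets" the Map<KafkaTopicPartition, Long> handed to the method):

    // Sketch: translate internal offsets (last processed record) into Kafka
    // commits (next record to process) and commit them synchronously.
    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(internalOffsets.size());
    for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
        offsetsToCommit.put(
            new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()),
            new OffsetAndMetadata(entry.getValue() + 1)); // +1: commit the next record to process
    }
    consumer.commitSync(offsetsToCommit);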