Repository: flink
Updated Branches:
  refs/heads/master 069de27df -> f46ca3918


[FLINK-4723] [kafka] Unify committed offsets to Kafka to be next record to process

This closes #2580


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f46ca391
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f46ca391
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f46ca391

Branch: refs/heads/master
Commit: f46ca39188dce1764ee6615eb6697588fdc04a2a
Parents: 069de27
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Sun Oct 2 16:54:57 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Tue Oct 18 11:44:30 2016 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   |   8 +-
 .../connectors/kafka/Kafka010ITCase.java        |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  33 +-
 .../kafka/internals/Kafka08Fetcher.java         |  15 +-
 .../internals/PeriodicOffsetCommitter.java      |   6 +-
 .../kafka/internals/ZookeeperOffsetHandler.java |  19 +-
 .../connectors/kafka/Kafka08ITCase.java         | 178 ++--------
 .../kafka/KafkaTestEnvironmentImpl.java         |  31 ++
 .../kafka/internal/Kafka09Fetcher.java          |   2 +-
 .../connectors/kafka/Kafka09FetcherTest.java    |  10 +-
 .../connectors/kafka/Kafka09ITCase.java         |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  32 +-
 .../kafka/FlinkKafkaConsumerBase.java           |  35 +-
 .../kafka/internals/AbstractFetcher.java        |   9 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  56 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java | 322 ++++++++++++++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  10 +
 .../AbstractFetcherTimestampsTest.java          |   2 +-
 18 files changed, 559 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
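In short: Flink's checkpoints keep the offset of the last record each subtask has processed, while the offsets committed back to Kafka (or to ZooKeeper for the 0.8 connector) now uniformly mean "the next record to process". A minimal sketch of the two directions, with illustrative variable names rather than connector code:

    // committing: internal checkpointed offset -> externally committed offset
    long lastProcessedOffset = 42L;
    long offsetToCommit = lastProcessedOffset + 1;      // 43, the next record to process

    // restoring: externally committed offset -> internal offset (see Kafka08Fetcher below)
    long committedOffset = 43L;
    long restoredInternalOffset = committedOffset - 1;  // 42, the last processed record

Consumption then resumes with record 43, so the two adjustments cancel out and no record is skipped or re-read.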


http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 8f0b170..76e3950 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -143,7 +143,7 @@ public class Kafka010FetcherTest {
             @Override
             public void run() {
                 try {
-                    fetcher.commitSpecificOffsetsToKafka(testCommitData);
+                    fetcher.commitInternalOffsetsToKafka(testCommitData);
                 } catch (Throwable t) {
                     commitError.set(t);
                 }
@@ -255,7 +255,7 @@ public class Kafka010FetcherTest {
 
         // ----- trigger the first offset commit -----
 
-        fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+        fetcher.commitInternalOffsetsToKafka(testCommitData1);
         Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
         for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result1.entrySet()) {
@@ -272,7 +272,7 @@ public class Kafka010FetcherTest {
 
         // ----- trigger the second offset commit -----
 
-        fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+        fetcher.commitInternalOffsetsToKafka(testCommitData2);
         Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
         for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result2.entrySet()) {
@@ -297,4 +297,4 @@ public class Kafka010FetcherTest {
             throw new Exception("Exception in the fetcher", caughtError);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 28bf6d5..77407ff 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -131,6 +131,24 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
                runEndOfStreamTest();
        }
 
+       // --- offset committing ---
+
+       @Test(timeout = 60000)
+       public void testCommitOffsetsToKafka() throws Exception {
+               runCommitOffsetsToKafka();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromKafkaCommitOffsets() throws Exception {
+               runStartFromKafkaCommitOffsets();
+       }
+
+       // TODO: This test will not pass until FLINK-4727 is resolved
+//     @Test(timeout = 60000)
+//     public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+//             runAutoOffsetRetrievalAndCommitToKafka();
+//     }
+
        /**
         * Kafka 0.10 specific test, ensuring Timestamps are properly written 
to and read from Kafka
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 78fc1c6..7d12cde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.admin.AdminUtils;
 import kafka.common.KafkaException;
-import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
@@ -35,6 +34,9 @@ import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.slf4j.Logger;
@@ -118,6 +120,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
+       public KafkaOffsetHandler createOffsetHandler(Properties props) {
+               return new KafkaOffsetHandlerImpl(props);
+       }
+
+       @Override
        public void restartBroker(int leaderId) throws Exception {
                brokers.set(leaderId, getKafkaServer(leaderId, 
tmpKafkaDirs.get(leaderId)));
        }
@@ -213,11 +220,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
                standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
-               standardProps.setProperty("auto.commit.enable", "false");
+               standardProps.setProperty("enable.auto.commit", "false");
                standardProps.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
                standardProps.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
                standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning. (earliest is kafka 0.10 value)
-               standardProps.setProperty("fetch.message.max.bytes", "256"); // 
make a lot of fetches (MESSAGES MUST BE SMALLER!)
+               standardProps.setProperty("max.partition.fetch.bytes", "256"); 
// make a lot of fetches (MESSAGES MUST BE SMALLER!)
        }
 
        @Override
@@ -381,4 +388,24 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                throw new Exception("Could not start Kafka after " + numTries + 
" retries due to port conflicts.");
        }
 
+       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+               private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+               public KafkaOffsetHandlerImpl(Properties props) {
+                       offsetClient = new KafkaConsumer<>(props);
+               }
+
+               @Override
+               public Long getCommittedOffset(String topicName, int partition) 
{
+                       OffsetAndMetadata committed = 
offsetClient.committed(new TopicPartition(topicName, partition));
+                       return (committed != null) ? committed.offset() : null;
+               }
+
+               @Override
+               public void close() {
+                       offsetClient.close();
+               }
+       }
+
 }
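The new createOffsetHandler hook lets the version-agnostic tests in KafkaConsumerTestBase read back what was actually committed. A hedged sketch of how such a check might look (kafkaServer, standardProps, and topicName come from the test base; the fragment itself is illustrative, not the committed test code):

    // illustrative: after a checkpoint completes, the committed offset should be
    // "last processed record + 1"
    KafkaOffsetHandler offsetHandler = kafkaServer.createOffsetHandler(standardProps);
    try {
        Long committed = offsetHandler.getCommittedOffset(topicName, 0);
        // 50 records (offsets 0..49) read from partition 0 => committed offset 50
        assertEquals(Long.valueOf(50), committed);
    } finally {
        offsetHandler.close();
    }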

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index aee3acc..5861058 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -140,11 +140,13 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
                                        }
                                }
 
-                               Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset);
+                               Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
                                for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-                                       Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
-                                       if (offset != null) {
-                                               partition.setOffset(offset);
+                                       Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
+                                       if (zkOffset != null) {
+                                               // the offset in ZK represents the "next record to process", so we need to subtract it by 1
+                                               // to correctly represent our internally checkpointed offsets
+                                               partition.setOffset(zkOffset - 1);
                                        }
                                }
                        }
@@ -324,10 +326,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
        // ------------------------------------------------------------------------
 
        @Override
-       public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+       public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
                ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
                if (zkHandler != null) {
-                       zkHandler.writeOffsets(offsets);
+                       // the ZK handler takes care of incrementing the offsets by 1 before committing
+                       zkHandler.prepareAndCommitOffsets(offsets);
                }
 
                // Set committed offsets in topic partition state

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
index 6aaeca9..27d90f2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -62,12 +62,12 @@ public class PeriodicOffsetCommitter extends Thread {
                                Thread.sleep(commitInterval);
 
                                // create copy a deep copy of the current 
offsets
-                               HashMap<KafkaTopicPartition, Long> 
currentOffsets = new HashMap<>(partitionStates.length);
+                               HashMap<KafkaTopicPartition, Long> 
offsetsToCommit = new HashMap<>(partitionStates.length);
                                for (KafkaTopicPartitionState<?> partitionState 
: partitionStates) {
-                                       
currentOffsets.put(partitionState.getKafkaTopicPartition(), 
partitionState.getOffset());
+                                       
offsetsToCommit.put(partitionState.getKafkaTopicPartition(), 
partitionState.getOffset());
                                }
                                
-                               offsetHandler.writeOffsets(currentOffsets);
+                               
offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
                        }
                }
                catch (Throwable t) {
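Unwrapped, the periodic (auto-commit) path for 0.8 now reads roughly as below; note that the committer itself passes the internal offsets through unchanged and the handler applies the +1. This is a condensed restatement of the hunk above, not additional code:

    // inside PeriodicOffsetCommitter#run(), once per commit interval
    HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
    for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
        // snapshot the current internal offsets (last processed records)
        offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
    }
    // ZookeeperOffsetHandler increments each offset by 1 before writing it to ZK
    offsetHandler.prepareAndCommitOffsets(offsetsToCommit);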

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index a1a81ed..8f2ef09 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -77,29 +77,30 @@ public class ZookeeperOffsetHandler {
        // 
------------------------------------------------------------------------
 
        /**
-        * Writes given set of offsets for Kafka partitions to ZooKeeper.
+        * Commits offsets for Kafka partitions to ZooKeeper. The given offsets to this method should be the offsets of
+        * the last processed records; this method will take care of incrementing the offsets by 1 before committing them so
+        * that the committed offsets to Zookeeper represent the next record to process.
         * 
-        * @param offsetsToWrite The offsets for the partitions to write.
+        * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
         * @throws Exception The method forwards exceptions.
         */
-       public void writeOffsets(Map<KafkaTopicPartition, Long> offsetsToWrite) throws Exception {
-               for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToWrite.entrySet()) {
+       public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
+               for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
                        KafkaTopicPartition tp = entry.getKey();
-                       long offset = entry.getValue();
 
-                       if (offset >= 0) {
-                               setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
+                       Long lastProcessedOffset = entry.getValue();
+                       if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
+                               setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
                        }
                }
        }
 
        /**
-        * 
         * @param partitions The partitions to read offsets for.
         * @return The mapping from partition to offset.
         * @throws Exception This method forwards exceptions.
         */
-       public Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception {
+       public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
                Map<KafkaTopicPartition, Long> ret = new 
HashMap<>(partitions.size());
                for (KafkaTopicPartition tp : partitions) {
                        Long offset = getOffsetFromZooKeeper(curatorClient, 
groupId, tp.getTopic(), tp.getPartition());

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 1c69d78..fabb0fe 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -135,31 +135,21 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                runBrokerFailureTest();
        }
 
-       // --- special executions ---
+       // --- offset committing ---
 
        @Test(timeout = 60000)
-       public void testBigRecordJob() throws Exception {
-               runBigRecordTestTopology();
+       public void testCommitOffsetsToZookeeper() throws Exception {
+               runCommitOffsetsToKafka();
        }
 
        @Test(timeout = 60000)
-       public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
-       }
-
-       @Test(timeout = 60000)
-       public void testAllDeletes() throws Exception {
-               runAllDeletesTest();
-       }
-
-       @Test(timeout=60000)
-       public void testEndOfStream() throws Exception {
-               runEndOfStreamTest();
+       public void testStartFromZookeeperCommitOffsets() throws Exception {
+               runStartFromKafkaCommitOffsets();
        }
 
        @Test(timeout = 60000)
-       public void testMetrics() throws Throwable {
-               runMetricsTest();
+       public void testAutoOffsetRetrievalAndCommitToZookeeper() throws 
Exception {
+               runAutoOffsetRetrievalAndCommitToKafka();
        }
 
        @Test
@@ -186,59 +176,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                        fail(e.getMessage());
                }
        }
-       
-       /**
-        * Tests that offsets are properly committed to ZooKeeper and initial 
offsets are read from ZooKeeper.
-        *
-        * This test is only applicable if the Flink Kafka Consumer uses the 
ZooKeeperOffsetHandler.
-        */
-       @Test(timeout = 60000)
-       public void testOffsetInZookeeper() throws Exception {
-               final int parallelism = 3;
-
-               // write a sequence from 0 to 99 to each of the 3 partitions.
-               final String topicName = writeSequence("testOffsetInZK", 100, 
parallelism, 1);
-
-               StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env1.getConfig().disableSysoutLogging();
-               env1.enableCheckpointing(50);
-               
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env1.setParallelism(parallelism);
-
-               StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env2.getConfig().disableSysoutLogging();
-               env2.enableCheckpointing(50);
-               
env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env2.setParallelism(parallelism);
-
-               readSequence(env1, standardProps, parallelism, topicName, 100, 
0);
-
-               CuratorFramework curatorClient = 
((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-
-               Long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, 
standardProps.getProperty("group.id"), topicName, 0);
-               Long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, 
standardProps.getProperty("group.id"), topicName, 1);
-               Long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, 
standardProps.getProperty("group.id"), topicName, 2);
-
-               LOG.info("Got final offsets from zookeeper o1={}, o2={}, 
o3={}", o1, o2, o3);
-
-               assertTrue(o1 == null || (o1 >= 0 && o1 <= 100));
-               assertTrue(o2 == null || (o2 >= 0 && o2 <= 100));
-               assertTrue(o3 == null || (o3 >= 0 && o3 <= 100));
-
-               LOG.info("Manipulating offsets");
-
-               // set the offset to 50 for the three partitions
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, 
standardProps.getProperty("group.id"), topicName, 0, 49);
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, 
standardProps.getProperty("group.id"), topicName, 1, 49);
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, 
standardProps.getProperty("group.id"), topicName, 2, 49);
-
-               curatorClient.close();
-
-               // create new env
-               readSequence(env2, standardProps, parallelism, topicName, 50, 
50);
-
-               deleteTestTopic(topicName);
-       }
 
        @Test(timeout = 60000)
        public void testOffsetAutocommitTest() throws Exception {
@@ -275,94 +212,37 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
                // ensure that the offset has been committed
                boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 
100) ||
-                               (o2 != null && o2 > 0 && o2 <= 100) ||
-                               (o3 != null && o3 > 0 && o3 <= 100);
+                       (o2 != null && o2 > 0 && o2 <= 100) ||
+                       (o3 != null && o3 > 0 && o3 <= 100);
                assertTrue("Expecting at least one offset to be set o1="+o1+" 
o2="+o2+" o3="+o3, atLeastOneOffsetSet);
 
                deleteTestTopic(topicName);
        }
 
-       /**
-        * This test ensures that when the consumers retrieve some start offset 
from kafka (earliest, latest), that this offset
-        * is committed to Zookeeper, even if some partitions are not read
-        *
-        * Test:
-        * - Create 3 topics
-        * - write 50 messages into each.
-        * - Start three consumers with auto.offset.reset='latest' and wait 
until they committed into ZK.
-        * - Check if the offsets in ZK are set to 50 for the three partitions
-        *
-        * See FLINK-3440 as well
-        */
-       @Test(timeout = 60000)
-       public void testKafkaOffsetRetrievalToZookeeper() throws Exception {
-               final int parallelism = 3;
+       // --- special executions ---
 
-               // write a sequence from 0 to 49 to each of the 3 partitions.
-               final String topicName =  writeSequence("testKafkaOffsetToZk", 
50, parallelism, 1);
+       @Test(timeout = 60000)
+       public void testBigRecordJob() throws Exception {
+               runBigRecordTestTopology();
+       }
 
-               final StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env2.getConfig().disableSysoutLogging();
-               
env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env2.setParallelism(parallelism);
-               env2.enableCheckpointing(200);
+       @Test(timeout = 60000)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
 
-               Properties readProps = new Properties();
-               readProps.putAll(standardProps);
-               readProps.setProperty("auto.offset.reset", "latest");
-
-               DataStream<String> stream = 
env2.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), 
readProps));
-               stream.addSink(new DiscardingSink<String>());
-
-               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-               final Thread runner = new Thread("runner") {
-                       @Override
-                       public void run() {
-                               try {
-                                       env2.execute();
-                               }
-                               catch (Throwable t) {
-                                       if (!(t.getCause() instanceof 
JobCancellationException)) {
-                                               errorRef.set(t);
-                                       }
-                               }
-                       }
-               };
-               runner.start();
-
-               final CuratorFramework curatorFramework = 
((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-               final Long l49 = 49L;
-                               
-               final long deadline = 30000 + System.currentTimeMillis();
-               do {
-                       Long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 0);
-                       Long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 1);
-                       Long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 2);
-
-                       if (l49.equals(o1) && l49.equals(o2) && l49.equals(o3)) 
{
-                               break;
-                       }
-                       
-                       Thread.sleep(100);
-               }
-               while (System.currentTimeMillis() < deadline);
-               
-               // cancel the job
-               
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-               
-               final Throwable t = errorRef.get();
-               if (t != null) {
-                       throw new RuntimeException("Job failed with an 
exception", t);
-               }
+       @Test(timeout = 60000)
+       public void testAllDeletes() throws Exception {
+               runAllDeletesTest();
+       }
 
-               // check if offsets are correctly in ZK
-               Long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 0);
-               Long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 1);
-               Long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 2);
-               Assert.assertEquals(Long.valueOf(49L), o1);
-               Assert.assertEquals(Long.valueOf(49L), o2);
-               Assert.assertEquals(Long.valueOf(49L), o3);
+       @Test(timeout=60000)
+       public void testEndOfStream() throws Exception {
+               runEndOfStreamTest();
+       }
 
-               curatorFramework.close();
+       @Test(timeout = 60000)
+       public void testMetrics() throws Throwable {
+               runMetricsTest();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index a0d5002..567d22d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -111,6 +112,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
+       public KafkaOffsetHandler createOffsetHandler(Properties props) {
+               return new KafkaOffsetHandlerImpl(props);
+       }
+
+       @Override
        public void restartBroker(int leaderId) throws Exception {
                brokers.set(leaderId, getKafkaServer(leaderId, 
tmpKafkaDirs.get(leaderId)));
        }
@@ -351,4 +357,29 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                throw new Exception("Could not start Kafka after " + numTries + 
" retries due to port conflicts.");
        }
 
+       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+               private final CuratorFramework offsetClient;
+               private final String groupId;
+
+               public KafkaOffsetHandlerImpl(Properties props) {
+                       offsetClient = createCuratorClient();
+                       groupId = props.getProperty("group.id");
+               }
+
+               @Override
+               public Long getCommittedOffset(String topicName, int partition) 
{
+                       try {
+                               return 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, 
partition);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Exception when 
getting offsets from Zookeeper", e);
+                       }
+               }
+
+               @Override
+               public void close() {
+                       offsetClient.close();
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 37e40fc..3a3d3de 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -300,7 +300,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
        }
 
        @Override
-       public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+       public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
                KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 5a638b2..c5cf0cc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -143,7 +143,7 @@ public class Kafka09FetcherTest {
                        @Override
                        public void run() {
                                try {
-                                       fetcher.commitSpecificOffsetsToKafka(testCommitData);
+                                       fetcher.commitInternalOffsetsToKafka(testCommitData);
                                } catch (Throwable t) {
                                        commitError.set(t);
                                }
@@ -259,7 +259,7 @@ public class Kafka09FetcherTest {
 
                // ----- trigger the first offset commit -----
 
-               fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+               fetcher.commitInternalOffsetsToKafka(testCommitData1);
                Map<TopicPartition, OffsetAndMetadata> result1 = 
commitStore.take();
 
                for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result1.entrySet()) {
@@ -270,13 +270,13 @@ public class Kafka09FetcherTest {
                        }
                        else if (partition.topic().equals("another")) {
                                assertEquals(99, partition.partition());
-                               assertEquals(18L, entry.getValue().offset());
+                               assertEquals(17L, entry.getValue().offset());
                        }
                }
 
                // ----- trigger the second offset commit -----
 
-               fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+               fetcher.commitInternalOffsetsToKafka(testCommitData2);
                Map<TopicPartition, OffsetAndMetadata> result2 = 
commitStore.take();
 
                for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result2.entrySet()) {
@@ -287,7 +287,7 @@ public class Kafka09FetcherTest {
                        }
                        else if (partition.topic().equals("another")) {
                                assertEquals(99, partition.partition());
-                               assertEquals(28L, entry.getValue().offset());
+                               assertEquals(27L, entry.getValue().offset());
                        }
                }
                

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index b9ec18a..3d347dc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -109,4 +109,22 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
        public void testMetrics() throws Throwable {
                runMetricsTest();
        }
+
+       // --- offset committing ---
+
+       @Test(timeout = 60000)
+       public void testCommitOffsetsToKafka() throws Exception {
+               runCommitOffsetsToKafka();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromKafkaCommitOffsets() throws Exception {
+               runStartFromKafkaCommitOffsets();
+       }
+
+       // TODO: This test will not pass until FLINK-4727 is resolved
+//     @Test(timeout = 60000)
+//     public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+//             runAutoOffsetRetrievalAndCommitToKafka();
+//     }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9d8fa9a..223dacb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,9 @@ import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +106,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
+       public KafkaOffsetHandler createOffsetHandler(Properties props) {
+               return new KafkaOffsetHandlerImpl(props);
+       }
+
+       @Override
        public void restartBroker(int leaderId) throws Exception {
                brokers.set(leaderId, getKafkaServer(leaderId, 
tmpKafkaDirs.get(leaderId)));
        }
@@ -203,11 +211,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
                standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
-               standardProps.setProperty("auto.commit.enable", "false");
+               standardProps.setProperty("enable.auto.commit", "false");
                standardProps.setProperty("zookeeper.session.timeout.ms", 
zkTimeout);
                standardProps.setProperty("zookeeper.connection.timeout.ms", 
zkTimeout);
                standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning. (earliest is kafka 0.9 value)
-               standardProps.setProperty("fetch.message.max.bytes", "256"); // 
make a lot of fetches (MESSAGES MUST BE SMALLER!)
+               standardProps.setProperty("max.partition.fetch.bytes", "256"); 
// make a lot of fetches (MESSAGES MUST BE SMALLER!)
 
        }
 
@@ -396,4 +404,24 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                return prop;
        }
 
+       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+               private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+               public KafkaOffsetHandlerImpl(Properties props) {
+                       offsetClient = new KafkaConsumer<>(props);
+               }
+
+               @Override
+               public Long getCommittedOffset(String topicName, int partition) 
{
+                       OffsetAndMetadata committed = 
offsetClient.committed(new TopicPartition(topicName, partition));
+                       return (committed != null) ? committed.offset() : null;
+               }
+
+               @Override
+               public void close() {
+                       offsetClient.close();
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 05028e6..7d6bd76 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -103,8 +103,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        //  runtime state (used individually by each parallel subtask) 
        // 
------------------------------------------------------------------------
        
-       /** Data for pending but uncommitted checkpoints */
-       private final LinkedMap pendingCheckpoints = new LinkedMap();
+       /** Data for pending but uncommitted offsets */
+       private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
 
        /** The fetcher implements the connections to the Kafka brokers */
        private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
@@ -347,12 +347,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                                if (restoreToOffset != null) {
                                        // the map cannot be asynchronously 
updated, because only one checkpoint call can happen
                                        // on this function at a time: either 
snapshotState() or notifyCheckpointComplete()
-                                       pendingCheckpoints.put(checkpointId, 
restoreToOffset);
-
-                                       // truncate the map, to prevent 
infinite growth
-                                       while (pendingCheckpoints.size() > 
MAX_NUM_PENDING_CHECKPOINTS) {
-                                               pendingCheckpoints.remove(0);
-                                       }
+                                       
pendingOffsetsToCommit.put(checkpointId, restoreToOffset);
 
                                        for (Map.Entry<KafkaTopicPartition, 
Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
                                                
listState.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), 
kafkaTopicPartitionLongEntry.getValue()));
@@ -367,17 +362,17 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
 
                                // the map cannot be asynchronously updated, 
because only one checkpoint call can happen
                                // on this function at a time: either 
snapshotState() or notifyCheckpointComplete()
-                               pendingCheckpoints.put(checkpointId, 
currentOffsets);
-
-                               // truncate the map, to prevent infinite growth
-                               while (pendingCheckpoints.size() > 
MAX_NUM_PENDING_CHECKPOINTS) {
-                                       pendingCheckpoints.remove(0);
-                               }
+                               pendingOffsetsToCommit.put(checkpointId, 
currentOffsets);
 
                                for (Map.Entry<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                                        
listState.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), 
kafkaTopicPartitionLongEntry.getValue()));
                                }
                        }
+
+                       // truncate the map of pending offsets to commit, to 
prevent infinite growth
+                       while (pendingOffsetsToCommit.size() > 
MAX_NUM_PENDING_CHECKPOINTS) {
+                               pendingOffsetsToCommit.remove(0);
+                       }
                }
        }
 
@@ -400,26 +395,26 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                }
 
                try {
-                       final int posInMap = 
pendingCheckpoints.indexOf(checkpointId);
+                       final int posInMap = 
pendingOffsetsToCommit.indexOf(checkpointId);
                        if (posInMap == -1) {
                                LOG.warn("Received confirmation for unknown 
checkpoint id {}", checkpointId);
                                return;
                        }
 
                        @SuppressWarnings("unchecked")
-                       HashMap<KafkaTopicPartition, Long> checkpointOffsets = 
-                                       (HashMap<KafkaTopicPartition, Long>) 
pendingCheckpoints.remove(posInMap);
+                       HashMap<KafkaTopicPartition, Long> offsets =
+                                       (HashMap<KafkaTopicPartition, Long>) 
pendingOffsetsToCommit.remove(posInMap);
 
                        // remove older checkpoints in map
                        for (int i = 0; i < posInMap; i++) {
-                               pendingCheckpoints.remove(0);
+                               pendingOffsetsToCommit.remove(0);
                        }
 
-                       if (checkpointOffsets == null || 
checkpointOffsets.size() == 0) {
+                       if (offsets == null || offsets.size() == 0) {
                                LOG.debug("Checkpoint state was empty.");
                                return;
                        }
-                       fetcher.commitSpecificOffsetsToKafka(checkpointOffsets);
+                       fetcher.commitInternalOffsetsToKafka(offsets);
                }
                catch (Exception e) {
                        if (running) {
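The pendingCheckpoints to pendingOffsetsToCommit rename reflects what the map stores: per checkpoint id, the offsets to commit once that checkpoint is acknowledged. Stripped of diff markers and line wrapping, the acknowledge path above is roughly:

    // simplified from notifyCheckpointComplete(checkpointId); logging and error handling omitted
    final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
    if (posInMap != -1) {
        @SuppressWarnings("unchecked")
        HashMap<KafkaTopicPartition, Long> offsets =
                (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

        // drop entries of older checkpoints, which are subsumed by this one
        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0);
        }

        if (offsets != null && !offsets.isEmpty()) {
            // the fetcher (or its offset handler) adds 1 before committing externally
            fetcher.commitInternalOffsetsToKafka(offsets);
        }
    }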

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 7ee3079..eb01b78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -159,12 +159,15 @@ public abstract class AbstractFetcher<T, KPH> {
 
        /**
         * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
-        * older Kafka versions).
+        * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
+        * the last processed record of each partition. Version-specific implementations of this method
+        * need to hold the contract that the given offsets must be incremented by 1 before
+        * committing them, so that committed offsets to Kafka represent "the next record to process".
         * 
-        * @param offsets The offsets to commit to Kafka.
+        * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
         * @throws Exception This method forwards exceptions.
         */
-       public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+       public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
        
        // 
------------------------------------------------------------------------
        //  snapshot and restore the state
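A sketch of what the contract documented above means for a 0.9/0.10-style implementation: the internal offsets arrive as "last processed record" and must leave as "next record to process". The helper below is illustrative only; the actual implementations live in Kafka09Fetcher and, for 0.8, in ZookeeperOffsetHandler.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class NextRecordToProcessSketch {
        // illustrative helper, assuming the 0.9+ consumer API
        static Map<TopicPartition, OffsetAndMetadata> toCommittable(Map<TopicPartition, Long> internalOffsets) {
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(internalOffsets.size());
            for (Map.Entry<TopicPartition, Long> entry : internalOffsets.entrySet()) {
                // +1: committed offsets must denote the next record to process
                offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
            }
            return offsetsToCommit;
        }
    }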

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 6d2dc70..97220c2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -161,9 +161,17 @@ public class FlinkKafkaConsumerBaseTest {
                assertFalse(listState.get().iterator().hasNext());
        }
 
+       /**
+        * Tests that on snapshots, states and offsets to commit to Kafka are 
correct
+        */
        @Test
        @SuppressWarnings("unchecked")
        public void testSnapshotState() throws Exception {
+
+               // 
--------------------------------------------------------------------
+               //   prepare fake states
+               // 
--------------------------------------------------------------------
+
                final HashMap<KafkaTopicPartition, Long> state1 = new 
HashMap<>();
                state1.put(new KafkaTopicPartition("abc", 13), 16768L);
                state1.put(new KafkaTopicPartition("def", 7), 987654321L);
@@ -176,13 +184,15 @@ public class FlinkKafkaConsumerBaseTest {
                state3.put(new KafkaTopicPartition("abc", 13), 16780L);
                state3.put(new KafkaTopicPartition("def", 7), 987654377L);
 
+               // 
--------------------------------------------------------------------
+               
                final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
                when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, 
state3);
-
-               final LinkedMap pendingCheckpoints = new LinkedMap();
-
-               FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, 
pendingCheckpoints, true);
-               assertEquals(0, pendingCheckpoints.size());
+                       
+               final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+       
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, 
pendingOffsetsToCommit, true);
+               assertEquals(0, pendingOffsetsToCommit.size());
 
                OperatorStateStore backend = mock(OperatorStateStore.class);
 
@@ -207,8 +217,8 @@ public class FlinkKafkaConsumerBaseTest {
                }
 
                assertEquals(state1, snapshot1);
-               assertEquals(1, pendingCheckpoints.size());
-               assertEquals(state1, pendingCheckpoints.get(138L));
+               assertEquals(1, pendingOffsetsToCommit.size());
+               assertEquals(state1, pendingOffsetsToCommit.get(138L));
 
                // checkpoint 2
                consumer.prepareSnapshot(140L, 140L);
@@ -221,13 +231,13 @@ public class FlinkKafkaConsumerBaseTest {
                }
 
                assertEquals(state2, snapshot2);
-               assertEquals(2, pendingCheckpoints.size());
-               assertEquals(state2, pendingCheckpoints.get(140L));
-
+               assertEquals(2, pendingOffsetsToCommit.size());
+               assertEquals(state2, pendingOffsetsToCommit.get(140L));
+               
                // ack checkpoint 1
                consumer.notifyCheckpointComplete(138L);
-               assertEquals(1, pendingCheckpoints.size());
-               assertTrue(pendingCheckpoints.containsKey(140L));
+               assertEquals(1, pendingOffsetsToCommit.size());
+               assertTrue(pendingOffsetsToCommit.containsKey(140L));
 
                // checkpoint 3
                consumer.prepareSnapshot(141L, 141L);
@@ -240,16 +250,16 @@ public class FlinkKafkaConsumerBaseTest {
                }
 
                assertEquals(state3, snapshot3);
-               assertEquals(2, pendingCheckpoints.size());
-               assertEquals(state3, pendingCheckpoints.get(141L));
-
+               assertEquals(2, pendingOffsetsToCommit.size());
+               assertEquals(state3, pendingOffsetsToCommit.get(141L));
+               
                // ack checkpoint 3, subsumes number 2
                consumer.notifyCheckpointComplete(141L);
-               assertEquals(0, pendingCheckpoints.size());
+               assertEquals(0, pendingOffsetsToCommit.size());
 
 
                consumer.notifyCheckpointComplete(666); // invalid checkpoint
-               assertEquals(0, pendingCheckpoints.size());
+               assertEquals(0, pendingOffsetsToCommit.size());
 
                OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
                TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
@@ -260,24 +270,24 @@ public class FlinkKafkaConsumerBaseTest {
                        consumer.prepareSnapshot(i, i);
                        listState.clear();
                }
-               assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+               assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size());
 
                // commit only the second last
                consumer.notifyCheckpointComplete(598);
-               assertEquals(1, pendingCheckpoints.size());
+               assertEquals(1, pendingOffsetsToCommit.size());
 
                // access invalid checkpoint
                consumer.notifyCheckpointComplete(590);
 
                // and the last
                consumer.notifyCheckpointComplete(599);
-               assertEquals(0, pendingCheckpoints.size());
+               assertEquals(0, pendingOffsetsToCommit.size());
        }
 
        // ------------------------------------------------------------------------
 
        private static <T> FlinkKafkaConsumerBase<T> getConsumer(
-                       AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
+                       AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
        {
                FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
 
@@ -285,9 +295,9 @@ public class FlinkKafkaConsumerBaseTest {
                fetcherField.setAccessible(true);
                fetcherField.set(consumer, fetcher);
 
-               Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+               Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
                mapField.setAccessible(true);
-               mapField.set(consumer, pendingCheckpoints);
+               mapField.set(consumer, pendingOffsetsToCommit);
 
                Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
                runningField.setAccessible(true);

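The test above exercises the bookkeeping behind pendingOffsetsToCommit: each snapshot registers the current offsets under its checkpoint id, and acknowledging a checkpoint commits that entry and drops all older ones (which is why checkpoint 141 subsumes 140, and why the unknown id 666 is simply ignored). A minimal sketch of that bookkeeping follows; it is an illustration only, not the actual FlinkKafkaConsumerBase code, and the method names onSnapshot/onCheckpointComplete as well as the fetcher field are made up for the example.

    // Sketch only: onSnapshot/onCheckpointComplete and "fetcher" are illustrative names.
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();

    void onSnapshot(long checkpointId, HashMap<KafkaTopicPartition, Long> currentOffsets) {
        pendingOffsetsToCommit.put(checkpointId, currentOffsets);
        // bound the map so checkpoints that are never acknowledged cannot grow it forever
        while (pendingOffsetsToCommit.size() > FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS) {
            pendingOffsetsToCommit.remove(0);
        }
    }

    @SuppressWarnings("unchecked")
    void onCheckpointComplete(long checkpointId) throws Exception {
        int position = pendingOffsetsToCommit.indexOf(checkpointId);
        if (position == -1) {
            return; // unknown or already subsumed checkpoint id, e.g. 666 in the test
        }
        HashMap<KafkaTopicPartition, Long> offsets =
                (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.getValue(position);
        // drop the acknowledged checkpoint and everything older than it
        for (int i = 0; i <= position; i++) {
            pendingOffsetsToCommit.remove(0);
        }
        fetcher.commitInternalOffsetsToKafka(offsets);
    }
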
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 0810a3e..7b06cfd 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -97,7 +98,6 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.Date;
@@ -201,6 +201,249 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        }
                }
        }
+
+       /**
+        * Ensures that the offsets committed to Kafka are the offsets of "the next record to process".
+        */
+       public void runCommitOffsetsToKafka() throws Exception {
+               // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+               final int parallelism = 3;
+               final int recordsInEachPartition = 50;
+
+               final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.setParallelism(parallelism);
+               env.enableCheckpointing(200);
+
+               DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
+               stream.addSink(new DiscardingSink<String>());
+
+               final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+               final Thread runner = new Thread("runner") {
+                       @Override
+                       public void run() {
+                               try {
+                                       env.execute();
+                               }
+                               catch (Throwable t) {
+                                       if (!(t.getCause() instanceof JobCancellationException)) {
+                                               errorRef.set(t);
+                                       }
+                               }
+                       }
+               };
+               runner.start();
+
+               final Long l50 = 50L; // the final committed offset in Kafka should be 50
+               final long deadline = 30000 + System.currentTimeMillis();
+
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+               do {
+                       Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+                       Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+                       Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+                       if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
+                               break;
+                       }
+
+                       Thread.sleep(100);
+               }
+               while (System.currentTimeMillis() < deadline);
+
+               // cancel the job
+               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+               final Throwable t = errorRef.get();
+               if (t != null) {
+                       throw new RuntimeException("Job failed with an exception", t);
+               }
+
+               // final check to see if offsets are correctly in Kafka
+               Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+               Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+               Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+               Assert.assertEquals(Long.valueOf(50L), o1);
+               Assert.assertEquals(Long.valueOf(50L), o2);
+               Assert.assertEquals(Long.valueOf(50L), o3);
+
+               kafkaOffsetHandler.close();
+               deleteTestTopic(topicName);
+       }
+
+       /**
+        * This test first writes a total of 300 records to a test topic, reads the first 150 so that some offsets are
+        * committed to Kafka, and then starts the consumer again to read the remaining records from the committed offsets.
+        * The test ensures that whatever offsets were committed to Kafka, the consumer correctly picks them up
+        * and starts at the correct position.
+        */
+       public void runStartFromKafkaCommitOffsets() throws Exception {
+               final int parallelism = 3;
+               final int recordsInEachPartition = 300;
+
+               final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+               Long o1;
+               Long o2;
+               Long o3;
+               int attempt = 0;
+               // make sure that o1, o2, o3 are not all null before proceeding
+               do {
+                       attempt++;
+                       LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
+
+                       final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       env.getConfig().disableSysoutLogging();
+                       env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+                       env.setParallelism(parallelism);
+                       env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
+
+                       env
+                               .addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+                               .map(new ThrottledMapper<String>(50))
+                               .map(new MapFunction<String, Object>() {
+                                       int count = 0;
+                                       @Override
+                                       public Object map(String value) throws Exception {
+                                               count++;
+                                               if (count == 150) {
+                                                       throw new SuccessException();
+                                               }
+                                               return null;
+                                       }
+                               })
+                               .addSink(new DiscardingSink<>());
+
+                       tryExecute(env, "Read some records to commit offsets to Kafka");
+
+                       o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+                       o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+                       o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+               } while (o1 == null && o2 == null && o3 == null && attempt < 3);
+
+               if (o1 == null && o2 == null && o3 == null) {
+                       throw new RuntimeException("No offsets have been committed after 3 attempts");
+               }
+
+               LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
+
+               final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env2.getConfig().disableSysoutLogging();
+               env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env2.setParallelism(parallelism);
+
+               // whatever offsets were committed for each partition, the consumer should pick
+               // them up and start from the correct position so that the remaining records are all read
+               HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
+               partitionsToValuesCountAndStartOffset.put(0, new Tuple2<>(
+                       (o1 != null) ? (int) (recordsInEachPartition - o1) : recordsInEachPartition,
+                       (o1 != null) ? o1.intValue() : 0
+               ));
+               partitionsToValuesCountAndStartOffset.put(1, new Tuple2<>(
+                       (o2 != null) ? (int) (recordsInEachPartition - o2) : recordsInEachPartition,
+                       (o2 != null) ? o2.intValue() : 0
+               ));
+               partitionsToValuesCountAndStartOffset.put(2, new Tuple2<>(
+                       (o3 != null) ? (int) (recordsInEachPartition - o3) : recordsInEachPartition,
+                       (o3 != null) ? o3.intValue() : 0
+               ));
+
+               readSequence(env2, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+
+               kafkaOffsetHandler.close();
+               deleteTestTopic(topicName);
+       }
+
+       /**
+        * This test ensures that when the consumers retrieve a start offset from Kafka (earliest, latest), this offset
+        * is committed to Kafka, even if some partitions are not read.
+        *
+        * Test:
+        * - Create 3 partitions
+        * - Write 50 messages into each
+        * - Start three consumers with auto.offset.reset='latest' and wait until they have committed offsets to Kafka
+        * - Check that the offsets in Kafka are set to 50 for the three partitions
+        *
+        * See FLINK-3440 as well.
+        */
+       public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+               // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+               final int parallelism = 3;
+               final int recordsInEachPartition = 50;
+
+               final String topicName = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.setParallelism(parallelism);
+               env.enableCheckpointing(200);
+
+               Properties readProps = new Properties();
+               readProps.putAll(standardProps);
+               readProps.setProperty("auto.offset.reset", "latest"); // set to reset to latest, so that partitions are initially not read
+
+               DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
+               stream.addSink(new DiscardingSink<String>());
+
+               final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+               final Thread runner = new Thread("runner") {
+                       @Override
+                       public void run() {
+                               try {
+                                       env.execute();
+                               }
+                               catch (Throwable t) {
+                                       if (!(t.getCause() instanceof JobCancellationException)) {
+                                               errorRef.set(t);
+                                       }
+                               }
+                       }
+               };
+               runner.start();
+
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+               final Long l50 = 50L; // the final committed offset in Kafka should be 50
+               final long deadline = 30000 + System.currentTimeMillis();
+               do {
+                       Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+                       Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+                       Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+                       if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
+                               break;
+                       }
+
+                       Thread.sleep(100);
+               }
+               while (System.currentTimeMillis() < deadline);
+
+               // cancel the job
+               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+               final Throwable t = errorRef.get();
+               if (t != null) {
+                       throw new RuntimeException("Job failed with an exception", t);
+               }
+
+               // final check to see if offsets are correctly in Kafka
+               Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+               Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+               Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+               Assert.assertEquals(Long.valueOf(50L), o1);
+               Assert.assertEquals(Long.valueOf(50L), o2);
+               Assert.assertEquals(Long.valueOf(50L), o3);
+
+               kafkaOffsetHandler.close();
+               deleteTestTopic(topicName);
+       }
        
        /**
         * Ensure Kafka is working on both producer and consumer side.
@@ -1346,48 +1589,80 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
        //  Reading writing test data sets
        // ------------------------------------------------------------------------
 
+       /**
+        * Runs a job using the provided environment to read a sequence of records from a single Kafka topic.
+        * The method allows the expected starting offset and total read value count of each partition to be specified individually.
+        * The job is considered successful only if all partition read results match the start offset and value count criteria.
+        */
        protected void readSequence(StreamExecutionEnvironment env, Properties cc,
-                                                               final int sourceParallelism,
                                                                final String topicName,
-                                                               final int valuesCount, final int startFrom) throws Exception {
+                                                               final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
+               final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size();
 
-               final int finalCount = valuesCount * sourceParallelism;
+               int finalCountTmp = 0;
+               for (Map.Entry<Integer, Tuple2<Integer, Integer>> valuesCountAndStartOffset : partitionsToValuesCountAndStartOffset.entrySet()) {
+                       finalCountTmp += valuesCountAndStartOffset.getValue().f0;
+               }
+               final int finalCount = finalCountTmp;
 
                final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
 
                final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
-                               new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
+                       new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
 
                // create the consumer
                cc.putAll(secureProps);
                FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
 
                DataStream<Tuple2<Integer, Integer>> source = env
-                               .addSource(consumer).setParallelism(sourceParallelism)
-                               .map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
+                       .addSource(consumer).setParallelism(sourceParallelism)
+                       .map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
 
                // verify data
                source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
 
-                       private int[] values = new int[valuesCount];
+                       private HashMap<Integer, BitSet> partitionsToValueCheck;
                        private int count = 0;
 
                        @Override
+                       public void open(Configuration parameters) throws Exception {
+                               partitionsToValueCheck = new HashMap<>();
+                               for (Integer partition : partitionsToValuesCountAndStartOffset.keySet()) {
+                                       partitionsToValueCheck.put(partition, new BitSet());
+                               }
+                       }
+
+                       @Override
                        public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
-                               values[value.f1 - startFrom]++;
+                               int partition = value.f0;
+                               int val = value.f1;
+
+                               BitSet bitSet = partitionsToValueCheck.get(partition);
+                               if (bitSet == null) {
+                                       throw new RuntimeException("Got a record from an unknown partition");
+                               } else {
+                                       bitSet.set(val - partitionsToValuesCountAndStartOffset.get(partition).f1);
+                               }
+
                                count++;
+
                                LOG.info("Received message {}, total {} 
messages", value, count);
 
                                // verify if we've seen everything
                                if (count == finalCount) {
-                                       for (int i = 0; i < values.length; i++) {
-                                               int v = values[i];
-                                               if (v != sourceParallelism) {
-                                                       printTopic(topicName, valuesCount, deser);
-                                                       throw new RuntimeException("Expected v to be " + sourceParallelism +
-                                                                       ", but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+                                       for (Map.Entry<Integer, BitSet> partitionsToValueCheck : this.partitionsToValueCheck.entrySet()) {
+                                               BitSet check = partitionsToValueCheck.getValue();
+                                               int expectedValueCount = partitionsToValuesCountAndStartOffset.get(partitionsToValueCheck.getKey()).f0;
+
+                                               if (check.cardinality() != expectedValueCount) {
+                                                       throw new RuntimeException("Expected cardinality to be " + expectedValueCount +
+                                                               ", but was " + check.cardinality());
+                                               } else if (check.nextClearBit(0) != expectedValueCount) {
+                                                       throw new RuntimeException("Expected next clear bit to be " + expectedValueCount +
+                                                               ", but was " + check.nextClearBit(0));
                                                }
                                        }
+
                                        // test has passed
                                        throw new SuccessException();
                                }
@@ -1400,6 +1675,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                LOG.info("Successfully read sequence for verification");
        }
 
+       /**
+        * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, String, Map)} that
+        * expects to read from the same start offset and the same value count for all partitions of a single Kafka topic.
+        */
+       protected void readSequence(StreamExecutionEnvironment env, Properties cc,
+                                                               final int sourceParallelism,
+                                                               final String topicName,
+                                                               final int valuesCount, final int startFrom) throws Exception {
+               HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
+               for (int i = 0; i < sourceParallelism; i++) {
+                       partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
+               }
+               readSequence(env, cc, topicName, partitionsToValuesCountAndStartOffset);
+       }
+
        protected String writeSequence(
                        String baseTopicName,
                        final int numElements,
@@ -1430,7 +1720,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        LOG.info("Writing attempt #1");
                        
                        // -------- Write the Sequence --------
-                       
+
                        createTestTopic(topicName, parallelism, replicationFactor);
 
                        StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);

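As a usage note for the new per-partition readSequence variant above: callers describe, for each partition, how many values to expect and the offset to start from, which is what runStartFromKafkaCommitOffsets does with whatever offsets it finds committed. A hedged example of a call site follows; it assumes the surrounding KafkaConsumerTestBase context (env and standardProps), and the topic name and offsets are made up for illustration.

    // Hypothetical call site; topic name and offsets are illustrative only.
    // Tuple2.f0 = number of values expected, Tuple2.f1 = start offset of the partition
    Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
    partitionsToValuesCountAndStartOffset.put(0, new Tuple2<>(250, 50));  // 50 already committed, 250 left to read
    partitionsToValuesCountAndStartOffset.put(1, new Tuple2<>(300, 0));   // nothing committed, read all 300
    partitionsToValuesCountAndStartOffset.put(2, new Tuple2<>(200, 100)); // 100 already committed, 200 left to read
    readSequence(env, standardProps, "someIllustrativeTopic", partitionsToValuesCountAndStartOffset);
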
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index ded1fde..806d342 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.Properties;
 
 /**
@@ -83,6 +85,14 @@ public abstract class KafkaTestEnvironment {
                                                                                                                KeyedSerializationSchema<T> serSchema, Properties props,
                                                                                                                KafkaPartitioner<T> partitioner);
 
+       // -- offset handlers
+
+       public interface KafkaOffsetHandler {
+               Long getCommittedOffset(String topicName, int partition);
+               void close();
+       }
+
+       public abstract KafkaOffsetHandler createOffsetHandler(Properties props);
 
        // -- leader failure simulation
 

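For context on the new KafkaOffsetHandler hook above: each KafkaTestEnvironmentImpl supplies a broker-version-specific way for tests to look up what was committed. A possible shape of such an implementation for a 0.9+ environment is sketched below, under the assumption that the modern KafkaConsumer API is available; the actual implementations in this commit may differ.

    // Hypothetical sketch for a 0.9+ test environment; not necessarily the
    // implementation that this commit adds to KafkaTestEnvironmentImpl.
    public class KafkaConsumerOffsetHandler implements KafkaTestEnvironment.KafkaOffsetHandler {

        private final org.apache.kafka.clients.consumer.KafkaConsumer<byte[], byte[]> offsetClient;

        public KafkaConsumerOffsetHandler(Properties standardProps) {
            Properties props = new Properties();
            props.putAll(standardProps);
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            offsetClient = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
        }

        @Override
        public Long getCommittedOffset(String topicName, int partition) {
            org.apache.kafka.clients.consumer.OffsetAndMetadata committed =
                    offsetClient.committed(new org.apache.kafka.common.TopicPartition(topicName, partition));
            return (committed != null) ? committed.offset() : null;
        }

        @Override
        public void close() {
            offsetClient.close();
        }
    }
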
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 582311f..7db6ba4 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -225,7 +225,7 @@ public class AbstractFetcherTimestampsTest {
                }
 
                @Override
-               public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+               public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
                        throw new UnsupportedOperationException();
                }
        }

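The rename from commitSpecificOffsetsToKafka to commitInternalOffsetsToKafka reflects the unified convention the new tests assert: the value committed for a partition is the offset of the next record to process, i.e. the last processed offset plus one (50 after reading records 0-49). A rough sketch of that arithmetic follows; lastProcessedOffset is a stand-in for whatever state a concrete fetcher tracks internally, and where exactly the increment is applied remains a per-fetcher implementation detail.

    // Illustrative only: lastProcessedOffset stands in for fetcher-internal state.
    long lastProcessedOffset = 49L;                        // records 0..49 have been processed
    long offsetToCommitToKafka = lastProcessedOffset + 1;  // 50, the next record to process
    assert offsetToCommitToKafka == 50L;                   // matches what the tests expect to find in Kafka
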