[FLINK-4035] Refactor the Kafka 0.10 connector to be based upon the 0.9 connector

Add a test case for Kafka's new timestamp functionality and update the 
documentation.

This closes #2369


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6731ec1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6731ec1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6731ec1e

Branch: refs/heads/master
Commit: 6731ec1e48d0a0092dd2330adda73bcf37fda8d7
Parents: 63859c6
Author: Robert Metzger <rmetz...@apache.org>
Authored: Tue Aug 9 16:38:21 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Tue Oct 11 10:04:25 2016 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  67 +++-
 docs/page/js/flink.js                           |   3 +-
 .../flink-connector-kafka-0.10/pom.xml          |  50 +--
 .../connectors/kafka/FlinkKafkaConsumer010.java | 121 +------
 .../connectors/kafka/FlinkKafkaProducer010.java | 315 +++++++++++++++++--
 .../kafka/Kafka010JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../kafka/internal/Kafka010Fetcher.java         | 268 ++--------------
 .../connectors/kafka/Kafka010ITCase.java        | 266 +++++++++++-----
 .../connectors/kafka/KafkaProducerTest.java     | 119 -------
 .../kafka/KafkaTestEnvironmentImpl.java         |  80 ++++-
 .../kafka/internals/SimpleConsumerThread.java   |   2 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   7 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |   4 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |   2 +-
 .../kafka/internal/Kafka09Fetcher.java          |  22 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   6 +-
 .../kafka/FlinkKafkaConsumerBase.java           |   4 +
 .../kafka/FlinkKafkaProducerBase.java           |   4 +-
 .../kafka/internals/AbstractFetcher.java        |  43 +--
 ...picPartitionStateWithPeriodicWatermarks.java |   4 +-
 ...cPartitionStateWithPunctuatedWatermarks.java |   4 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 201 ++++++------
 .../connectors/kafka/KafkaProducerTestBase.java |   5 +-
 .../kafka/KafkaShortRetentionTestBase.java      |   4 +-
 .../connectors/kafka/KafkaTestEnvironment.java  |   7 +-
 .../AbstractFetcherTimestampsTest.java          |  68 ++--
 .../kafka/testutils/DataGenerators.java         |  87 ++---
 .../testutils/JobManagerCommunicationUtils.java |  21 +-
 29 files changed, 936 insertions(+), 852 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index d2221fa..9a360d4 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -46,14 +46,6 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
   </thead>
   <tbody>
     <tr>
-        <td>flink-connector-kafka</td>
-        <td>0.9.1, 0.10</td>
-        <td>FlinkKafkaConsumer082<br>
-        FlinkKafkaProducer</td>
-        <td>0.8.x</td>
-        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
-    </tr>
-     <tr>
         <td>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</td>
         <td>1.0.0</td>
         <td>FlinkKafkaConsumer08<br>
@@ -61,7 +53,7 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
         <td>0.8.x</td>
         <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
     </tr>
-     <tr>
+    <tr>
         <td>flink-connector-kafka-0.9{{ site.scala_version_suffix }}</td>
         <td>1.0.0</td>
         <td>FlinkKafkaConsumer09<br>
@@ -69,6 +61,14 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
         <td>0.9.x</td>
         <td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> of Kafka.</td>
     </tr>
+    <tr>
+        <td>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>FlinkKafkaConsumer010<br>
+        FlinkKafkaProducer010</td>
+        <td>0.10.x</td>
+        <td>This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka messages with timestamps</a> both for producing and consuming.</td>
+    </tr>
   </tbody>
 </table>
 
@@ -87,7 +87,6 @@ Note that the streaming connectors are currently not part of 
the binary distribu
 ### Installing Apache Kafka
 
 * Follow the instructions from [Kafka's 
quickstart](https://kafka.apache.org/documentation.html#quickstart) to download 
the code and launch a server (launching a Zookeeper and a Kafka server is 
required every time before starting the application).
-* On 32 bit computers 
[this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in)
 problem may occur.
 * If the Kafka and Zookeeper servers are running on a remote machine, then the 
`advertised.host.name` setting in the `config/server.properties` file must be 
set to the machine's IP address.
 
 ### Kafka Consumer
@@ -256,17 +255,28 @@ records to partitions.
 
 Example:
 
+
 <div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="java, Kafka 0.8+" markdown="1">
 {% highlight java %}
 stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", 
new SimpleStringSchema()));
 {% endhighlight %}
 </div>
-<div data-lang="scala" markdown="1">
+<div data-lang="java, Kafka 0.10+" markdown="1">
+{% highlight java %}
+FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new 
SimpleStringSchema(), properties);
+{% endhighlight %}
+</div>
+<div data-lang="scala, Kafka 0.8+" markdown="1">
 {% highlight scala %}
 stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", 
new SimpleStringSchema()))
 {% endhighlight %}
 </div>
+<div data-lang="scala, Kafka 0.10+" markdown="1">
+{% highlight scala %}
+FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties)
+{% endhighlight %}
+</div>
 </div>
 
 You can also define a custom Kafka producer configuration for the KafkaSink 
with the constructor. Please refer to
@@ -287,3 +297,36 @@ higher value.
 
 There is currently no transactional producer for Kafka, so Flink can not 
guarantee exactly-once delivery
 into a Kafka topic.
+
+### Using Kafka timestamps and Flink event time in Kafka 0.10
+
+Since Apache Kafka 0.10, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
+the time the event occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message
+was written to the Kafka broker.
+
+The `FlinkKafkaConsumer010` will emit records with the timestamp attached if the time characteristic in Flink is
+set to `TimeCharacteristic.EventTime` (`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
+
+The Kafka consumer does not emit watermarks. To emit watermarks, use the same mechanisms described above in
+"Kafka Consumers and Timestamp Extraction/Watermark Emission", i.e. the `assignTimestampsAndWatermarks` method.
+
+When using the timestamps from Kafka, the timestamp extractor does not need to compute a timestamp itself: the
+`previousElementTimestamp` argument of the `extractTimestamp()` method already contains the timestamp carried by the Kafka message.
+
+A timestamp extractor for a Kafka consumer would look like this:
+{% highlight java %}
+public long extractTimestamp(Long element, long previousElementTimestamp) {
+    return previousElementTimestamp;
+}
+{% endhighlight %}
+
+
+
+The `FlinkKafkaProducer010` only writes the record's timestamp to Kafka if `setWriteTimestampToKafka(true)` is set.
+
+{% highlight java %}
+FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, 
new SimpleStringSchema(), standardProps);
+config.setWriteTimestampToKafka(true);
+{% endhighlight %}
+
+
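
A minimal sketch of wiring the consumer-side timestamp support described above into a job, assuming a topic named "my-topic", a broker at localhost:9092, a placeholder group id, and a simple periodic watermark assigner that tracks the largest timestamp seen so far:

{% highlight java %}
// Sketch only: topic name, broker address and group id are placeholder values.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "kafka-010-timestamp-example");

FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

// The consumer attaches the Kafka timestamp to each record; watermarks still
// have to be generated, here from the largest timestamp seen so far.
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {

    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // previousElementTimestamp carries the timestamp read from the Kafka message
        maxTimestamp = Math.max(maxTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? null : new Watermark(maxTimestamp - 1);
    }
});

DataStream<String> stream = env.addSource(consumer);
{% endhighlight %}

Downstream event-time windows then operate directly on the broker- or producer-assigned Kafka timestamps.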

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/docs/page/js/flink.js
----------------------------------------------------------------------
diff --git a/docs/page/js/flink.js b/docs/page/js/flink.js
index fdf972c..885a8ff 100644
--- a/docs/page/js/flink.js
+++ b/docs/page/js/flink.js
@@ -42,6 +42,7 @@ function codeTabs() {
       var image = $(this).data("image");
       var notabs = $(this).data("notabs");
       var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1);
+      lang = lang.replace(/[^a-zA-Z0-9]/g, "_");
       var id = "tab_" + lang + "_" + counter;
       $(this).attr("id", id);
       if (image != null && langImages[lang]) {
@@ -99,9 +100,7 @@ function viewSolution() {
 // A script to fix internal hash links because we have an overlapping top bar.
 // Based on 
https://github.com/twitter/bootstrap/issues/193#issuecomment-2281510
 function maybeScrollToHash() {
-  console.log("HERE");
   if (window.location.hash && $(window.location.hash).length) {
-    console.log("HERE2", $(window.location.hash), 
$(window.location.hash).offset().top);
     var newTop = $(window.location.hash).offset().top - 57;
     $(window).scrollTop(newTop);
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index f2bcb11..0b426b5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -26,7 +26,7 @@ under the License.
        <parent>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-connectors</artifactId>
-               <version>1.1-SNAPSHOT</version>
+               <version>1.2-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
 
@@ -37,7 +37,7 @@ under the License.
 
        <!-- Allow users to pass custom connector versions -->
        <properties>
-               <kafka.version>0.10.0.0</kafka.version>
+               <kafka.version>0.10.0.1</kafka.version>
        </properties>
 
        <dependencies>
@@ -46,21 +46,16 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
                        <version>${project.version}</version>
-                       <scope>provided</scope>
                </dependency>
 
+               <!-- Add Kafka 0.10.x as a dependency -->
+
                <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka-base_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.apache.kafka</groupId>
-                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
-                               </exclusion>
-                       </exclusions>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>${kafka.version}</version>
                </dependency>
 
                <dependency>
@@ -73,20 +68,29 @@ under the License.
                        <optional>true</optional>
                </dependency>
 
+               <!-- test dependencies -->
+
                <dependency>
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka-clients</artifactId>
-                       <version>${kafka.version}</version>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- exclude Kafka dependencies -->
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <type>test-jar</type>
+                       <scope>test</scope>
                </dependency>
 
-               <!-- test dependencies -->
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-base_2.10</artifactId>
                        <version>${project.version}</version>
                        <exclusions>
-                               <!-- exclude 0.8 dependencies -->
+                               <!-- exclude Kafka dependencies -->
                                <exclusion>
                                        <groupId>org.apache.kafka</groupId>
                                        
<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -127,6 +131,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-jmx</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 78ccd4a..267ff57 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -28,20 +28,10 @@ import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.SerializedValue;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
@@ -64,30 +54,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * is constructed. That means that the client that submits the program needs 
to be able to
  * reach the Kafka brokers or ZooKeeper.</p>
  */
-public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
+public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 
        private static final long serialVersionUID = 2324564345203409112L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer010.class);
-
-       /**  Configuration key to change the polling timeout **/
-       public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
-
-       /** Boolean configuration key to disable metrics tracking **/
-       public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
-
-       /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
-        * available. If 0, returns immediately with any records that are 
available now. */
-       public static final long DEFAULT_POLL_TIMEOUT = 100L;
-
-       // 
------------------------------------------------------------------------
-
-       /** User-supplied properties for Kafka **/
-       private final Properties properties;
-
-       /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
-        * available. If 0, returns immediately with any records that are 
available now */
-       private final long pollTimeout;
 
        // 
------------------------------------------------------------------------
 
@@ -151,51 +121,7 @@ public class FlinkKafkaConsumer010<T> extends 
FlinkKafkaConsumerBase<T> {
         *           The properties that are used to configure both the fetcher 
and the offset handler.
         */
        public FlinkKafkaConsumer010(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
-               super(deserializer);
-
-               checkNotNull(topics, "topics");
-               this.properties = checkNotNull(props, "props");
-               setDeserializer(this.properties);
-
-               // configure the polling timeout
-               try {
-                       if (properties.containsKey(KEY_POLL_TIMEOUT)) {
-                               this.pollTimeout = 
Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
-                       } else {
-                               this.pollTimeout = DEFAULT_POLL_TIMEOUT;
-                       }
-               }
-               catch (Exception e) {
-                       throw new IllegalArgumentException("Cannot parse poll 
timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
-               }
-
-               // read the partitions that belong to the listed topics
-               final List<KafkaTopicPartition> partitions = new ArrayList<>();
-
-               try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(this.properties)) {
-                       for (final String topic: topics) {
-                               // get partitions for each topic
-                               List<PartitionInfo> partitionsForTopic = 
consumer.partitionsFor(topic);
-                               // for non existing topics, the list might be 
null.
-                               if (partitionsForTopic != null) {
-                                       
partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
-                               }
-                       }
-               }
-
-               if (partitions.isEmpty()) {
-                       throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics);
-               }
-
-               // we now have a list of partitions which is the same for all 
parallel consumer instances.
-               LOG.info("Got {} partitions from these topics: {}", 
partitions.size(), topics);
-
-               if (LOG.isInfoEnabled()) {
-                       logPartitionInfo(LOG, partitions);
-               }
-
-               // register these partitions
-               setSubscribedPartitions(partitions);
+               super(topics, deserializer, props);
        }
 
        @Override
@@ -212,48 +138,5 @@ public class FlinkKafkaConsumer010<T> extends 
FlinkKafkaConsumerBase<T> {
                                watermarksPeriodic, watermarksPunctuated,
                                runtimeContext, deserializer,
                                properties, pollTimeout, useMetrics);
-               
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities 
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Converts a list of Kafka PartitionInfo's to Flink's 
KafkaTopicPartition (which are serializable)
-        * 
-        * @param partitions A list of Kafka PartitionInfos.
-        * @return A list of KafkaTopicPartitions
-        */
-       private static List<KafkaTopicPartition> 
convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
-               checkNotNull(partitions);
-
-               List<KafkaTopicPartition> ret = new 
ArrayList<>(partitions.size());
-               for (PartitionInfo pi : partitions) {
-                       ret.add(new KafkaTopicPartition(pi.topic(), 
pi.partition()));
-               }
-               return ret;
-       }
-
-       /**
-        * Makes sure that the ByteArrayDeserializer is registered in the Kafka 
properties.
-        * 
-        * @param props The Kafka properties to register the serializer in.
-        */
-       private static void setDeserializer(Properties props) {
-               final String deSerName = 
ByteArrayDeserializer.class.getCanonicalName();
-
-               Object keyDeSer = 
props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-               Object valDeSer = 
props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-
-               if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
-                       LOG.warn("Ignoring configured key DeSerializer ({})", 
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-               }
-               if (valDeSer != null && !valDeSer.equals(deSerName)) {
-                       LOG.warn("Ignoring configured value DeSerializer ({})", 
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-               }
-
-               props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
deSerName);
-               props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deSerName);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 49bce39..cc0194b 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,27 +17,123 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
 import java.util.Properties;
 
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
 
 /**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.8.
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * For (b), it extends the StreamSink operator class.
+ *
+ * Details about approach (a):
  *
- * Please note that this producer does not have any reliability guarantees.
+ *  Pre-0.10 Kafka producers only follow approach (a), allowing users to add the producer via the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
+ *  the Kafka 0.10 producer has a second invocation option, approach (b).
  *
- * @param <IN> Type of the messages to write into Kafka.
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
+ *  FlinkKafkaProducer010 via the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
+ *  can access the record's internal timestamp and write it to Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach 
they are needed for.
  */
-public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements 
SinkFunction<T>, RichFunction {
+
+       /**
+        * Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
+        */
+       private boolean writeTimestampToKafka = false;
+
+       // ---------------------- "Constructors" for timestamp writing 
------------------
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * This method allows writing timestamps to Kafka; it follows approach (b) (see above).
+        *
+        * @param inStream The stream to write to Kafka
+        * @param topicId ID of the Kafka topic.
+        * @param serializationSchema User defined serialization schema 
supporting key/value messages
+        * @param producerConfig Properties with the producer configuration.
+        */
+       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
+                                                                               
                                                                                
        String topicId,
+                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
+                                                                               
                                                                                
        Properties producerConfig) {
+               return writeToKafkaWithTimestamps(inStream, topicId, 
serializationSchema, producerConfig, new FixedPartitioner<T>());
+       }
+
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+        * the topic.
+        *
+        * This method allows writing timestamps to Kafka; it follows approach (b) (see above).
+        *
+        * @param inStream The stream to write to Kafka
+        * @param topicId ID of the Kafka topic.
+        * @param serializationSchema User defined (keyless) serialization 
schema.
+        * @param producerConfig Properties with the producer configuration.
+        */
+       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
+                                                                               
                                                                                
        String topicId,
+                                                                               
                                                                                
        SerializationSchema<T> serializationSchema,
+                                                                               
                                                                                
        Properties producerConfig) {
+               return writeToKafkaWithTimestamps(inStream, topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<T>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * This method allows writing timestamps to Kafka; it follows approach (b) (see above).
+        *
+        *  @param inStream The stream to write to Kafka
+        *  @param topicId The name of the target topic
+        *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        */
+       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
+                                                                               
                                                                                
        String topicId,
+                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
+                                                                               
                                                                                
        Properties producerConfig,
+                                                                               
                                                                                
        KafkaPartitioner<T> customPartitioner) {
 
-       private static final long serialVersionUID = 1L;
+               GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
+               FlinkKafkaProducer010<T> kafkaProducer = new 
FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, 
customPartitioner);
+               SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinkKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+               return new FlinkKafkaProducer010Configuration<>(transformation, 
kafkaProducer);
+       }
 
-       // ------------------- Keyless serialization schema constructors 
----------------------
+       // ---------------------- Regular constructors w/o timestamp support  
------------------
 
        /**
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
@@ -50,8 +146,8 @@ public class FlinkKafkaProducer010<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param serializationSchema
         *                      User defined (keyless) serialization schema.
         */
-       public FlinkKafkaProducer010(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+       public FlinkKafkaProducer010(String brokerList, String topicId, 
SerializationSchema<T> serializationSchema) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
        }
 
        /**
@@ -65,8 +161,8 @@ public class FlinkKafkaProducer010<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param producerConfig
         *                      Properties with the producer configuration.
         */
-       public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<IN>());
+       public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<T>());
        }
 
        /**
@@ -78,9 +174,8 @@ public class FlinkKafkaProducer010<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
         */
-       public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
+       public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig, KafkaPartitioner<T> 
customPartitioner) {
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-
        }
 
        // ------------------- Key/Value serialization schema constructors 
----------------------
@@ -96,8 +191,8 @@ public class FlinkKafkaProducer010<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param serializationSchema
         *                      User defined serialization schema supporting 
key/value messages
         */
-       public FlinkKafkaProducer010(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
-               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+       public FlinkKafkaProducer010(String brokerList, String topicId, 
KeyedSerializationSchema<T> serializationSchema) {
+               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
        }
 
        /**
@@ -111,27 +206,193 @@ public class FlinkKafkaProducer010<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param producerConfig
         *                      Properties with the producer configuration.
         */
-       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<IN>());
+       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
+               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<T>());
        }
 
        /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
+        * Creates a Kafka producer.
         *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
         */
-       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
-               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
+       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
KafkaPartitioner<T> customPartitioner) {
+               // We create a Kafka 09 producer instance here and only 
"override" (by intercepting) the
+               // invoke call.
+               super(new FlinkKafkaProducer09<>(topicId, serializationSchema, 
producerConfig, customPartitioner));
+       }
+
+
+       // ----------------------------- Generic element processing  
---------------------------
+
+       private void invokeInternal(T next, long elementTimestamp) throws 
Exception {
+
+               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
+
+               internalProducer.checkErroneous();
+
+               byte[] serializedKey = 
internalProducer.schema.serializeKey(next);
+               byte[] serializedValue = 
internalProducer.schema.serializeValue(next);
+               String targetTopic = 
internalProducer.schema.getTargetTopic(next);
+               if (targetTopic == null) {
+                       targetTopic = internalProducer.defaultTopicId;
+               }
+
+               Long timestamp = null;
+               if (this.writeTimestampToKafka) {
+                       timestamp = elementTimestamp;
+               }
+
+               ProducerRecord<byte[], byte[]> record;
+               if (internalProducer.partitioner == null) {
+                       record = new ProducerRecord<>(targetTopic, null, 
timestamp, serializedKey, serializedValue);
+               } else {
+                       record = new ProducerRecord<>(targetTopic, 
internalProducer.partitioner.partition(next, serializedKey, serializedValue, 
internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+               }
+               if (internalProducer.flushOnCheckpoint) {
+                       synchronized (internalProducer.pendingRecordsLock) {
+                               internalProducer.pendingRecords++;
+                       }
+               }
+               internalProducer.producer.send(record, 
internalProducer.callback);
        }
 
+
+       // ----------------- Helper methods implementing methods from 
SinkFunction and RichFunction (Approach (a)) ----
+
+
+       // ---- Configuration setters
+
+       /**
+        * Defines whether the producer should fail on errors, or only log them.
+        * If this is set to true, then exceptions will only be logged; if set to false,
+        * exceptions will eventually be thrown and cause the streaming program to
+        * fail (and enter recovery).
+        *
+        * Method is only accessible for approach (a) (see above)
+        *
+        * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+        */
+       public void setLogFailuresOnly(boolean logFailuresOnly) {
+               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
+               internalProducer.setLogFailuresOnly(logFailuresOnly);
+       }
+
+       /**
+        * If set to true, the Flink producer will wait for all outstanding 
messages in the Kafka buffers
+        * to be acknowledged by the Kafka producer on a checkpoint.
+        * This way, the producer can guarantee that messages in the Kafka 
buffers are part of the checkpoint.
+        *
+        * Method is only accessible for approach (a) (see above)
+        *
+        * @param flush Flag indicating the flushing mode (true = flush on 
checkpoint)
+        */
+       public void setFlushOnCheckpoint(boolean flush) {
+               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
+               internalProducer.setFlushOnCheckpoint(flush);
+       }
+
+       /**
+        * This method is used for approach (a) (see above)
+        *
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
+               internalProducer.open(parameters);
+       }
+
+       /**
+        * This method is used for approach (a) (see above)
+        */
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
+               return internalProducer.getIterationRuntimeContext();
+       }
+
+       /**
+        * This method is used for approach (a) (see above)
+        */
        @Override
-       protected void flush() {
-               if (this.producer != null) {
-                       producer.flush();
+       public void setRuntimeContext(RuntimeContext t) {
+               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
+               internalProducer.setRuntimeContext(t);
+       }
+
+       /**
+        * Invoke method for using the Sink as DataStream.addSink() sink.
+        *
+        * This method is used for approach (a) (see above)
+        *
+        * @param value The input record.
+        */
+       @Override
+       public void invoke(T value) throws Exception {
+               invokeInternal(value, Long.MAX_VALUE);
+       }
+
+
+       // ----------------- Helper methods and classes implementing methods 
from StreamSink (Approach (b)) ----
+
+
+       /**
+        * Process method for using the sink with timestamp support.
+        *
+        * This method is used for approach (b) (see above)
+        */
+       @Override
+       public void processElement(StreamRecord<T> element) throws Exception {
+               invokeInternal(element.getValue(), element.getTimestamp());
+       }
+
+       /**
+        * Configuration object returned by the writeToKafkaWithTimestamps() 
call.
+        */
+       public static class FlinkKafkaProducer010Configuration<T> extends 
DataStreamSink<T> {
+
+               private final FlinkKafkaProducerBase wrappedProducerBase;
+               private final FlinkKafkaProducer010 producer;
+
+               private FlinkKafkaProducer010Configuration(DataStream stream, 
FlinkKafkaProducer010<T> producer) {
+                       //noinspection unchecked
+                       super(stream, producer);
+                       this.producer = producer;
+                       this.wrappedProducerBase = (FlinkKafkaProducerBase) 
producer.userFunction;
+               }
+
+               /**
+                * Defines whether the producer should fail on errors, or only 
log them.
+                * If this is set to true, then exceptions will only be logged; if set to false,
+                * exceptions will eventually be thrown and cause the streaming program to
+                * fail (and enter recovery).
+                *
+                * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+                */
+               public void setLogFailuresOnly(boolean logFailuresOnly) {
+                       
this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
+               }
+
+               /**
+                * If set to true, the Flink producer will wait for all 
outstanding messages in the Kafka buffers
+                * to be acknowledged by the Kafka producer on a checkpoint.
+                * This way, the producer can guarantee that messages in the 
Kafka buffers are part of the checkpoint.
+                *
+                * @param flush Flag indicating the flushing mode (true = flush 
on checkpoint)
+                */
+               public void setFlushOnCheckpoint(boolean flush) {
+                       this.wrappedProducerBase.setFlushOnCheckpoint(flush);
+               }
+
+               /**
+                * If set to true, Flink will write the (event time) timestamp 
attached to each record into Kafka.
+                * Timestamps must be positive for Kafka to accept them.
+                *
+                * @param writeTimestampToKafka Flag indicating if Flink's 
internal timestamps are written to Kafka.
+                */
+               public void setWriteTimestampToKafka(boolean 
writeTimestampToKafka) {
+                       this.producer.writeTimestampToKafka = 
writeTimestampToKafka;
                }
        }
+
+
 }
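
A minimal sketch of the two invocation styles described in the class Javadoc above, assuming `stream` and `streamWithTimestamps` are existing `DataStream<String>` instances and using placeholder topic and broker values:

{% highlight java %}
// Approach (a): plain sink added via addSink(); record timestamps are not written to Kafka.
stream.addSink(new FlinkKafkaProducer010<String>(
    "localhost:9092", "my-topic", new SimpleStringSchema()));

// Approach (b): custom operator created via writeToKafkaWithTimestamps(),
// which can forward each record's event-time timestamp into Kafka.
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
    FlinkKafkaProducer010.writeToKafkaWithTimestamps(
        streamWithTimestamps, "my-topic", new SimpleStringSchema(), producerProps);
config.setWriteTimestampToKafka(true);
config.setFlushOnCheckpoint(true);
{% endhighlight %}

Only the configuration object returned by approach (b) exposes setWriteTimestampToKafka(); the addSink() path cannot see the record timestamps.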

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index cda68ce..ddf1ad3 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -28,7 +28,7 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */
-public class Kafka010JsonTableSource extends KafkaJsonTableSource {
+public class Kafka010JsonTableSource extends Kafka09JsonTableSource {
 
        /**
         * Creates a Kafka 0.10 JSON {@link StreamTableSource}.

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index cee1b90..732440b 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -28,7 +28,7 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */
-public class Kafka010TableSource extends KafkaTableSource {
+public class Kafka010TableSource extends Kafka09TableSource {
 
        /**
         * Creates a Kafka 0.10 {@link StreamTableSource}.

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 70f530b..47bee22 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -18,37 +18,20 @@
 
 package org.apache.flink.streaming.connectors.kafka.internal;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -56,40 +39,7 @@ import java.util.Properties;
  * 
  * @param <T> The type of elements produced by the fetcher.
  */
-public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> 
implements Runnable {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(Kafka010Fetcher.class);
-
-       // 
------------------------------------------------------------------------
-
-       /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
-       private final KeyedDeserializationSchema<T> deserializer;
-
-       /** The subtask's runtime context */
-       private final RuntimeContext runtimeContext;
-
-       /** The configuration for the Kafka consumer */
-       private final Properties kafkaProperties;
-
-       /** The maximum number of milliseconds to wait for a fetch batch */
-       private final long pollTimeout;
-
-       /** Flag whether to register Kafka metrics as Flink accumulators */
-       private final boolean forwardKafkaMetrics;
-
-       /** Mutex to guard against concurrent access to the non-threadsafe 
Kafka consumer */
-       private final Object consumerLock = new Object();
-
-       /** Reference to the Kafka consumer, once it is created */
-       private volatile KafkaConsumer<byte[], byte[]> consumer;
-
-       /** Reference to the proxy, forwarding exceptions from the fetch thread 
to the main thread */
-       private volatile ExceptionProxy errorHandler;
-
-       /** Flag to mark the main work loop as alive */
-       private volatile boolean running = true;
-
-       // 
------------------------------------------------------------------------
+public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 
        public Kafka010Fetcher(
                        SourceContext<T> sourceContext,
@@ -100,213 +50,47 @@ public class Kafka010Fetcher<T> extends 
AbstractFetcher<T, TopicPartition> imple
                        KeyedDeserializationSchema<T> deserializer,
                        Properties kafkaProperties,
                        long pollTimeout,
-                       boolean forwardKafkaMetrics) throws Exception
+                       boolean useMetrics) throws Exception
        {
-               super(sourceContext, assignedPartitions, watermarksPeriodic, 
watermarksPunctuated, runtimeContext);
-
-               this.deserializer = deserializer;
-               this.runtimeContext = runtimeContext;
-               this.kafkaProperties = kafkaProperties;
-               this.pollTimeout = pollTimeout;
-               this.forwardKafkaMetrics = forwardKafkaMetrics;
-
-               // if checkpointing is enabled, we are not automatically 
committing to Kafka.
-               
kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                               
Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+               super(sourceContext, assignedPartitions, watermarksPeriodic, 
watermarksPunctuated, runtimeContext, deserializer, kafkaProperties, 
pollTimeout, useMetrics);
        }
 
-       // 
------------------------------------------------------------------------
-       //  Fetcher work methods
-       // 
------------------------------------------------------------------------
-
        @Override
-       public void runFetchLoop() throws Exception {
-               this.errorHandler = new ExceptionProxy(Thread.currentThread());
-
-               // rather than running the main fetch loop directly here, we 
spawn a dedicated thread
-               // this makes sure that no interrupt() call upon canceling 
reaches the Kafka consumer code
-               Thread runner = new Thread(this, "Kafka 0.10 Fetcher for " + 
runtimeContext.getTaskNameWithSubtasks());
-               runner.setDaemon(true);
-               runner.start();
-
-               try {
-                       runner.join();
-               } catch (InterruptedException e) {
-                       // may be the result of a wake-up after an exception. 
we ignore this here and only
-                       // restore the interruption state
-                       Thread.currentThread().interrupt();
-               }
-
-               // make sure we propagate any exception that occurred in the 
concurrent fetch thread,
-               // before leaving this method
-               this.errorHandler.checkAndThrowException();
+       protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> 
consumer, List<TopicPartition> topicPartitions) {
+               consumer.assign(topicPartitions);
        }
 
        @Override
-       public void cancel() {
-               // flag the main thread to exit
-               running = false;
-
-               // NOTE:
-               //   - We cannot interrupt the runner thread, because the Kafka 
consumer may
-               //     deadlock when the thread is interrupted while in certain 
methods
-               //   - We cannot call close() on the consumer, because it will 
actually throw
-               //     an exception if a concurrent call is in progress
-
-               // make sure the consumer finds out faster that we are shutting 
down 
-               if (consumer != null) {
-                       consumer.wakeup();
-               }
+       protected void emitRecord(T record, 
KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
+               // get timestamp from provided ConsumerRecord (only possible 
with kafka 0.10.x)
+               super.emitRecord(record, partition, offset, 
consumerRecord.timestamp());
        }
 
+       /**
+        * Emits the record in a Kafka-timestamp-aware fashion.
+        */
        @Override
-       public void run() {
-               // This method initializes the KafkaConsumer and guarantees it 
is torn down properly.
-               // This is important, because the consumer has multi-threading 
issues,
-               // including concurrent 'close()' calls.
-
-               final KafkaConsumer<byte[], byte[]> consumer;
-               try {
-                       consumer = new KafkaConsumer<>(kafkaProperties);
-               }
-               catch (Throwable t) {
-                       running = false;
-                       errorHandler.reportError(t);
-                       return;
-               }
-
-               // from here on, the consumer will be closed properly
-               try {
-                       
consumer.assign(convertKafkaPartitions(subscribedPartitions()));
-
-                       // register Kafka metrics to Flink accumulators
-                       if (forwardKafkaMetrics) {
-                               Map<MetricName, ? extends Metric> metrics = 
consumer.metrics();
-                               if (metrics == null) {
-                                       // MapR's Kafka implementation returns 
null here.
-                                       LOG.info("Consumer implementation does 
not support metrics");
-                               } else {
-                                       // we have metrics, register them where 
possible
-                                       for (Map.Entry<MetricName, ? extends 
Metric> metric : metrics.entrySet()) {
-                                               String name = "KafkaConsumer-" 
+ metric.getKey().name();
-                                               DefaultKafkaMetricAccumulator 
kafkaAccumulator =
-                                                               
DefaultKafkaMetricAccumulator.createFor(metric.getValue());
-
-                                               // best effort: we only add the 
accumulator if available.
-                                               if (kafkaAccumulator != null) {
-                                                       
runtimeContext.addAccumulator(name, kafkaAccumulator);
-                                               }
-                                       }
-                               }
-                       }
-
-                       // seek the consumer to the initial offsets
-                       for (KafkaTopicPartitionState<TopicPartition> partition 
: subscribedPartitions()) {
-                               if (partition.isOffsetDefined()) {
-                                       
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-                               }
-                       }
-
-                       // from now on, external operations may call the 
consumer
-                       this.consumer = consumer;
-
-                       // main fetch loop
-                       while (running) {
-                               // get the next batch of records
-                               final ConsumerRecords<byte[], byte[]> records;
-                               synchronized (consumerLock) {
-                                       try {
-                                               records = 
consumer.poll(pollTimeout);
-                                       }
-                                       catch (WakeupException we) {
-                                               if (running) {
-                                                       throw we;
-                                               } else {
-                                                       continue;
-                                               }
-                                       }
-                               }
-
-                               // get the records for each topic partition
-                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitions()) {
-                                       
-                                       List<ConsumerRecord<byte[], byte[]>> 
partitionRecords = records.records(partition.getKafkaPartitionHandle());
-
-                                       for (ConsumerRecord<byte[], byte[]> 
record : partitionRecords) {
-                                               T value = 
deserializer.deserialize(
-                                                               record.key(), 
record.value(),
-                                                               record.topic(), 
record.partition(), record.offset());
-
-                                               if 
(deserializer.isEndOfStream(value)) {
-                                                       // end of stream 
signaled
-                                                       running = false;
-                                                       break;
-                                               }
-
-                                               // emit the actual record. this 
also update offset state atomically
-                                               // and deals with timestamps 
and watermark generation
-                                               emitRecord(value, partition, 
record.offset());
-                                       }
-                               }
+       protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
+               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+                       // fast path logic, in case there are no watermarks
+
+                       // emit the record, using the checkpoint lock to guarantee
+                       // atomicity of record emission and offset state update
+                       synchronized (checkpointLock) {
+                               sourceContext.collectWithTimestamp(record, timestamp);
+                               partitionState.setOffset(offset);
                        }
-                       // end main fetch loop
                }
-               catch (Throwable t) {
-                       if (running) {
-                               running = false;
-                               errorHandler.reportError(t);
-                       } else {
-                               LOG.debug("Stopped ConsumerThread threw 
exception", t);
-                       }
+               else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+                       emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
                }
-               finally {
-                       try {
-                               synchronized (consumerLock) {
-                                       consumer.close();
-                               }
-                       } catch (Throwable t) {
-                               LOG.warn("Error while closing Kafka 0.10 
consumer", t);
-                       }
+               else {
+                       emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
                }
        }
 
-       // 
------------------------------------------------------------------------
-       //  Kafka 0.10 specific fetcher behavior
-       // 
------------------------------------------------------------------------
-
        @Override
-       public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
-               return new TopicPartition(partition.getTopic(), 
partition.getPartition());
-       }
-
-       @Override
-       public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> 
offsets) throws Exception {
-               KafkaTopicPartitionState<TopicPartition>[] partitions = 
subscribedPartitions();
-               Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>(partitions.length);
-
-               for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {
-                       Long offset = 
offsets.get(partition.getKafkaTopicPartition());
-                       if (offset != null) {
-                               
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new 
OffsetAndMetadata(offset, ""));
-                       }
-               }
-
-               if (this.consumer != null) {
-                       synchronized (consumerLock) {
-                               this.consumer.commitSync(offsetsToCommit);
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       public static Collection<TopicPartition> 
convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
-               ArrayList<TopicPartition> result = new 
ArrayList<>(partitions.length);
-               for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
-                       result.add(p.getKafkaPartitionHandle());
-               }
-               return result;
+       protected String getFetcherName() {
+               return "Kafka 0.10 Fetcher";
        }
 }
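
The net effect of the refactored Kafka010Fetcher is that the timestamp carried by each Kafka 0.10 ConsumerRecord becomes the event-time timestamp of the emitted element. The following consumer-side sketch is illustrative only and not part of this commit; the topic name, broker address, group id and the String schema are placeholders:

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka010TimestampConsumerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "timestamp-example");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);

        // the fetcher already attaches the Kafka 0.10 record timestamp to every element,
        // so the assigner can simply reuse it and only decides when to emit watermarks
        consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                maxTimestamp = Math.max(maxTimestamp, previousElementTimestamp);
                return previousElementTimestamp; // keep the Kafka-provided timestamp
            }

            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(maxTimestamp);
            }
        });

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("Read Kafka 0.10 records with attached timestamps");
    }
}

Because extractTimestamp() simply returns previousElementTimestamp, no timestamp has to be parsed out of the payload; the assigner is only responsible for watermark generation.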

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 5427853..28bf6d5 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -17,14 +17,32 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.junit.Test;
 
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 
 
 public class Kafka010ITCase extends KafkaConsumerTestBase {
@@ -33,10 +51,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
        //  Suite of Tests
        // 
------------------------------------------------------------------------
 
-       @Override
-       public String getExpectedKafkaVersion() {
-               return "0.10";
-       }
 
        @Test(timeout = 60000)
        public void testFailOnNoBroker() throws Exception {
@@ -48,16 +62,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
                runSimpleConcurrentProducerConsumerTopology();
        }
 
-//     @Test(timeout = 60000)
-//     public void testPunctuatedExplicitWMConsumer() throws Exception {
-//             runExplicitPunctuatedWMgeneratingConsumerTest(false);
-//     }
-
-//     @Test(timeout = 60000)
-//     public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws 
Exception {
-//             runExplicitPunctuatedWMgeneratingConsumerTest(true);
-//     }
-
        @Test(timeout = 60000)
        public void testKeyValueSupport() throws Exception {
                runKeyValueTest();
@@ -124,68 +128,168 @@ public class Kafka010ITCase extends 
KafkaConsumerTestBase {
 
        @Test(timeout = 60000)
        public void testMetricsAndEndOfStream() throws Exception {
-               runMetricsAndEndOfStreamTest();
-       }
-
-       @Test
-       public void testJsonTableSource() throws Exception {
-               String topic = UUID.randomUUID().toString();
-
-               // Names and types are determined in the actual test method of 
the
-               // base test class.
-               Kafka010JsonTableSource tableSource = new 
Kafka010JsonTableSource(
-                               topic,
-                               standardProps,
-                               new String[] {
-                                               "long",
-                                               "string",
-                                               "boolean",
-                                               "double",
-                                               "missing-field"},
-                               new TypeInformation<?>[] {
-                                               BasicTypeInfo.LONG_TYPE_INFO,
-                                               BasicTypeInfo.STRING_TYPE_INFO,
-                                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                                               BasicTypeInfo.LONG_TYPE_INFO });
-
-               // Don't fail on missing field, but set to null (default)
-               tableSource.setFailOnMissingField(false);
-
-               runJsonTableSource(topic, tableSource);
-       }
-
-       @Test
-       public void testJsonTableSourceWithFailOnMissingField() throws 
Exception {
-               String topic = UUID.randomUUID().toString();
-
-               // Names and types are determined in the actual test method of 
the
-               // base test class.
-               Kafka010JsonTableSource tableSource = new 
Kafka010JsonTableSource(
-                               topic,
-                               standardProps,
-                               new String[] {
-                                               "long",
-                                               "string",
-                                               "boolean",
-                                               "double",
-                                               "missing-field"},
-                               new TypeInformation<?>[] {
-                                               BasicTypeInfo.LONG_TYPE_INFO,
-                                               BasicTypeInfo.STRING_TYPE_INFO,
-                                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                                               BasicTypeInfo.LONG_TYPE_INFO });
-
-               // Don't fail on missing field, but set to null (default)
-               tableSource.setFailOnMissingField(true);
-
-               try {
-                       runJsonTableSource(topic, tableSource);
-                       fail("Did not throw expected Exception");
-               } catch (Exception e) {
-                       Throwable rootCause = 
e.getCause().getCause().getCause();
-                       assertTrue("Unexpected root cause", rootCause 
instanceof IllegalStateException);
+               runEndOfStreamTest();
+       }
+
+       /**
+        * Kafka 0.10 specific test, ensuring that timestamps are properly written to and read from Kafka.
+        */
+       @Test(timeout = 60000)
+       public void testTimestamps() throws Exception {
+
+               final String topic = "tstopic";
+               createTestTopic(topic, 3, 1);
+
+               // ---------- Produce an event time stream into Kafka -------------------
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.setParallelism(1);
+               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
+                       boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<Long> ctx) throws Exception {
+                               long i = 0;
+                               while(running) {
+                                       ctx.collectWithTimestamp(i, i*2);
+                                       if(i++ == 1000L) {
+                                               running = false;
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+
+               final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+               FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
+                       @Override
+                       public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+                               return (int)(next % 3);
+                       }
+               });
+               prod.setParallelism(3);
+               prod.setWriteTimestampToKafka(true);
+               env.execute("Produce some");
+
+               // ---------- Consume stream from Kafka -------------------
+
+               env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.setParallelism(1);
+               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
+               kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
+                       @Nullable
+                       @Override
+                       public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+                               if(lastElement % 10 == 0) {
+                                       return new Watermark(lastElement);
+                               }
+                               return null;
+                       }
+
+                       @Override
+                       public long extractTimestamp(Long element, long previousElementTimestamp) {
+                               return previousElementTimestamp;
+                       }
+               });
+
+               DataStream<Long> stream = env.addSource(kafkaSource);
+               GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+               stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+               env.execute("Consume again");
+
+               deleteTestTopic(topic);
+       }
+
+       private static class TimestampValidatingOperator extends StreamSink<Long> {
+
+               public TimestampValidatingOperator() {
+                       super(new SinkFunction<Long>() {
+                               @Override
+                               public void invoke(Long value) throws Exception {
+                                       throw new RuntimeException("Unexpected");
+                               }
+                       });
+               }
+
+               long elCount = 0;
+               long wmCount = 0;
+               long lastWM = Long.MIN_VALUE;
+
+               @Override
+               public void processElement(StreamRecord<Long> element) throws Exception {
+                       elCount++;
+                       if(element.getValue() * 2 != element.getTimestamp()) {
+                               throw new RuntimeException("Invalid timestamp: " + element);
+                       }
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+                       wmCount++;
+
+                       if(lastWM <= mark.getTimestamp()) {
+                               lastWM = mark.getTimestamp();
+                       } else {
+                               throw new RuntimeException("Received watermark lower than the last one");
+                       }
+
+                       if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) {
+                               throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
+                       }
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       if(elCount != 1000L) {
+                               throw new RuntimeException("Wrong final element count " + elCount);
+                       }
+
+                       if(wmCount <= 2) {
+                               throw new RuntimeException("Almost no watermarks have been sent " + wmCount);
+                       }
+               }
+       }
+
+       private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
+
+               private final TypeInformation<Long> ti;
+               private final TypeSerializer<Long> ser;
+               long cnt = 0;
+
+               public LimitedLongDeserializer() {
+                       this.ti = TypeInfoParser.parse("Long");
+                       this.ser = ti.createSerializer(new ExecutionConfig());
+               }
+               @Override
+               public TypeInformation<Long> getProducedType() {
+                       return ti;
+               }
+
+               @Override
+               public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+                       cnt++;
+                       DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+                       Long e = ser.deserialize(in);
+                       return e;
+               }
+
+               @Override
+               public boolean isEndOfStream(Long nextElement) {
+                       return cnt > 1000L;
                }
        }
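
The producer side exercised by testTimestamps() can be condensed into a short usage sketch of the writeToKafkaWithTimestamps() entry point added by this commit. It is illustrative only: the topic name and broker address are placeholders, and in a real job the input stream would carry event-time timestamps (for example from an event-time source or a timestamp assigner):

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class WriteTimestampsToKafkaSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // attach the producer; the returned configuration object controls whether the
        // records' event-time timestamps are written into the Kafka 0.10 messages
        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
                FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "example-topic", new SimpleStringSchema(), props);
        config.setWriteTimestampToKafka(true);

        env.execute("Write records and their timestamps to Kafka 0.10");
    }
}

setWriteTimestampToKafka(true) is what makes the sink copy each record's event-time timestamp into the Kafka message, which is exactly what the consuming half of testTimestamps() validates.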
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 5f5ac63..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-       
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testPropagateExceptions() {
-               try {
-                       // mock kafka producer
-                       KafkaProducer<?, ?> kafkaProducerMock = 
mock(KafkaProducer.class);
-                       
-                       // partition setup
-                       
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-                                       Collections.singletonList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
-
-                       // failure when trying to send an element
-                       when(kafkaProducerMock.send(any(ProducerRecord.class), 
any(Callback.class)))
-                               .thenAnswer(new 
Answer<Future<RecordMetadata>>() {
-                                       @Override
-                                       public Future<RecordMetadata> 
answer(InvocationOnMock invocation) throws Throwable {
-                                               Callback callback = (Callback) 
invocation.getArguments()[1];
-                                               callback.onCompletion(null, new 
Exception("Test error"));
-                                               return null;
-                                       }
-                               });
-                       
-                       // make sure the FlinkKafkaProducer instantiates our 
mock producer
-                       
whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-                       
-                       // (1) producer that propagates errors
-
-                       FlinkKafkaProducer010<String> producerPropagating = new 
FlinkKafkaProducer010<>(
-                                       "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
-
-                       producerPropagating.setRuntimeContext(new 
MockRuntimeContext(17, 3));
-                       producerPropagating.open(new Configuration());
-                       
-                       try {
-                               producerPropagating.invoke("value");
-                               producerPropagating.invoke("value");
-                               fail("This should fail with an exception");
-                       }
-                       catch (Exception e) {
-                               assertNotNull(e.getCause());
-                               assertNotNull(e.getCause().getMessage());
-                               
assertTrue(e.getCause().getMessage().contains("Test error"));
-                       }
-
-                       // (2) producer that only logs errors
-
-                       FlinkKafkaProducer010<String> producerLogging = new 
FlinkKafkaProducer010<>(
-                                       "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
-                       producerLogging.setLogFailuresOnly(true);
-                       
-                       producerLogging.setRuntimeContext(new 
MockRuntimeContext(17, 3));
-                       producerLogging.open(new Configuration());
-
-                       producerLogging.invoke("value");
-                       producerLogging.invoke("value");
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 45f0478..af6d254 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -28,6 +28,8 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -64,6 +66,9 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        private String brokerConnectionString = "";
        private Properties standardProps;
        private Properties additionalServerProperties;
+       private boolean secureMode = false;
+       // the ZooKeeper default of 6 seconds seems to be too small for Travis; use 30 seconds
+       private int zkTimeout = 30000;
 
        public String getBrokerConnectionString() {
                return brokerConnectionString;
@@ -75,6 +80,22 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
+       public Properties getSecureProperties() {
+               Properties prop = new Properties();
+               if(secureMode) {
+                       prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+                       prop.put("security.protocol", "SASL_PLAINTEXT");
+                       prop.put("sasl.kerberos.service.name", "kafka");
+
+                       // add special timeouts for Travis
+                       prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+                       prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+                       prop.setProperty("metadata.fetch.timeout.ms","120000");
+               }
+               return prop;
+       }
+
+       @Override
        public String getVersion() {
                return "0.10";
        }
@@ -90,10 +111,13 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
-       public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);
-               return prod;
+               return stream.addSink(prod);
+       /*      FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
+               sink.setFlushOnCheckpoint(true);
+               return sink; */
        }
 
        @Override
@@ -130,8 +154,21 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
-       public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+       public boolean isSecureRunSupported() {
+               return true;
+       }
+
+       @Override
+       public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+               // increase the timeouts, since on Travis the ZooKeeper connection takes a long time in secure mode
+               if(secureMode) {
+                       // run only one Kafka server to avoid multiple ZooKeeper connections from many instances (Travis timeouts)
+                       numKafkaServers = 1;
+                       zkTimeout = zkTimeout * 15;
+               }
+
                this.additionalServerProperties = additionalServerProperties;
+               this.secureMode = secureMode;
                File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
                tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
@@ -151,9 +188,9 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                brokers = null;
 
                try {
-                       LOG.info("Starting Zookeeper");
-                       zookeeper = new TestingServer(-1, tmpZkDir);
+                       zookeeper = new TestingServer(-1, tmpZkDir);
                        zookeeperConnectionString = zookeeper.getConnectString();
+                       LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
                        LOG.info("Starting KafkaServer");
                        brokers = new ArrayList<>(numKafkaServers);
@@ -161,8 +198,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        for (int i = 0; i < numKafkaServers; i++) {
                                brokers.add(getKafkaServer(i, 
tmpKafkaDirs.get(i)));
 
-                               SocketServer socketServer = brokers.get(i).socketServer();
-                               brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+                               if(secureMode) {
+                                       brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+                               } else {
+                                       brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+                               }
                        }
 
                        LOG.info("ZK and KafkaServer started.");
@@ -177,8 +217,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
                standardProps.setProperty("auto.commit.enable", "false");
-               standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
-               standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+               standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+               standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
                standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning. (earliest is kafka 0.10 value)
                standardProps.setProperty("fetch.message.max.bytes", "256"); // 
make a lot of fetches (MESSAGES MUST BE SMALLER!)
        }
@@ -244,7 +284,14 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                final long deadline = System.currentTimeMillis() + 30000;
                do {
                        try {
-                               Thread.sleep(100);
+                               if(secureMode) {
+                                       // increase the wait time, since on Travis ZooKeeper timeouts occur frequently
+                                       int wait = zkTimeout / 100;
+                                       LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+                                       Thread.sleep(wait);
+                               } else {
+                                       Thread.sleep(100);
+                               }
                        } catch (InterruptedException e) {
                                // restore interrupted state
                        }
@@ -295,8 +342,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
 
                // for CI stability, increase zookeeper session timeout
-               kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
-               kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+               kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+               kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
                if(additionalServerProperties != null) {
                        kafkaProperties.putAll(additionalServerProperties);
                }
@@ -306,6 +353,15 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                for (int i = 1; i <= numTries; i++) {
                        int kafkaPort = NetUtils.getAvailablePort();
                        kafkaProperties.put("port", 
Integer.toString(kafkaPort));
+
+                       // to support a secure Kafka cluster
+                       if(secureMode) {
+                               LOG.info("Adding Kafka secure configurations");
+                               kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                               kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                               kafkaProperties.putAll(getSecureProperties());
+                       }
+
                        KafkaConfig kafkaConfig = new 
KafkaConfig(kafkaProperties);
 
                        try {
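
For orientation, a client connecting to a broker that was started in this secure mode needs matching SASL settings on its side. A hedged fragment (imports as in the sketches above; broker address, group id and topic name are placeholders, and a Kerberos/JAAS configuration is assumed to be available via java.security.auth.login.config):

Properties secureProps = new Properties();
secureProps.setProperty("bootstrap.servers", "localhost:9093");
secureProps.setProperty("group.id", "secure-test-group");
secureProps.setProperty("security.protocol", "SASL_PLAINTEXT");
secureProps.setProperty("sasl.kerberos.service.name", "kafka");
// generous ZooKeeper timeouts, mirroring the test environment settings above
secureProps.setProperty("zookeeper.session.timeout.ms", "30000");
secureProps.setProperty("zookeeper.connection.timeout.ms", "30000");

FlinkKafkaConsumer010<String> secureConsumer =
        new FlinkKafkaConsumer010<>("secure-topic", new SimpleStringSchema(), secureProps);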

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 35e491a..1302348 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread {
                                                                continue 
partitionsLoop;
                                                        }
                                                        
-                                                       owner.emitRecord(value, currentPartition, offset);
+                                                       owner.emitRecord(value, currentPartition, offset, Long.MIN_VALUE);
                                                }
                                                else {
                                                        // no longer running

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index cbf3d06..a0d5002 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -31,6 +31,9 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -101,10 +104,10 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
-       public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);
-               return prod;
+               return stream.addSink(prod);
        }
 
        @Override
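
The change from getProducer() to produceIntoKafka() moves the sink attachment into the test environment, so the 0.10 implementation can switch to the writeToKafkaWithTimestamps() path without touching the shared tests. A hedged sketch of the resulting call site (kafkaServer, stream, topic, serSchema, props and partitioner stand for the test base's usual fields):

// before this change, tests attached the producer themselves:
//   stream.addSink(kafkaServer.getProducer(topic, serSchema, props, partitioner));

// after this change, the environment decides how the sink is wired in:
kafkaServer.produceIntoKafka(stream, topic, serSchema, props, partitioner)
        .setParallelism(1);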

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 9708777..a97476a 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -81,11 +81,11 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
        // 
------------------------------------------------------------------------
 
        /** User-supplied properties for Kafka **/
-       private final Properties properties;
+       protected final Properties properties;
 
        /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
         * available. If 0, returns immediately with any records that are 
available now */
-       private final long pollTimeout;
+       protected final long pollTimeout;
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index eb3440a..2a3e39d 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -27,7 +27,7 @@ import java.util.Properties;
 
 
 /**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.8.
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.9.
  *
  * Please note that this producer does not have any reliability guarantees.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index aaec9dc..37e40fc 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -131,7 +131,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
 
                // rather than running the main fetch loop directly here, we 
spawn a dedicated thread
                // this makes sure that no interrupt() call upon canceling 
reaches the Kafka consumer code
-               Thread runner = new Thread(this, "Kafka 0.9 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
+               Thread runner = new Thread(this, getFetcherName() + " for " + runtimeContext.getTaskNameWithSubtasks());
                runner.setDaemon(true);
                runner.start();
 
@@ -183,7 +183,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
 
                // from here on, the consumer will be closed properly
                try {
-                       consumer.assign(convertKafkaPartitions(subscribedPartitions()));
+                       assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));
+
 
                        if (useMetrics) {
                                final MetricGroup kafkaMetricGroup = 
runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
@@ -250,7 +251,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
 
                                                // emit the actual record. this 
also update offset state atomically
                                                // and deals with timestamps 
and watermark generation
-                                               emitRecord(value, partition, record.offset());
+                                               emitRecord(value, partition, record.offset(), record);
                                        }
                                }
                        }
@@ -274,6 +275,21 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                }
        }
 
+       // Kafka09Fetcher ignores the timestamp; Kafka010Fetcher extracts the timestamp and passes it to the emitRecord() method.
+       protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
+               emitRecord(record, partition, offset, Long.MIN_VALUE);
+       }
+       /**
+        * Protected method to make the partition assignment pluggable, for different Kafka versions.
+        */
+       protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+               consumer.assign(topicPartitions);
+       }
+
+       protected String getFetcherName() {
+               return "Kafka 0.9 Fetcher";
+       }
+
        // 
------------------------------------------------------------------------
        //  Kafka 0.9 specific fetcher behavior
        // 
------------------------------------------------------------------------

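
Read together with the Kafka010Fetcher changes earlier in this diff, the refactoring reduces the 0.10 fetcher to overriding the hooks introduced here. A condensed view, with the constructor, imports and the timestamp-aware emitRecord(..., long timestamp) override omitted:

public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {

    @Override
    protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
        consumer.assign(topicPartitions);
    }

    @Override
    protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
        // Kafka 0.10 records carry a timestamp, so forward it instead of Long.MIN_VALUE
        super.emitRecord(record, partition, offset, consumerRecord.timestamp());
    }

    @Override
    protected String getFetcherName() {
        return "Kafka 0.10 Fetcher";
    }
}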