Repository: flink
Updated Branches:
  refs/heads/master a079259f3 -> 6731ec1e4


http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 213ba4a..9d8fa9a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -29,6 +29,8 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -94,10 +96,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        }
 
        @Override
-       public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);
-               return prod;
+               return stream.addSink(prod);
        }
 
        @Override

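[Note, not part of the commit: a minimal sketch of how a test can use the new produceIntoKafka(...) helper in place of the old getProducer(...)/addSink(...) pair. The variables env, kafkaServer and props, as well as the topic name, are assumed to exist in the surrounding test.]

    // sketch only: the test environment now wires the flush-on-checkpoint producer
    // into the stream itself and hands back the resulting DataStreamSink
    DataStream<String> stream = env.fromElements("a", "b", "c");
    DataStreamSink<String> sink = kafkaServer.produceIntoKafka(
            stream,
            "example-topic",                                                  // assumed topic name
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            null);                                                            // null = default partitioner
    sink.setParallelism(1);
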
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 5b18c75..05028e6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -21,6 +21,7 @@ import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -36,6 +37,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -169,6 +171,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                        throw new IllegalStateException("A periodic watermark emitter has already been set.");
                }
                try {
+                       ClosureCleaner.clean(assigner, true);
                        this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
                        return this;
                } catch (Exception e) {
@@ -203,6 +206,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                        throw new IllegalStateException("A punctuated watermark emitter has already been set.");
                }
                try {
+                       ClosureCleaner.clean(assigner, true);
                        this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
                        return this;
                } catch (Exception e) {

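[Note, not part of the commit: ClosureCleaner.clean(assigner, true) strips the reference to the enclosing, possibly non-serializable, class before the assigner is wrapped in a SerializedValue. A sketch of the kind of anonymous assigner this targets; MyEvent, getCreationTime() and isEndOfBatch() are assumed user code.]

    consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            return element.getCreationTime();                // assumed event-time field
        }

        @Override
        public Watermark checkAndGetNewWatermark(MyEvent lastElement, long extractedTimestamp) {
            // emit a watermark only at assumed batch boundaries
            return lastElement.isEndOfBatch() ? new Watermark(extractedTimestamp) : null;
        }
    });
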
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index f0975dc..26a695e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -105,7 +105,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
        /**
         * If true, the producer will wait until all outstanding records have been send to the broker.
         */
-       private boolean flushOnCheckpoint;
+       protected boolean flushOnCheckpoint;
        
        // -------------------------------- Runtime fields ------------------------------------------
 
@@ -139,7 +139,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
                requireNonNull(defaultTopicId, "TopicID not set");
                requireNonNull(serializationSchema, "serializationSchema not set");
                requireNonNull(producerConfig, "producerConfig not set");
-               ClosureCleaner.ensureSerializable(customPartitioner);
+               ClosureCleaner.clean(customPartitioner, true);
                ClosureCleaner.ensureSerializable(serializationSchema);
 
                this.defaultTopicId = defaultTopicId;

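[Note, not part of the commit: the switch from ensureSerializable(...) to clean(..., true) on the partitioner means the partitioner is first cleaned of its captured outer-class reference and only then checked, roughly as sketched below.]

    // ensureSerializable(obj) only verifies serializability and throws if it fails;
    // clean(obj, true) first nulls the captured outer reference, then performs the same check
    ClosureCleaner.clean(customPartitioner, true);
    ClosureCleaner.ensureSerializable(serializationSchema);
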
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 7ce3a9d..7ee3079 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -49,24 +49,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public abstract class AbstractFetcher<T, KPH> {
        
-       private static final int NO_TIMESTAMPS_WATERMARKS = 0;
-       private static final int PERIODIC_WATERMARKS = 1;
-       private static final int PUNCTUATED_WATERMARKS = 2;
+       protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
+       protected static final int PERIODIC_WATERMARKS = 1;
+       protected static final int PUNCTUATED_WATERMARKS = 2;
        
        // ------------------------------------------------------------------------
        
        /** The source context to emit records and watermarks to */
-       private final SourceContext<T> sourceContext;
+       protected final SourceContext<T> sourceContext;
 
        /** The lock that guarantees that record emission and state updates are atomic,
         * from the view of taking a checkpoint */
-       private final Object checkpointLock;
+       protected final Object checkpointLock;
 
        /** All partitions (and their state) that this fetcher is subscribed to */
        private final KafkaTopicPartitionState<KPH>[] allPartitions;
 
        /** The mode describing whether the fetcher also generates timestamps and watermarks */
-       private final int timestampWatermarkMode;
+       protected final int timestampWatermarkMode;
 
        /** Flag whether to register metrics for the fetcher */
        protected final boolean useMetrics;
@@ -207,30 +207,34 @@ public abstract class AbstractFetcher<T, KPH> {
        // ------------------------------------------------------------------------
 
        /**
-        * 
         * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
         * That makes the fast path efficient, the extended paths are called as separate methods.
-        * 
         * @param record The record to emit
         * @param partitionState The state of the Kafka partition from which the record was fetched
-        * @param offset The offset from which the record was fetched
+        * @param offset The offset of the record
+        * @param timestamp The record's event-timestamp
         */
-       protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) {
+       protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
                if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
                        // fast path logic, in case there are no watermarks
 
                        // emit the record, using the checkpoint lock to guarantee
                        // atomicity of record emission and offset state update
                        synchronized (checkpointLock) {
-                               sourceContext.collect(record);
+                               if(timestamp != Long.MIN_VALUE) {
+                                       // this case is true for Kafka 0.10
+                                       sourceContext.collectWithTimestamp(record, timestamp);
+                               } else {
+                                       sourceContext.collect(record);
+                               }
                                partitionState.setOffset(offset);
                        }
                }
                else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset);
+                       emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
                }
                else {
-                       emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset);
+                       emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
                }
        }
 
@@ -238,8 +242,8 @@ public abstract class AbstractFetcher<T, KPH> {
         * Record emission, if a timestamp will be attached from an assigner that is
         * also a periodic watermark generator.
         */
-       private void emitRecordWithTimestampAndPeriodicWatermark(
-                       T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+       protected void emitRecordWithTimestampAndPeriodicWatermark(
+                       T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
        {
                @SuppressWarnings("unchecked")
                final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
@@ -251,7 +255,7 @@ public abstract class AbstractFetcher<T, KPH> {
                final long timestamp;
                //noinspection SynchronizationOnLocalVariableOrMethodParameter
                synchronized (withWatermarksState) {
-                       timestamp = withWatermarksState.getTimestampForRecord(record);
+                       timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
                }
 
                // emit the record with timestamp, using the usual checkpoint lock to guarantee
@@ -266,8 +270,8 @@ public abstract class AbstractFetcher<T, KPH> {
         * Record emission, if a timestamp will be attached from an assigner that is
         * also a punctuated watermark generator.
         */
-       private void emitRecordWithTimestampAndPunctuatedWatermark(
-                       T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+       protected void emitRecordWithTimestampAndPunctuatedWatermark(
+                       T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
        {
                @SuppressWarnings("unchecked")
                final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
@@ -275,7 +279,7 @@ public abstract class AbstractFetcher<T, KPH> {
 
                // only one thread ever works on accessing timestamps and watermarks
                // from the punctuated extractor
-               final long timestamp = withWatermarksState.getTimestampForRecord(record);
+               final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
                final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
                        
                // emit the record with timestamp, using the usual checkpoint lock to guarantee
@@ -291,6 +295,7 @@ public abstract class AbstractFetcher<T, KPH> {
                        updateMinPunctuatedWatermark(newWatermark);
                }
        }
+
        /**
         *Checks whether a new per-partition watermark is also a new cross-partition watermark.
         */

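[Note, not part of the commit: a sketch of how a fetcher subclass is expected to call the widened emitRecord(...). A Kafka 0.10 style fetcher passes the broker-assigned record timestamp, which makes emitRecord() use collectWithTimestamp(...); older fetchers pass Long.MIN_VALUE and keep the plain collect() path. The method names forwardRecord/forwardRecordPre010 are made up for illustration.]

    // inside a hypothetical subclass of AbstractFetcher<T, KPH>
    void forwardRecord(T value, KafkaTopicPartitionState<KPH> partition, long offset, long brokerTimestamp) throws Exception {
        // Kafka 0.10 records carry a timestamp; forwarding it attaches event time to the emitted element
        emitRecord(value, partition, offset, brokerTimestamp);
    }

    void forwardRecordPre010(T value, KafkaTopicPartitionState<KPH> partition, long offset) throws Exception {
        // Kafka 0.8/0.9 records have no timestamp; Long.MIN_VALUE preserves the old behaviour
        emitRecord(value, partition, offset, Long.MIN_VALUE);
    }
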
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
index 99c5d69..efdc73f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -49,8 +49,8 @@ public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extend
 
        // ------------------------------------------------------------------------
        
-       public long getTimestampForRecord (T record) {
-               return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+       public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+               return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
        }
        
        public long getCurrentWatermarkTimestamp() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
index b265990..edf40ce 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -54,8 +54,8 @@ public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> exte
 
        // ------------------------------------------------------------------------
        
-       public long getTimestampForRecord(T record) {
-               return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+       public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+               return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
        }
 
        @Nullable

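[Note, not part of the commit: with both state classes forwarding kafkaEventTimestamp, a user assigner now sees the Kafka record timestamp as the second argument of extractTimestamp(...) instead of a hard-coded Long.MIN_VALUE. A sketch of a periodic assigner that simply passes it through; MyEvent is assumed user code.]

    new AssignerWithPeriodicWatermarks<MyEvent>() {
        private long currentMax = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            // previousElementTimestamp now carries the Kafka 0.10 record timestamp (or Long.MIN_VALUE)
            currentMax = Math.max(currentMax, previousElementTimestamp);
            return previousElementTimestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMax == Long.MIN_VALUE ? Long.MIN_VALUE : currentMax - 1);
        }
    };
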
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index bafff4f..3c967ba 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -68,6 +68,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -91,6 +92,7 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
@@ -185,7 +187,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        stream.print();
                        see.execute("No broker test");
                } catch(ProgramInvocationException pie) {
-                       if(kafkaServer.getVersion().equals("0.9")) {
+                       if(kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
                                assertTrue(pie.getCause() instanceof JobExecutionException);
 
                                JobExecutionException jee = (JobExecutionException) pie.getCause();
@@ -287,8 +289,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
                producerProperties.setProperty("retries", "3");
                producerProperties.putAll(secureProps);
-               FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
-               stream.addSink(prod);
+               kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
 
                // ----------- add consumer dataflow ----------
 
@@ -516,7 +517,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                // launch a producer thread
                DataGenerators.InfiniteStringsGenerator generator =
-                               new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
+                               new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort);
                generator.start();
 
                // launch a consumer asynchronously
@@ -539,7 +540,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                                        env.addSource(source).addSink(new DiscardingSink<String>());
 
-                                       env.execute();
+                                       env.execute("Runner for CancelingOnFullInputTest");
                                }
                                catch (Throwable t) {
                                        jobError.set(t);
@@ -560,7 +561,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                }
 
                // cancel
-               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
 
                // wait for the program to be done and validate that we failed with the right exception
                runnerThread.join();
@@ -570,6 +571,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                assertTrue(failueCause.getMessage().contains("Job was cancelled"));
 
                if (generator.isAlive()) {
+                       JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator");
                        generator.shutdown();
                        generator.join();
                }
@@ -613,7 +615,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                                        env.addSource(source).addSink(new DiscardingSink<String>());
 
-                                       env.execute();
+                                       env.execute("CancelingOnEmptyInputTest");
                                }
                                catch (Throwable t) {
                                        LOG.error("Job Runner failed with exception", t);
@@ -671,7 +673,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                                .addSink(new DiscardingSink<Integer>());
 
                try {
-                       env.execute();
+                       env.execute("test fail on deploy");
                        fail("this test should fail with an exception");
                }
                catch (ProgramInvocationException e) {
@@ -738,8 +740,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                Properties props = new Properties();
                props.putAll(standardProps);
                props.putAll(secureProps);
-
-               stream.addSink(kafkaServer.getProducer("dummy", schema, props, null));
+               kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null);
 
                env.execute("Write to topics");
 
@@ -954,7 +955,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        }
                });
 
-               stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null));
+               kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null);
 
                tryExecute(env, "big topology test");
                deleteTestTopic(topic);
@@ -1046,8 +1047,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
                Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
                producerProperties.setProperty("retries", "3");
-               producerProperties.putAll(secureProps);
-               kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
+               kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
                env.execute("Write KV to Kafka");
 
                // ----------- Read the data again -------------------
@@ -1132,8 +1132,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
                producerProperties.setProperty("retries", "3");
                producerProperties.putAll(secureProps);
-
-               kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
+               kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
 
                env.execute("Write deletes to Kafka");
 
@@ -1229,7 +1228,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                                        props.putAll(secureProps);
 
                                        TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-                                       DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, props));
+                                       DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
                                        fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
                                                @Override
                                                public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
@@ -1254,7 +1253,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                                                }
                                        });
 
-                                       fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), props, null));
+                                       kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
                                        env1.execute("Metrics test job");
                                } catch(Throwable t) {
@@ -1369,8 +1368,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
                                new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
 
-               cc.putAll(secureProps);
                // create the consumer
+               cc.putAll(secureProps);
                FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
 
                DataStream<Tuple2<Integer, Integer>> source = env
@@ -1474,9 +1473,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        producerProperties.setProperty("retries", "0");
                        producerProperties.putAll(secureProps);
                        
-                       stream.addSink(kafkaServer.getProducer(
-                                                       topicName, serSchema, producerProperties,
-                                                       new Tuple2Partitioner(parallelism)))
+                       kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism))
                                        .setParallelism(parallelism);
 
                        try {
@@ -1773,86 +1770,88 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 //                     deleteTestTopic(topic);
 //             }
 //     }
-
-       private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
-               try {
-                       tryExecutePropagateExceptions(env, execName);
-               }
-               catch (ProgramInvocationException | JobExecutionException e) {
-                       // look for NotLeaderForPartitionException
-                       Throwable cause = e.getCause();
-
-                       // search for nested SuccessExceptions
-                       int depth = 0;
-                       while (cause != null && depth++ < 20) {
-                               if (cause instanceof kafka.common.NotLeaderForPartitionException) {
-                                       throw (Exception) cause;
-                               }
-                               cause = cause.getCause();
-                       }
-                       throw e;
-               }
-       }
-
-       private void putDataInTopics(StreamExecutionEnvironment env,
-                                                               Properties producerProperties,
-                                                               final int elementsPerPartition,
-                                                               Map<String, Boolean> topics,
-                                                               TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
-               if(topics.size() != 2) {
-                       throw new RuntimeException("This method accepts two topics as arguments.");
-               }
-
-               TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
-                       new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
-
-               DataStream<Tuple2<Long, Integer>> stream = env
-                       .addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
-                               private boolean running = true;
-
-                               @Override
-                               public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
-                                       int topic = 0;
-                                       int currentTs = 1;
-
-                                       while (running && currentTs < elementsPerPartition) {
-                                               long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
-                                               ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
-                                               currentTs++;
-                                       }
-
-                                       Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
-                                       ctx.collect(toWrite2);
-                               }
-
-                               @Override
-                               public void cancel() {
-                               running = false;
-                       }
-                       }).setParallelism(1);
-
-               List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
-               stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-
-                       @Override
-                       public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-                               return value;
-                       }
-               }).setParallelism(1).addSink(kafkaServer.getProducer(topicsL.get(0).getKey(),
-                       new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
-
-               if(!topicsL.get(1).getValue()) {
-                       stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-
-                               @Override
-                               public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-                                       long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
-                                       return new Tuple2<Long, Integer>(timestamp, 1);
-                               }
-                       }).setParallelism(1).addSink(kafkaServer.getProducer(topicsL.get(1).getKey(),
-                               new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
-               }
-       }
+//
+//     private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
+//             try {
+//                     tryExecutePropagateExceptions(env, execName);
+//             }
+//             catch (ProgramInvocationException | JobExecutionException e) {
+//                     // look for NotLeaderForPartitionException
+//                     Throwable cause = e.getCause();
+//
+//                     // search for nested SuccessExceptions
+//                     int depth = 0;
+//                     while (cause != null && depth++ < 20) {
+//                             if (cause instanceof kafka.common.NotLeaderForPartitionException) {
+//                                     throw (Exception) cause;
+//                             }
+//                             cause = cause.getCause();
+//                     }
+//                     throw e;
+//             }
+//     }
+//
+//     private void putDataInTopics(StreamExecutionEnvironment env,
+//                                                             Properties producerProperties,
+//                                                             final int elementsPerPartition,
+//                                                             Map<String, Boolean> topics,
+//                                                             TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
+//             if(topics.size() != 2) {
+//                     throw new RuntimeException("This method accepts two topics as arguments.");
+//             }
+//
+//             TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
+//                     new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
+//
+//             DataStream<Tuple2<Long, Integer>> stream = env
+//                     .addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
+//                             private boolean running = true;
+//
+//                             @Override
+//                             public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
+//                                     int topic = 0;
+//                                     int currentTs = 1;
+//
+//                                     while (running && currentTs < elementsPerPartition) {
+//                                             long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
+//                                             ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
+//                                             currentTs++;
+//                                     }
+//
+//                                     Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
+//                                     ctx.collect(toWrite2);
+//                             }
+//
+//                             @Override
+//                             public void cancel() {
+//                             running = false;
+//                     }
+//                     }).setParallelism(1);
+//
+//             List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
+//
+//             stream = stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
+//
+//                     @Override
+//                     public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
+//                             return value;
+//                     }
+//             }).setParallelism(1);
+//             kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(),
+//                     new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1);
+//
+//             if(!topicsL.get(1).getValue()) {
+//                     stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
+//
+//                             @Override
+//                             public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
+//                                     long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
+//                                     return new Tuple2<>(timestamp, 1);
+//                             }
+//                     }).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(),
+//                             new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
+//             }
+//     }
 
        private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedComsumer(StreamExecutionEnvironment env,
                                                                                                                                List<String> topics,

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 5bcf406..c925c8f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -109,11 +109,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
                        props.putAll(secureProps);
                        
                        // sink partitions into 
-                       stream.addSink(kafkaServer.getProducer(topic,
+                       kafkaServer.produceIntoKafka(stream, topic,
                                        new KeyedSerializationSchemaWrapper<>(serSchema),
                                        props,
-                                       new CustomPartitioner(parallelism)))
-                       .setParallelism(parallelism);
+                                       new CustomPartitioner(parallelism)).setParallelism(parallelism);
 
                        // ------ consuming topology ---------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9236e78..5c03b78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -165,12 +165,10 @@ public class KafkaShortRetentionTestBase implements Serializable {
                                running = false;
                        }
                });
-
                Properties props = new Properties();
                props.putAll(standardProps);
                props.putAll(secureProps);
-
-               stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null));
+               kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
 
                // ----------- add consumer dataflow ----------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 6ecde71..ded1fde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -18,6 +18,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.server.KafkaServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -76,7 +79,9 @@ public abstract class KafkaTestEnvironment {
 
        public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
 
-       public abstract <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner);
+       public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
+                                                                                               KeyedSerializationSchema<T> serSchema, Properties props,
+                                                                                               KafkaPartitioner<T> partitioner);
 
 
        // -- leader failure simulation

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 9b5d2e6..582311f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -46,10 +46,11 @@ public class AbstractFetcherTimestampsTest {
        
        @Test
        public void testPunctuatedWatermarks() throws Exception {
+               final String testTopic = "test topic name";
                List<KafkaTopicPartition> originalPartitions = Arrays.asList(
-                               new KafkaTopicPartition("test topic name", 7),
-                               new KafkaTopicPartition("test topic name", 13),
-                               new KafkaTopicPartition("test topic name", 21));
+                               new KafkaTopicPartition(testTopic, 7),
+                               new KafkaTopicPartition(testTopic, 13),
+                               new KafkaTopicPartition(testTopic, 21));
 
                TestSourceContext<Long> sourceContext = new TestSourceContext<>();
 
@@ -65,22 +66,22 @@ public class AbstractFetcherTimestampsTest {
                // elements generate a watermark if the timestamp is a multiple of three
                
                // elements for partition 1
-               fetcher.emitRecord(1L, part1, 1L);
-               fetcher.emitRecord(2L, part1, 2L);
-               fetcher.emitRecord(3L, part1, 3L);
+               fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
+               fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
+               fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
                assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
                assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
                assertFalse(sourceContext.hasWatermark());
 
                // elements for partition 2
-               fetcher.emitRecord(12L, part2, 1L);
+               fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
                assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
                assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
                assertFalse(sourceContext.hasWatermark());
 
                // elements for partition 3
-               fetcher.emitRecord(101L, part3, 1L);
-               fetcher.emitRecord(102L, part3, 2L);
+               fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
+               fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
                assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
                assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
                
@@ -89,25 +90,25 @@ public class AbstractFetcherTimestampsTest {
                assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
                
                // advance partition 3
-               fetcher.emitRecord(1003L, part3, 3L);
-               fetcher.emitRecord(1004L, part3, 4L);
-               fetcher.emitRecord(1005L, part3, 5L);
+               fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
+               fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
+               fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
                assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
                assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
 
                // advance partition 1 beyond partition 2 - this bumps the watermark
-               fetcher.emitRecord(30L, part1, 4L);
+               fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
                assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
                assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
                assertTrue(sourceContext.hasWatermark());
                assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
 
                // advance partition 2 again - this bumps the watermark
-               fetcher.emitRecord(13L, part2, 2L);
+               fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
                assertFalse(sourceContext.hasWatermark());
-               fetcher.emitRecord(14L, part2, 3L);
+               fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
                assertFalse(sourceContext.hasWatermark());
-               fetcher.emitRecord(15L, part2, 3L);
+               fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
                assertTrue(sourceContext.hasWatermark());
                assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
        }
@@ -117,11 +118,12 @@ public class AbstractFetcherTimestampsTest {
 
                ExecutionConfig config = new ExecutionConfig();
                config.setAutoWatermarkInterval(10);
-               
+
+               final String testTopic = "test topic name";
                List<KafkaTopicPartition> originalPartitions = Arrays.asList(
-                               new KafkaTopicPartition("test topic name", 7),
-                               new KafkaTopicPartition("test topic name", 13),
-                               new KafkaTopicPartition("test topic name", 21));
+                               new KafkaTopicPartition(testTopic, 7),
+                               new KafkaTopicPartition(testTopic, 13),
+                               new KafkaTopicPartition(testTopic, 21));
 
                TestSourceContext<Long> sourceContext = new TestSourceContext<>();
 
@@ -142,20 +144,20 @@ public class AbstractFetcherTimestampsTest {
                        // elements generate a watermark if the timestamp is a multiple of three
        
                        // elements for partition 1
-                       fetcher.emitRecord(1L, part1, 1L);
-                       fetcher.emitRecord(2L, part1, 2L);
-                       fetcher.emitRecord(3L, part1, 3L);
+                       fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
+                       fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
+                       fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
                        assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
                        assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
        
                        // elements for partition 2
-                       fetcher.emitRecord(12L, part2, 1L);
+                       fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
                        assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
                        assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
        
                        // elements for partition 3
-                       fetcher.emitRecord(101L, part3, 1L);
-                       fetcher.emitRecord(102L, part3, 2L);
+                       fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
+                       fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
                        assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
                        assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
        
@@ -163,14 +165,14 @@ public class AbstractFetcherTimestampsTest {
                        assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
        
                        // advance partition 3
-                       fetcher.emitRecord(1003L, part3, 3L);
-                       fetcher.emitRecord(1004L, part3, 4L);
-                       fetcher.emitRecord(1005L, part3, 5L);
+                       fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
+                       fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
+                       fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
                        assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
                        assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
        
                        // advance partition 1 beyond partition 2 - this bumps the watermark
-                       fetcher.emitRecord(30L, part1, 4L);
+                       fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
                        assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
                        assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
                        
@@ -178,9 +180,9 @@ public class AbstractFetcherTimestampsTest {
                        assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
        
                        // advance partition 2 again - this bumps the watermark
-                       fetcher.emitRecord(13L, part2, 2L);
-                       fetcher.emitRecord(14L, part2, 3L);
-                       fetcher.emitRecord(15L, part2, 3L);
+                       fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
+                       fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
+                       fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
        
                        // this blocks until the periodic thread emitted the watermark
                        long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 58a5cc3..3f035fd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
+import java.io.Serializable;
 import java.util.Properties;
 import java.util.Random;
 
@@ -63,7 +64,7 @@ public class DataGenerators {
                                                int partition = getRuntimeContext().getIndexOfThisSubtask();
 
                                                while (running && cnt <= to) {
-                                                       ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+                                                       ctx.collect(new Tuple2<>(partition, cnt));
                                                        cnt++;
                                                }
                                        }
@@ -74,11 +75,11 @@ public class DataGenerators {
                                        }
                                });
 
-               stream.addSink(testServer.getProducer(topic,
+               testServer.produceIntoKafka(stream, topic,
                                new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
                                FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
                                new Tuple2Partitioner(numPartitions)
-               ));
+               );
 
                env.execute("Data generator (Int, Int) stream to topic " + topic);
        }
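
The change above is the core migration in this file: instead of asking the test environment for a producer and attaching it via addSink, the test now hands the stream to the environment, which attaches its own version-specific producer and returns the DataStreamSink. A minimal before/after sketch, with schema, props and partitioner standing in for the concrete arguments used above:

    // before: the test wired the sink function up itself
    // stream.addSink(testServer.getProducer(topic, schema, props, partitioner));

    // after: the environment attaches its own producer implementation to the stream
    testServer.produceIntoKafka(stream, topic, schema, props, partitioner);
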
@@ -141,38 +142,40 @@ public class DataGenerators {
                if(secureProps != null) {
                        props.putAll(testServer.getSecureProperties());
                }
-
-               stream
-                               .rebalance()
-                               .addSink(testServer.getProducer(topic,
-                                               new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
-                                               props,
-                                               new KafkaPartitioner<Integer>() {
-                                                       @Override
-                                                       public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-                                                               return next % numPartitions;
-                                                       }
-                                               }));
+               
+               stream = stream.rebalance();
+               testServer.produceIntoKafka(stream, topic,
+                               new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
+                               props,
+                               new KafkaPartitioner<Integer>() {
+                                       @Override
+                                       public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+                                               return next % numPartitions;
+                                       }
+                               });
 
                env.execute("Scrambles int sequence generator");
        }
        
        // ------------------------------------------------------------------------
        
-       public static class InfiniteStringsGenerator extends Thread {
+       public static class InfiniteStringsGenerator extends Thread implements Serializable {
 
-               private final KafkaTestEnvironment server;
+               private transient KafkaTestEnvironment server;
                
                private final String topic;
-               
+
+               private final int flinkPort;
+
                private volatile Throwable error;
                
                private volatile boolean running = true;
 
                
-               public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
+               public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic, int flinkPort) {
                        this.server = server;
                        this.topic = topic;
+                       this.flinkPort = flinkPort;
                }
 
                @Override
@@ -180,28 +183,36 @@ public class DataGenerators {
                        // we manually feed data into the Kafka sink
                        FlinkKafkaProducerBase<String> producer = null;
                        try {
+                               final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                               DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+                                       @Override
+                                       public void run(SourceContext<String> ctx) throws Exception {
+                                               final StringBuilder bld = new StringBuilder();
+                                               final Random rnd = new Random();
+                                               while (running) {
+                                                       bld.setLength(0);
+                                                       int len = rnd.nextInt(100) + 1;
+                                                       for (int i = 0; i < len; i++) {
+                                                               bld.append((char) (rnd.nextInt(20) + 'a'));
+                                                       }
+
+                                                       String next = bld.toString();
+                                                       ctx.collect(next);
+                                               }
+                                       }
+
+                                       @Override
+                                       public void cancel() {
+                                               running = false;
+                                       }
+                               });
+
                                Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
                                producerProperties.setProperty("retries", "3");
-                               producer = server.getProducer(topic,
+                               server.produceIntoKafka(stream, topic,
                                                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                                                producerProperties, new FixedPartitioner<String>());
-                               producer.setRuntimeContext(new MockRuntimeContext(1,0));
-                               producer.open(new Configuration());
-                               
-                               final StringBuilder bld = new StringBuilder();
-                               final Random rnd = new Random();
-                               
-                               while (running) {
-                                       bld.setLength(0);
-                                       
-                                       int len = rnd.nextInt(100) + 1;
-                                       for (int i = 0; i < len; i++) {
-                                               bld.append((char) (rnd.nextInt(20) + 'a') );
-                                       }
-                                       
-                                       String next = bld.toString();
-                                       producer.invoke(next);
-                               }
+                               env.execute("String generator");
                        }
                        catch (Throwable t) {
                                this.error = t;
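
The generator now submits a small streaming job to a remote environment instead of driving the producer function by hand. That is presumably why the enclosing class becomes Serializable (the anonymous SourceFunction captures it in order to read the running flag) and why the non-serializable KafkaTestEnvironment field is made transient, keeping it out of the serialized closure. Assuming the surrounding test knows the port of its local Flink cluster, usage would look roughly like this (variable names kafkaServer and flinkPort are illustrative):

    // start the background job that writes random strings into the topic
    InfiniteStringsGenerator generator = new InfiniteStringsGenerator(kafkaServer, topic, flinkPort);
    generator.start();
    // ... exercise the consumer under test against 'topic', then cancel the generator job ...
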

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index 028045a..acdad5a 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -54,6 +54,10 @@ public class JobManagerCommunicationUtils {
        }
 
        public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+               cancelCurrentJob(jobManager, null);
+       }
+
+       public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
                JobStatusMessage status = null;
                
                for (int i = 0; i < 200; i++) {
@@ -78,8 +82,21 @@ public class JobManagerCommunicationUtils {
                        else if (jobs.size() == 1) {
                                status = jobs.get(0);
                        }
-                       else {
-                               throw new Exception("Could not cancel job - more than one running job.");
+                       else if(name != null) {
+                               for(JobStatusMessage msg: jobs) {
+                                       if(msg.getJobName().equals(name)) {
+                                               status = msg;
+                                       }
+                               }
+                               if(status == null) {
+                                       throw new Exception("Could not cancel job - no job matched expected name = '" + name +"' in " + jobs);
+                               }
+                       } else {
+                               String jobNames = "";
+                               for(JobStatusMessage jsm: jobs) {
+                                       jobNames += jsm.getJobName() + ", ";
+                               }
+                               throw new Exception("Could not cancel job - more than one running job: " + jobNames);
                        }
                }
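
For tests that run more than one job at a time, the new overload lets the caller pick the job to cancel by name instead of failing outright. A minimal call, reusing the job name submitted by the string generator above (jobManager is whatever ActorGateway the test already holds):

    // cancels the running job named "String generator"; the one-argument
    // overload keeps the old behaviour when exactly one job is running
    JobManagerCommunicationUtils.cancelCurrentJob(jobManager, "String generator");
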
                
