Repository: flink

Updated Branches:
  refs/heads/master a079259f3 -> 6731ec1e4
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 213ba4a..9d8fa9a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -29,6 +29,8 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -94,10 +96,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
-		return prod;
+		return stream.addSink(prod);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 5b18c75..05028e6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -21,6 +21,7 @@ import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -36,6 +37,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,6 +171,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			throw new IllegalStateException("A periodic watermark emitter has already been set.");
 		}
 		try {
+			ClosureCleaner.clean(assigner, true);
 			this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
 			return this;
 		} catch (Exception e) {
@@ -203,6 +206,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			throw new IllegalStateException("A punctuated watermark emitter has already been set.");
 		}
 		try {
+			ClosureCleaner.clean(assigner, true);
 			this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
 			return this;
 		} catch (Exception e) {
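The two ClosureCleaner.clean(assigner, true) calls above matter for watermark assigners written as anonymous inner classes: such an assigner holds a hidden reference to its enclosing instance, and new SerializedValue<>(assigner) fails if that instance is not serializable. A minimal sketch of the scenario the cleaner fixes; MyJob is a hypothetical, non-serializable outer class, not code from this commit:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// MyJob is a hypothetical, non-serializable outer class used only for illustration.
public class MyJob {

	public FlinkKafkaConsumer09<String> createConsumer(Properties props) {
		FlinkKafkaConsumer09<String> consumer =
				new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);

		// The anonymous class below captures MyJob.this. Without the cleaner,
		// new SerializedValue<>(assigner) inside the setter would fail to
		// serialize; clean(assigner, true) nulls the unused enclosing
		// reference first and then verifies serializability.
		consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {

			private long maxSeenTimestamp = Long.MIN_VALUE;

			@Override
			public long extractTimestamp(String element, long previousElementTimestamp) {
				maxSeenTimestamp = Math.max(maxSeenTimestamp, previousElementTimestamp);
				return previousElementTimestamp;
			}

			@Override
			public Watermark getCurrentWatermark() {
				return new Watermark(maxSeenTimestamp - 1);
			}
		});
		return consumer;
	}
}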
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index f0975dc..26a695e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -105,7 +105,7 @@
 	/**
 	 * If true, the producer will wait until all outstanding records have been send to the broker.
 	 */
-	private boolean flushOnCheckpoint;
+	protected boolean flushOnCheckpoint;
 
 	// -------------------------------- Runtime fields ------------------------------------------
 
@@ -139,7 +139,7 @@
 		requireNonNull(defaultTopicId, "TopicID not set");
 		requireNonNull(serializationSchema, "serializationSchema not set");
 		requireNonNull(producerConfig, "producerConfig not set");
-		ClosureCleaner.ensureSerializable(customPartitioner);
+		ClosureCleaner.clean(customPartitioner, true);
 		ClosureCleaner.ensureSerializable(serializationSchema);
 
 		this.defaultTopicId = defaultTopicId;

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 7ce3a9d..7ee3079 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -49,24 +49,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public abstract class AbstractFetcher<T, KPH> {
 
-	private static final int NO_TIMESTAMPS_WATERMARKS = 0;
-	private static final int PERIODIC_WATERMARKS = 1;
-	private static final int PUNCTUATED_WATERMARKS = 2;
+	protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
+	protected static final int PERIODIC_WATERMARKS = 1;
+	protected static final int PUNCTUATED_WATERMARKS = 2;
 
 	// ------------------------------------------------------------------------
 
 	/** The source context to emit records and watermarks to */
-	private final SourceContext<T> sourceContext;
+	protected final SourceContext<T> sourceContext;
 
 	/** The lock that guarantees that record emission and state updates are atomic,
 	 * from the view of taking a checkpoint */
-	private final Object checkpointLock;
+	protected final Object checkpointLock;
 
 	/** All partitions (and their state) that this fetcher is subscribed to */
 	private final KafkaTopicPartitionState<KPH>[] allPartitions;
 
 	/** The mode describing whether the fetcher also generates timestamps and watermarks */
-	private final int timestampWatermarkMode;
+	protected final int timestampWatermarkMode;
 
 	/** Flag whether to register metrics for the fetcher */
 	protected final boolean useMetrics;
@@ -207,30 +207,34 @@
 	// ------------------------------------------------------------------------
 
 	/**
-	 *
 	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
 	 * That makes the fast path efficient, the extended paths are called as separate methods.
-	 *
 	 * @param record The record to emit
 	 * @param partitionState The state of the Kafka partition from which the record was fetched
-	 * @param offset The offset from which the record was fetched
+	 * @param offset The offset of the record
+	 * @param timestamp The record's event-timestamp
 	 */
-	protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) {
+	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
 		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
 			// fast path logic, in case there are no watermarks
 
 			// emit the record, using the checkpoint lock to guarantee
 			// atomicity of record emission and offset state update
 			synchronized (checkpointLock) {
-				sourceContext.collect(record);
+				if(timestamp != Long.MIN_VALUE) {
+					// this case is true for Kafka 0.10
+					sourceContext.collectWithTimestamp(record, timestamp);
+				} else {
+					sourceContext.collect(record);
+				}
 				partitionState.setOffset(offset);
 			}
 		}
 		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset);
+			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
 		}
 		else {
-			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset);
+			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
 		}
 	}
 
@@ -238,8 +242,8 @@
 	 * Record emission, if a timestamp will be attached from an assigner that is
 	 * also a periodic watermark generator.
 	 */
-	private void emitRecordWithTimestampAndPeriodicWatermark(
-			T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+	protected void emitRecordWithTimestampAndPeriodicWatermark(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
 	{
 		@SuppressWarnings("unchecked")
 		final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
@@ -251,7 +255,7 @@
 		final long timestamp;
 		//noinspection SynchronizationOnLocalVariableOrMethodParameter
 		synchronized (withWatermarksState) {
-			timestamp = withWatermarksState.getTimestampForRecord(record);
+			timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
 		}
 
 		// emit the record with timestamp, using the usual checkpoint lock to guarantee
@@ -266,8 +270,8 @@
 	 * Record emission, if a timestamp will be attached from an assigner that is
 	 * also a punctuated watermark generator.
 	 */
-	private void emitRecordWithTimestampAndPunctuatedWatermark(
-			T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+	protected void emitRecordWithTimestampAndPunctuatedWatermark(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
 	{
 		@SuppressWarnings("unchecked")
 		final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
@@ -275,7 +279,7 @@
 
 		// only one thread ever works on accessing timestamps and watermarks
 		// from the punctuated extractor
-		final long timestamp = withWatermarksState.getTimestampForRecord(record);
+		final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
 		final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
 
 		// emit the record with timestamp, using the usual checkpoint lock to guarantee
@@ -291,6 +295,7 @@
 			updateMinPunctuatedWatermark(newWatermark);
 		}
 	}
+
 	/**
 	 *Checks whether a new per-partition watermark is also a new cross-partition watermark.
 	 */
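The Long.MIN_VALUE convention above is the contract between this base class and the version-specific fetchers: pre-0.10 fetchers pass Long.MIN_VALUE (the broker attached no timestamp), while a 0.10 fetcher can pass the record's timestamp so that it reaches collectWithTimestamp(). A self-contained toy version of that dispatch; the Collector interface and all names below are illustrative, not Flink API:

// Illustration of the sentinel convention: Long.MIN_VALUE means "no broker
// timestamp" (Kafka 0.8/0.9), anything else is a Kafka 0.10+ record timestamp.
public final class TimestampSentinelDemo {

	interface Collector<T> {
		void collect(T record);                       // no event time attached
		void collectWithTimestamp(T record, long ts); // event time attached
	}

	static <T> void emit(T record, long timestamp, Collector<T> out) {
		if (timestamp != Long.MIN_VALUE) {
			out.collectWithTimestamp(record, timestamp); // Kafka 0.10+ path
		} else {
			out.collect(record);                         // pre-0.10 path
		}
	}

	public static void main(String[] args) {
		Collector<String> printer = new Collector<String>() {
			@Override public void collect(String r) { System.out.println(r + " (no ts)"); }
			@Override public void collectWithTimestamp(String r, long ts) { System.out.println(r + " @ " + ts); }
		};
		emit("a", Long.MIN_VALUE, printer);     // prints: a (no ts)
		emit("b", 1467000000000L, printer);     // prints: b @ 1467000000000
	}
}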
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
index 99c5d69..efdc73f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -49,8 +49,8 @@
 
 	// ------------------------------------------------------------------------
 
-	public long getTimestampForRecord (T record) {
-		return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+		return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
 	}
 
 	public long getCurrentWatermarkTimestamp() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
index b265990..edf40ce 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -54,8 +54,8 @@
 
 	// ------------------------------------------------------------------------
 
-	public long getTimestampForRecord(T record) {
-		return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+		return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
 	}
 
 	@Nullable

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index bafff4f..3c967ba 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -68,6 +68,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -91,6 +92,7 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
@@ -185,7 +187,7 @@
 			stream.print();
 			see.execute("No broker test");
 		} catch(ProgramInvocationException pie) {
-			if(kafkaServer.getVersion().equals("0.9")) {
+			if(kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
 				assertTrue(pie.getCause() instanceof JobExecutionException);
 
 				JobExecutionException jee = (JobExecutionException) pie.getCause();
@@ -287,8 +289,7 @@
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
 		producerProperties.putAll(secureProps);
-		FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
-		stream.addSink(prod);
+		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
 
 		// ----------- add consumer dataflow ----------
@@ -516,7 +517,7 @@
 		// launch a producer thread
 		DataGenerators.InfiniteStringsGenerator generator =
-				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
+				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort);
 		generator.start();
 
 		// launch a consumer asynchronously
@@ -539,7 +540,7 @@
 					env.addSource(source).addSink(new DiscardingSink<String>());
 
-					env.execute();
+					env.execute("Runner for CancelingOnFullInputTest");
 				}
 				catch (Throwable t) {
 					jobError.set(t);
@@ -560,7 +561,7 @@
 		}
 
 		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
 
 		// wait for the program to be done and validate that we failed with the right exception
 		runnerThread.join();
@@ -570,6 +571,7 @@
 		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
 
 		if (generator.isAlive()) {
+			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator");
 			generator.shutdown();
 			generator.join();
 		}
@@ -613,7 +615,7 @@
 					env.addSource(source).addSink(new DiscardingSink<String>());
 
-					env.execute();
+					env.execute("CancelingOnEmptyInputTest");
 				}
 				catch (Throwable t) {
 					LOG.error("Job Runner failed with exception", t);
@@ -671,7 +673,7 @@
 				.addSink(new DiscardingSink<Integer>());
 
 		try {
-			env.execute();
+			env.execute("test fail on deploy");
 			fail("this test should fail with an exception");
 		}
 		catch (ProgramInvocationException e) {
@@ -738,8 +740,7 @@
 		Properties props = new Properties();
 		props.putAll(standardProps);
 		props.putAll(secureProps);
-
-		stream.addSink(kafkaServer.getProducer("dummy", schema, props, null));
+		kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null);
 
 		env.execute("Write to topics");
@@ -954,7 +955,7 @@
 			}
 		});
 
-		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null));
+		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null);
 
 		tryExecute(env, "big topology test");
 		deleteTestTopic(topic);
@@ -1046,8 +1047,7 @@
 		KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
-
 		producerProperties.putAll(secureProps);
-		kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
+		kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
 
 		env.execute("Write KV to Kafka");
@@ -1132,8 +1132,7 @@
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
 		producerProperties.putAll(secureProps);
-
-		kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
+		kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
 
 		env.execute("Write deletes to Kafka");
@@ -1229,7 +1228,7 @@
 			props.putAll(secureProps);
 			TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
 
-			DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, props));
+			DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
 			fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
 				@Override
 				public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
@@ -1254,7 +1253,7 @@
 				}
 			});
 
-			fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), props, null));
+			kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
 			env1.execute("Metrics test job");
 		} catch(Throwable t) {
@@ -1369,8 +1368,8 @@
 		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser = new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
 
-		cc.putAll(secureProps);
 		// create the consumer
+		cc.putAll(secureProps);
 		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
 
 		DataStream<Tuple2<Integer, Integer>> source = env
@@ -1474,9 +1473,7 @@
 		producerProperties.setProperty("retries", "0");
 		producerProperties.putAll(secureProps);
 
-		stream.addSink(kafkaServer.getProducer(
-				topicName, serSchema, producerProperties,
-				new Tuple2Partitioner(parallelism)))
+		kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism))
 			.setParallelism(parallelism);
 
 		try {
@@ -1773,86 +1770,88 @@
 //		deleteTestTopic(topic);
 //	}
 //	}
-
-	private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
-		try {
-			tryExecutePropagateExceptions(env, execName);
-		}
-		catch (ProgramInvocationException | JobExecutionException e) {
-			// look for NotLeaderForPartitionException
-			Throwable cause = e.getCause();
-
-			// search for nested SuccessExceptions
-			int depth = 0;
-			while (cause != null && depth++ < 20) {
-				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
-					throw (Exception) cause;
-				}
-				cause = cause.getCause();
-			}
-			throw e;
-		}
-	}
-
-	private void putDataInTopics(StreamExecutionEnvironment env,
-								Properties producerProperties,
-								final int elementsPerPartition,
-								Map<String, Boolean> topics,
-								TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
-		if(topics.size() != 2) {
-			throw new RuntimeException("This method accepts two topics as arguments.");
-		}
-
-		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
-				new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
-
-		DataStream<Tuple2<Long, Integer>> stream = env
-				.addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
-					private boolean running = true;
-
-					@Override
-					public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
-						int topic = 0;
-						int currentTs = 1;
-
-						while (running && currentTs < elementsPerPartition) {
-							long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
-							ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
-							currentTs++;
-						}
-
-						Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
-						ctx.collect(toWrite2);
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				}).setParallelism(1);
-
-		List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
-		stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-
-			@Override
-			public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-				return value;
-			}
-		}).setParallelism(1).addSink(kafkaServer.getProducer(topicsL.get(0).getKey(),
-				new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
-
-		if(!topicsL.get(1).getValue()) {
-			stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-
-				@Override
-				public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-					long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
-					return new Tuple2<Long, Integer>(timestamp, 1);
-				}
-			}).setParallelism(1).addSink(kafkaServer.getProducer(topicsL.get(1).getKey(),
-				new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
-		}
-	}
+//
+//	private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
+//		try {
+//			tryExecutePropagateExceptions(env, execName);
+//		}
+//		catch (ProgramInvocationException | JobExecutionException e) {
+//			// look for NotLeaderForPartitionException
+//			Throwable cause = e.getCause();
+//
+//			// search for nested SuccessExceptions
+//			int depth = 0;
+//			while (cause != null && depth++ < 20) {
+//				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
+//					throw (Exception) cause;
+//				}
+//				cause = cause.getCause();
+//			}
+//			throw e;
+//		}
+//	}
+//
+//	private void putDataInTopics(StreamExecutionEnvironment env,
+//								Properties producerProperties,
+//								final int elementsPerPartition,
+//								Map<String, Boolean> topics,
+//								TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
+//		if(topics.size() != 2) {
+//			throw new RuntimeException("This method accepts two topics as arguments.");
+//		}
+//
+//		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
+//				new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
+//
+//		DataStream<Tuple2<Long, Integer>> stream = env
+//				.addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
+//					private boolean running = true;
+//
+//					@Override
+//					public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
+//						int topic = 0;
+//						int currentTs = 1;
+//
+//						while (running && currentTs < elementsPerPartition) {
+//							long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
+//							ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
+//							currentTs++;
+//						}
+//
+//						Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
+//						ctx.collect(toWrite2);
+//					}
+//
+//					@Override
+//					public void cancel() {
+//						running = false;
+//					}
+//				}).setParallelism(1);
+//
+//		List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
+//
+//		stream = stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
+//
+//			@Override
+//			public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
+//				return value;
+//			}
+//		}).setParallelism(1);
+//		kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(),
+//				new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1);
+//
+//		if(!topicsL.get(1).getValue()) {
+//			stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
+//
+//				@Override
+//				public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
+//					long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
+//					return new Tuple2<>(timestamp, 1);
+//				}
+//			}).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(),
+//				new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
+//		}
+//	}
 
 	private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedComsumer(StreamExecutionEnvironment env,
 			List<String> topics,

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 5bcf406..c925c8f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -109,11 +109,10 @@
 			props.putAll(secureProps);
 
 			// sink partitions into
-			stream.addSink(kafkaServer.getProducer(topic,
+			kafkaServer.produceIntoKafka(stream, topic,
 					new KeyedSerializationSchemaWrapper<>(serSchema),
 					props,
-					new CustomPartitioner(parallelism)))
-			.setParallelism(parallelism);
+					new CustomPartitioner(parallelism)).setParallelism(parallelism);
 
 			// ------ consuming topology ---------
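The producer rewiring repeated throughout these test bases is mechanical; condensed to one before/after pair (identifiers as in the hunks above, shown out of context):

// before: the caller builds the producer sink and attaches it itself
stream.addSink(kafkaServer.getProducer(
		topic, new KeyedSerializationSchemaWrapper<>(schema), props, null));

// after: the test environment attaches the sink and returns the DataStreamSink,
// so call sites can keep chaining, e.g. to set the sink parallelism
kafkaServer.produceIntoKafka(
		stream, topic, new KeyedSerializationSchemaWrapper<>(schema), props, null)
	.setParallelism(parallelism);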
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9236e78..5c03b78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -165,12 +165,10 @@
 				running = false;
 			}
 		});
-
 		Properties props = new Properties();
 		props.putAll(standardProps);
 		props.putAll(secureProps);
-
-		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null));
+		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
 
 		// ----------- add consumer dataflow ----------

http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 6ecde71..ded1fde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -18,6 +18,9 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.server.KafkaServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -76,7 +79,9 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
 
-	public abstract <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner);
+	public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
+													KeyedSerializationSchema<T> serSchema, Properties props,
+													KafkaPartitioner<T> partitioner);
 
 	// -- leader failure simulation
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 9b5d2e6..582311f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -46,10 +46,11 @@
 
 	@Test
 	public void testPunctuatedWatermarks() throws Exception {
+		final String testTopic = "test topic name";
 		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
-				new KafkaTopicPartition("test topic name", 7),
-				new KafkaTopicPartition("test topic name", 13),
-				new KafkaTopicPartition("test topic name", 21));
+				new KafkaTopicPartition(testTopic, 7),
+				new KafkaTopicPartition(testTopic, 13),
+				new KafkaTopicPartition(testTopic, 21));
 
 		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
@@ -65,22 +66,22 @@
 		// elements generate a watermark if the timestamp is a multiple of three
 
 		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L);
-		fetcher.emitRecord(2L, part1, 2L);
-		fetcher.emitRecord(3L, part1, 3L);
+		fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
 		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
 		assertFalse(sourceContext.hasWatermark());
 
 		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L);
+		fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
 		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
 		assertFalse(sourceContext.hasWatermark());
 
 		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L);
-		fetcher.emitRecord(102L, part3, 2L);
+		fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
 		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
@@ -89,25 +90,25 @@
 		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L);
-		fetcher.emitRecord(1004L, part3, 4L);
-		fetcher.emitRecord(1005L, part3, 5L);
+		fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
+		fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
 		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
 
 		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L);
+		fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
 		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
 		assertTrue(sourceContext.hasWatermark());
 		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L);
+		fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
 		assertFalse(sourceContext.hasWatermark());
-		fetcher.emitRecord(14L, part2, 3L);
+		fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
 		assertFalse(sourceContext.hasWatermark());
-		fetcher.emitRecord(15L, part2, 3L);
+		fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
 		assertTrue(sourceContext.hasWatermark());
 		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
 	}
@@ -117,11 +118,12 @@
 
 		ExecutionConfig config = new ExecutionConfig();
 		config.setAutoWatermarkInterval(10);
-		
+
+		final String testTopic = "test topic name";
 		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
-				new KafkaTopicPartition("test topic name", 7),
-				new KafkaTopicPartition("test topic name", 13),
-				new KafkaTopicPartition("test topic name", 21));
+				new KafkaTopicPartition(testTopic, 7),
+				new KafkaTopicPartition(testTopic, 13),
+				new KafkaTopicPartition(testTopic, 21));
 
 		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
@@ -142,20 +144,20 @@
 		// elements generate a watermark if the timestamp is a multiple of three
 
 		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L);
-		fetcher.emitRecord(2L, part1, 2L);
-		fetcher.emitRecord(3L, part1, 3L);
+		fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
 		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
 
 		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L);
+		fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
 		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
 
 		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L);
-		fetcher.emitRecord(102L, part3, 2L);
+		fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
 		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
@@ -163,14 +165,14 @@
 		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L);
-		fetcher.emitRecord(1004L, part3, 4L);
-		fetcher.emitRecord(1005L, part3, 5L);
+		fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
+		fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
 		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
 
 		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L);
+		fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
 		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
@@ -178,9 +180,9 @@
 		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L);
-		fetcher.emitRecord(14L, part2, 3L);
-		fetcher.emitRecord(15L, part2, 3L);
+		fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
 
 		// this blocks until the periodic thread emitted the watermark
 		long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
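The assigner behind these assertions is not part of this diff; a plausible reconstruction matching the test's comment, where elements serve as their own timestamps and a watermark is produced when the timestamp is a multiple of three, could look like this:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

// Hypothetical reconstruction of the test helper; the actual class used by
// AbstractFetcherTimestampsTest is not shown in this commit.
public class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {

	@Override
	public long extractTimestamp(Long element, long previousElementTimestamp) {
		return element; // the test elements are their own timestamps
	}

	@Nullable
	@Override
	public Watermark checkAndGetNewWatermark(Long lastElement, long extractedTimestamp) {
		// emit a per-partition watermark only at multiples of three; the fetcher
		// then takes the minimum across partitions as the cross-partition watermark
		return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
	}
}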
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 58a5cc3..3f035fd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
+import java.io.Serializable;
 import java.util.Properties;
 import java.util.Random;
@@ -63,7 +64,7 @@
 				int partition = getRuntimeContext().getIndexOfThisSubtask();
 
 				while (running && cnt <= to) {
-					ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+					ctx.collect(new Tuple2<>(partition, cnt));
 					cnt++;
 				}
 			}
@@ -74,11 +75,11 @@
 			}
 		});
 
-		stream.addSink(testServer.getProducer(topic,
+		testServer.produceIntoKafka(stream, topic,
 				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
 				FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
 				new Tuple2Partitioner(numPartitions)
-				));
+		);
 
 		env.execute("Data generator (Int, Int) stream to topic " + topic);
 	}
@@ -141,38 +142,40 @@
 			if(secureProps != null) {
 				props.putAll(testServer.getSecureProperties());
 			}
-
-			stream
-				.rebalance()
-				.addSink(testServer.getProducer(topic,
-						new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
-						props,
-						new KafkaPartitioner<Integer>() {
-							@Override
-							public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-								return next % numPartitions;
-							}
-						}));
+
+			stream = stream.rebalance();
+			testServer.produceIntoKafka(stream, topic,
+					new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
+					props,
+					new KafkaPartitioner<Integer>() {
+						@Override
+						public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+							return next % numPartitions;
+						}
+					});
 
 			env.execute("Scrambles int sequence generator");
 		}
 
 	// ------------------------------------------------------------------------
 
-	public static class InfiniteStringsGenerator extends Thread {
+	public static class InfiniteStringsGenerator extends Thread implements Serializable {
 
-		private final KafkaTestEnvironment server;
+		private transient KafkaTestEnvironment server;
 
 		private final String topic;
-
+
+		private final int flinkPort;
+
 		private volatile Throwable error;
 
 		private volatile boolean running = true;
 
-		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
+		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic, int flinkPort) {
 			this.server = server;
 			this.topic = topic;
+			this.flinkPort = flinkPort;
 		}
 
 		@Override
@@ -180,28 +183,36 @@
 			// we manually feed data into the Kafka sink
 			FlinkKafkaProducerBase<String> producer = null;
 			try {
+				final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+				DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+					@Override
+					public void run(SourceContext<String> ctx) throws Exception {
+						final StringBuilder bld = new StringBuilder();
+						final Random rnd = new Random();
+						while (running) {
+							bld.setLength(0);
+							int len = rnd.nextInt(100) + 1;
+							for (int i = 0; i < len; i++) {
+								bld.append((char) (rnd.nextInt(20) + 'a'));
+							}
+
+							String next = bld.toString();
+							ctx.collect(next);
+						}
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				});
+
 				Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
 				producerProperties.setProperty("retries", "3");
-				producer = server.getProducer(topic,
+				server.produceIntoKafka(stream, topic,
 						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
 						producerProperties,
 						new FixedPartitioner<String>());
 
-				producer.setRuntimeContext(new MockRuntimeContext(1,0));
-				producer.open(new Configuration());
-
-				final StringBuilder bld = new StringBuilder();
-				final Random rnd = new Random();
-
-				while (running) {
-					bld.setLength(0);
-
-					int len = rnd.nextInt(100) + 1;
-					for (int i = 0; i < len; i++) {
-						bld.append((char) (rnd.nextInt(20) + 'a') );
-					}
-
-					String next = bld.toString();
-					producer.invoke(next);
-				}
+				env.execute("String generator");
 			}
 			catch (Throwable t) {
 				this.error = t;
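The rewritten generator no longer invokes the producer function by hand with a mock runtime context; it submits a real job to the test cluster. A stripped-down sketch of that pattern, assuming a Flink mini cluster is reachable on the given port (the port value and job body here are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteGeneratorSketch {
	public static void main(String[] args) throws Exception {
		// Connect to an already-running cluster instead of building a local one.
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123);

		DataStream<Long> stream = env.generateSequence(0, 1000);
		stream.print();

		// The job name matters: the tests above cancel jobs by this name.
		env.execute("String generator");
	}
}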
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index 028045a..acdad5a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -54,6 +54,10 @@
 	}
 
 	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+		cancelCurrentJob(jobManager, null);
+	}
+
+	public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
 		JobStatusMessage status = null;
 
 		for (int i = 0; i < 200; i++) {
@@ -78,8 +82,21 @@
 		else if (jobs.size() == 1) {
 			status = jobs.get(0);
 		}
-		else {
-			throw new Exception("Could not cancel job - more than one running job.");
+		else if(name != null) {
+			for(JobStatusMessage msg: jobs) {
+				if(msg.getJobName().equals(name)) {
+					status = msg;
+				}
+			}
+			if(status == null) {
+				throw new Exception("Could not cancel job - no job matched expected name = '" + name +"' in " + jobs);
+			}
+		} else {
+			String jobNames = "";
+			for(JobStatusMessage jsm: jobs) {
+				jobNames += jsm.getJobName() + ", ";
+			}
+			throw new Exception("Could not cancel job - more than one running job: " + jobNames);
 		}
 	}
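With the overload above, tests that can have several jobs running at once (the consumer under test plus the string generator) pick the job to cancel by the name given to env.execute(); for example, matching the call sites earlier in this commit:

// cancel exactly the job that was started as env.execute("Runner for CancelingOnFullInputTest")
JobManagerCommunicationUtils.cancelCurrentJob(
		flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");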