[flink-playgrounds] 01/02: [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground
This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git commit 41acc3b90bbf43e6879f2e3d9cdded0cac980524 Author: David Anderson AuthorDate: Thu Sep 19 20:08:58 2019 +0200 [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground This closes #4. --- .../java/flink-playground-clickcountjob/pom.xml| 2 +- .../ops/clickcount/ClickEventCount.java| 25 ++-- .../ops/clickcount/functions/BackpressureMap.java | 46 ++ operations-playground/docker-compose.yaml | 4 +- 4 files changed, 71 insertions(+), 6 deletions(-) diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml index 3d17fcd..893c11e 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.flink flink-playground-clickcountjob - 1-FLINK-1.9_2.11 + 2-FLINK-1.9_2.11 flink-playground-clickcountjob jar diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java index 0316bc6..f3d628c 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java @@ -18,6 +18,7 @@ package org.apache.flink.playgrounds.ops.clickcount; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap; import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector; import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; @@ -25,6 +26,7 @@ import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializa import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; @@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit; * The Job can be configured via the command line: * * "--checkpointing": enables checkpointing * * "--event-time": set the StreamTimeCharacteristic to EventTime + * * "--backpressure": insert an operator that causes periodic backpressure * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to * * "--bootstrap.servers": comma-separated list of Kafka brokers @@ -56,6 +59,7 @@ public class ClickEventCount { public static final String CHECKPOINTING_OPTION = "checkpointing"; public static final String EVENT_TIME_OPTION = "event-time"; + public static final String BACKPRESSURE_OPTION = "backpressure"; public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS); @@ -66,6 +70,8 @@ public class ClickEventCount { configureEnvironment(params, env); + boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION); + String inputTopic = params.get("input-topic", "input"); String outputTopic = params.get("output-topic", "output"); String brokers = params.get("bootstrap.servers", "localhost:9092"); @@ -73,19 +79,32 @@ public class ClickEventCount { kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count"); - env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) + DataStream clicks = + env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) .name("ClickEvent Source")
[flink-playgrounds] 01/02: [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground
This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git commit 1c7c254fc7827e74db7c3c387348e7ca2219788a Author: David Anderson AuthorDate: Thu Sep 19 20:08:58 2019 +0200 [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground This closes #4. --- .../java/flink-playground-clickcountjob/pom.xml| 2 +- .../ops/clickcount/ClickEventCount.java| 25 ++-- .../ops/clickcount/functions/BackpressureMap.java | 46 ++ operations-playground/docker-compose.yaml | 4 +- 4 files changed, 71 insertions(+), 6 deletions(-) diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml index 3d17fcd..893c11e 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.flink flink-playground-clickcountjob - 1-FLINK-1.9_2.11 + 2-FLINK-1.9_2.11 flink-playground-clickcountjob jar diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java index 0316bc6..f3d628c 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java @@ -18,6 +18,7 @@ package org.apache.flink.playgrounds.ops.clickcount; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap; import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector; import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; @@ -25,6 +26,7 @@ import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializa import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; @@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit; * The Job can be configured via the command line: * * "--checkpointing": enables checkpointing * * "--event-time": set the StreamTimeCharacteristic to EventTime + * * "--backpressure": insert an operator that causes periodic backpressure * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to * * "--bootstrap.servers": comma-separated list of Kafka brokers @@ -56,6 +59,7 @@ public class ClickEventCount { public static final String CHECKPOINTING_OPTION = "checkpointing"; public static final String EVENT_TIME_OPTION = "event-time"; + public static final String BACKPRESSURE_OPTION = "backpressure"; public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS); @@ -66,6 +70,8 @@ public class ClickEventCount { configureEnvironment(params, env); + boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION); + String inputTopic = params.get("input-topic", "input"); String outputTopic = params.get("output-topic", "output"); String brokers = params.get("bootstrap.servers", "localhost:9092"); @@ -73,19 +79,32 @@ public class ClickEventCount { kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count"); - env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) + DataStream clicks = + env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) .name("ClickEvent Source")