[flink-playgrounds] 01/02: [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground

2019-09-23 Thread fhueske
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 41acc3b90bbf43e6879f2e3d9cdded0cac980524
Author: David Anderson 
AuthorDate: Thu Sep 19 20:08:58 2019 +0200

[FLINK-14160] Add --backpressure option to the ClickEventCount job in the 
operations playground

This closes #4.
---
 .../java/flink-playground-clickcountjob/pom.xml|  2 +-
 .../ops/clickcount/ClickEventCount.java| 25 ++--
 .../ops/clickcount/functions/BackpressureMap.java  | 46 ++
 operations-playground/docker-compose.yaml  |  4 +-
 4 files changed, 71 insertions(+), 6 deletions(-)

diff --git 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index 3d17fcd..893c11e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@ under the License.
 
org.apache.flink
flink-playground-clickcountjob
-   1-FLINK-1.9_2.11
+   2-FLINK-1.9_2.11
 
flink-playground-clickcountjob
jar
diff --git 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 0316bc6..f3d628c 100644
--- 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -18,6 +18,7 @@
 package org.apache.flink.playgrounds.ops.clickcount;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
 import 
org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
 import 
org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
@@ -25,6 +26,7 @@ import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializa
 import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
 import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit;
  * The Job can be configured via the command line:
  * * "--checkpointing": enables checkpointing
  * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--backpressure": insert an operator that causes periodic backpressure
  * * "--input-topic": the name of the Kafka Topic to consume {@link 
ClickEvent}s from
  * * "--output-topic": the name of the Kafka Topic to produce {@link 
ClickEventStatistics} to
  * * "--bootstrap.servers": comma-separated list of Kafka brokers
@@ -56,6 +59,7 @@ public class ClickEventCount {
 
public static final String CHECKPOINTING_OPTION = "checkpointing";
public static final String EVENT_TIME_OPTION = "event-time";
+   public static final String BACKPRESSURE_OPTION = "backpressure";
 
public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
 
@@ -66,6 +70,8 @@ public class ClickEventCount {
 
configureEnvironment(params, env);
 
+   boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);
+
String inputTopic = params.get("input-topic", "input");
String outputTopic = params.get("output-topic", "output");
String brokers = params.get("bootstrap.servers", 
"localhost:9092");
@@ -73,19 +79,32 @@ public class ClickEventCount {
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"click-event-count");
 
-   env.addSource(new FlinkKafkaConsumer<>(inputTopic, new 
ClickEventDeserializationSchema(), kafkaProps))
+   DataStream clicks =
+   env.addSource(new 
FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), 
kafkaProps))
.name("ClickEvent Source")

[flink-playgrounds] 01/02: [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground

2019-09-23 Thread fhueske
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 1c7c254fc7827e74db7c3c387348e7ca2219788a
Author: David Anderson 
AuthorDate: Thu Sep 19 20:08:58 2019 +0200

[FLINK-14160] Add --backpressure option to the ClickEventCount job in the 
operations playground

This closes #4.
---
 .../java/flink-playground-clickcountjob/pom.xml|  2 +-
 .../ops/clickcount/ClickEventCount.java| 25 ++--
 .../ops/clickcount/functions/BackpressureMap.java  | 46 ++
 operations-playground/docker-compose.yaml  |  4 +-
 4 files changed, 71 insertions(+), 6 deletions(-)

diff --git 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index 3d17fcd..893c11e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@ under the License.
 
org.apache.flink
flink-playground-clickcountjob
-   1-FLINK-1.9_2.11
+   2-FLINK-1.9_2.11
 
flink-playground-clickcountjob
jar
diff --git 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 0316bc6..f3d628c 100644
--- 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -18,6 +18,7 @@
 package org.apache.flink.playgrounds.ops.clickcount;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
 import 
org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
 import 
org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
@@ -25,6 +26,7 @@ import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializa
 import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
 import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit;
  * The Job can be configured via the command line:
  * * "--checkpointing": enables checkpointing
  * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--backpressure": insert an operator that causes periodic backpressure
  * * "--input-topic": the name of the Kafka Topic to consume {@link 
ClickEvent}s from
  * * "--output-topic": the name of the Kafka Topic to produce {@link 
ClickEventStatistics} to
  * * "--bootstrap.servers": comma-separated list of Kafka brokers
@@ -56,6 +59,7 @@ public class ClickEventCount {
 
public static final String CHECKPOINTING_OPTION = "checkpointing";
public static final String EVENT_TIME_OPTION = "event-time";
+   public static final String BACKPRESSURE_OPTION = "backpressure";
 
public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
 
@@ -66,6 +70,8 @@ public class ClickEventCount {
 
configureEnvironment(params, env);
 
+   boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);
+
String inputTopic = params.get("input-topic", "input");
String outputTopic = params.get("output-topic", "output");
String brokers = params.get("bootstrap.servers", 
"localhost:9092");
@@ -73,19 +79,32 @@ public class ClickEventCount {
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"click-event-count");
 
-   env.addSource(new FlinkKafkaConsumer<>(inputTopic, new 
ClickEventDeserializationSchema(), kafkaProps))
+   DataStream clicks =
+   env.addSource(new 
FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), 
kafkaProps))
.name("ClickEvent Source")