This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new c0eb1e3  [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more 
ergonomic
c0eb1e3 is described below

commit c0eb1e3ad3fceda61e470c584319bddccdf660dd
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Wed May 27 22:45:26 2020 +0200

    [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic
    
    This removes WatermarkStrategies and instead moves the convenience
    entrypoint methods for strategies directly to WatermarmStrategy.
    
    WatermarkStrategy is now also itself the builder for more complex
    strategies instead of WatermarkStrategies.
---
 docs/dev/connectors/kafka.md                       |  10 +-
 docs/dev/event_timestamp_extractors.md             |  17 +-
 docs/dev/event_timestamps_watermarks.md            |  43 ++--
 .../source/reader/CoordinatedSourceITCase.java     |   8 +-
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |   4 +-
 .../internals/AbstractFetcherWatermarksTest.java   |   4 +-
 .../api/common/eventtime/WatermarkStrategies.java  | 250 ---------------------
 .../api/common/eventtime/WatermarkStrategy.java    | 141 +++++++++++-
 .../eventtime/WatermarkStrategyWithIdleness.java   |  48 ++++
 .../WatermarkStrategyWithTimestampAssigner.java    |  48 ++++
 ...ategiesTest.java => WatermarkStrategyTest.java} |  91 +++++---
 .../flink/streaming/api/datastream/DataStream.java |   4 +-
 .../apache/flink/streaming/api/DataStreamTest.java |  23 ++
 .../api/graph/StreamingJobGraphGeneratorTest.java  |   6 +-
 .../source/SourceOperatorEventTimeTest.java        |  25 +--
 .../operators/source/TestingSourceOperator.java    |   3 +-
 .../TimestampsAndWatermarksOperatorTest.java       |  22 +-
 .../tasks/SourceOperatorStreamTaskTest.java        |   4 +-
 .../flink/streaming/api/scala/DataStream.scala     |   4 +-
 .../api/scala/StreamExecutionEnvironmentTest.scala |   5 +-
 20 files changed, 392 insertions(+), 368 deletions(-)

diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 6f21ff7..27f18f7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -372,9 +372,8 @@ properties.setProperty("group.id", "test");
 FlinkKafkaConsumer<String> myConsumer =
     new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
 myConsumer.assignTimestampsAndWatermarks(
-    WatermarkStrategies.
-        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
-        .build());
+    WatermarkStrategy.
+        .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
 
 DataStream<String> stream = env.addSource(myConsumer);
 {% endhighlight %}
@@ -388,9 +387,8 @@ properties.setProperty("group.id", "test")
 val myConsumer =
     new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties);
 myConsumer.assignTimestampsAndWatermarks(
-    WatermarkStrategies.
-        .forBoundedOutOfOrderness[String](Duration.ofSeconds(20))
-        .build())
+    WatermarkStrategy.
+        .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
 
 val stream = env.addSource(myConsumer)
 {% endhighlight %}
diff --git a/docs/dev/event_timestamp_extractors.md 
b/docs/dev/event_timestamp_extractors.md
index a80181d..417658c 100644
--- a/docs/dev/event_timestamp_extractors.md
+++ b/docs/dev/event_timestamp_extractors.md
@@ -53,16 +53,12 @@ unioned, connected, or merged.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-WatermarkStrategies
-        .<MyType>forMonotonousTimestamps()
-        .build();
+WatermarkStrategy.forMonotonousTimestamps();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-WatermarkStrategies
-  .forMonotonousTimestamps[MyType]()
-  .build()
+WatermarkStrategy.forMonotonousTimestamps()
 {% endhighlight %}
 </div>
 </div>
@@ -89,16 +85,13 @@ about working with late elements.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-WatermarkStrategies
-        .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
-        .build();
+WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-WatermarkStrategies
-  .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(10))
-  .build()
+WatermarkStrategy
+  .forBoundedOutOfOrderness(Duration.ofSeconds(10))
 {% endhighlight %}
 </div>
 </div>
diff --git a/docs/dev/event_timestamps_watermarks.md 
b/docs/dev/event_timestamps_watermarks.md
index 58f78c3..3b8d766 100644
--- a/docs/dev/event_timestamps_watermarks.md
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -41,8 +41,11 @@ Timestamp assignment goes hand-in-hand with generating 
watermarks, which tell
 the system about progress in event time. You can configure this by specifying a
 `WatermarkGenerator`.
 
-The Flink API expects a `WatermarkStrategy` that contains both a 
`TimestampAssigner` and `WatermarkGenerator`.
-A number of common strategies out of the box, available in the 
`WatermarkStrategies` helper, but users can also build their own strategies 
when required. 
+The Flink API expects a `WatermarkStrategy` that contains both a
+`TimestampAssigner` and `WatermarkGenerator`.  A number of common strategies
+are available out of the box as static methods on `WatermarkStrategy`, but
+users can also build their own strategies when required. 
+
 Here is the interface for completeness' sake:
 
 {% highlight java %}
@@ -64,26 +67,26 @@ public interface WatermarkStrategy<T> extends 
TimestampAssignerSupplier<T>, Wate
 {% endhighlight %}
 
 As mentioned, you usually don't implement this interface yourself but use the
-`WatermarkStrategies` helper for using common watermark strategies or to bundle
-together a custom `TimestampAssigner` with a `WatermarkGenerator`. For 
example, to use bounded-of-orderness watermarks and a lambda function as a 
timestamp assigner you use this:
+static helper methods on `WatermarkStrategy` for common watermark strategies or
+to bundle together a custom `TimestampAssigner` with a `WatermarkGenerator`.
+For example, to use bounded-out-of-orderness watermarks and a lambda function 
as a
+timestamp assigner you use this:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-WatermarkStrategies
+WatermarkStrategy
         .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
-        .withTimestampAssigner((event, timestamp) -> event.f0)
-        .build();
+        .withTimestampAssigner((event, timestamp) -> event.f0);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-WatermarkStrategies
+WatermarkStrategy
   .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
   .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
     override def extractTimestamp(element: (Long, String), recordTimestamp: 
Long): Long = element._1
   })
-  .build()
 {% endhighlight %}
 
 (Using Scala Lambdas here currently doesn't work because Scala is stupid and 
it's hard to support this. #fus)
@@ -176,23 +179,23 @@ This is a problem because it can happen that some of your 
partitions do still
 carry events. In that case, the watermark will be held back, because it is
 computed as the minimum over all the different parallel watermarks.
 
-To deal with this, you can use a `WatermarkStrategy` that will detect idleness 
and mark an input as idle. `WatermarkStrategies` provides a convenience helper 
for this:
+To deal with this, you can use a `WatermarkStrategy` that will detect idleness
+and mark an input as idle. `WatermarkStrategy` provides a convenience helper
+for this:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-WatermarkStrategies
+WatermarkStrategy
         .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
-        .withIdleness(Duration.ofMinutes(1))
-        .build();
+        .withIdleness(Duration.ofMinutes(1));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-WatermarkStrategies
+WatermarkStrategy
   .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
   .withIdleness(Duration.ofMinutes(1))
-  .build()
 {% endhighlight %}
 </div>
 </div>
@@ -434,9 +437,8 @@ case.
 {% highlight java %}
 FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", 
schema, props);
 kafkaSource.assignTimestampsAndWatermarks(
-        WatermarkStrategies.
-                .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20))
-                .build());
+        WatermarkStrategy.
+                .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
 
 DataStream<MyType> stream = env.addSource(kafkaSource);
 {% endhighlight %}
@@ -445,9 +447,8 @@ DataStream<MyType> stream = env.addSource(kafkaSource);
 {% highlight scala %}
 val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
 kafkaSource.assignTimestampsAndWatermarks(
-  WatermarkStrategies
-    .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20))
-    .build())
+  WatermarkStrategy
+    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
 
 val stream: DataStream[MyType] = env.addSource(kafkaSource)
 {% endhighlight %}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index a7f4c22..6582210 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.common.accumulators.ListAccumulator;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.configuration.Configuration;
@@ -47,7 +47,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase 
{
                MockBaseSource source = new MockBaseSource(2, 10, 
Boundedness.BOUNDED);
                DataStream<Integer> stream = env.continuousSource(
                                source,
-                               
WatermarkStrategies.<Integer>noWatermarks().build(),
+                               WatermarkStrategy.noWatermarks(),
                                "TestingSource");
                executeAndVerify(env, stream, 20);
        }
@@ -59,11 +59,11 @@ public class CoordinatedSourceITCase extends 
AbstractTestBase {
                MockBaseSource source2 = new MockBaseSource(2, 10, 20, 
Boundedness.BOUNDED);
                DataStream<Integer> stream1 = env.continuousSource(
                                source1,
-                               
WatermarkStrategies.<Integer>noWatermarks().build(),
+                               WatermarkStrategy.noWatermarks(),
                                "TestingSource1");
                DataStream<Integer> stream2 = env.continuousSource(
                                source2,
-                               
WatermarkStrategies.<Integer>noWatermarks().build(),
+                               WatermarkStrategy.noWatermarks(),
                                "TestingSource2");
                executeAndVerify(env, stream1.union(stream2), 40);
        }
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 0e8c261..67d65fb 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -280,8 +280,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
         * strictly ascending per Kafka partition, they will not be strictly 
ascending in the resulting
         * Flink DataStream, if the parallel source subtask reads more than one 
partition.
         *
-        * <p>Common watermark generation patterns can be found in the
-        * {@link org.apache.flink.api.common.eventtime.WatermarkStrategies} 
class.
+        * <p>Common watermark generation patterns can be found as static 
methods in the
+        * {@link org.apache.flink.api.common.eventtime.WatermarkStrategy} 
class.
         *
         * @return The consumer object, to allow function chaining.
         */
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java
index e33ea4a..83a4f68 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka.internals;
 
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -72,10 +71,9 @@ public class AbstractFetcherWatermarksTest {
                public static Collection<WatermarkStrategy<Long>> getParams() {
                        return Arrays.asList(
                                        new 
AssignerWithPeriodicWatermarksAdapter.Strategy<>(new PeriodicTestExtractor()),
-                                       WatermarkStrategies
+                                       WatermarkStrategy
                                                        .forGenerator((ctx) -> 
new PeriodicTestWatermarkGenerator())
                                                        
.withTimestampAssigner((event, previousTimestamp) -> event)
-                                                       .build()
                        );
                }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
deleted file mode 100644
index 880edc7..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.eventtime;
-
-import org.apache.flink.annotation.Public;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * WatermarkStrategies is a simply way to build a {@link WatermarkStrategy} by 
configuring
- * common strategies.
- */
-@Public
-public final class WatermarkStrategies<T> {
-
-       /**
-        * The {@link TimestampAssigner} to use. This can be {@code null} for 
cases where records come
-        * out of a source with valid timestamps, for example from Kafka.
-        */
-       @Nullable
-       private TimestampAssignerSupplier<T> timestampAssignerSupplier = null;
-
-       /** The base strategy for watermark generation. Starting point, is 
always set. */
-       private final WatermarkStrategy<T> baseStrategy;
-
-       /** Optional idle timeout for watermarks. */
-       @Nullable
-       private Duration idleTimeout;
-
-       private WatermarkStrategies(WatermarkStrategy<T> baseStrategy) {
-               this.baseStrategy = baseStrategy;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  builder methods
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Add an idle timeout to the watermark strategy.
-        * If no records flow in a partition of a stream for that amount of 
time, then that partition
-        * is considered "idle" and will not hold back the progress of 
watermarks in downstream operators.
-        *
-        * <p>Idleness can be important if some partitions have little data and 
might not have events during
-        * some periods. Without idleness, these streams can stall the overall 
event time progress of the
-        * application.
-        */
-       public WatermarkStrategies<T> withIdleness(Duration idleTimeout) {
-               checkNotNull(idleTimeout, "idleTimeout");
-               checkArgument(!(idleTimeout.isZero() || 
idleTimeout.isNegative()), "idleTimeout must be greater than zero");
-               this.idleTimeout = idleTimeout;
-               return this;
-       }
-
-       /**
-        * Adds the given {@link TimestampAssigner} (via a {@link 
TimestampAssignerSupplier}) to this
-        * {@link WatermarkStrategies}.
-        *
-        * <p>You can use this when a {@link TimestampAssigner} needs 
additional context, for example
-        * access to the metrics system.
-        *
-        * <pre>
-        * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
-        *   .forMonotonousTimestamps()
-        *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx))
-        *   .build();
-        * }</pre>
-        */
-       public WatermarkStrategies<T> 
withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
-               checkNotNull(timestampAssigner, "timestampAssigner");
-               this.timestampAssignerSupplier = timestampAssigner;
-               return this;
-       }
-
-       /**
-        * Adds the given {@link TimestampAssigner} to this {@link 
WatermarkStrategies}.
-        *
-        * <p>You can use this in case you want to specify a {@link 
TimestampAssigner} via a lambda
-        * function.
-        *
-        * <pre>
-        * {@code WatermarkStrategy<CustomObject> wmStrategy = 
WatermarkStrategies
-        *   .<CustomObject>forMonotonousTimestamps()
-        *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
-        *   .build();
-        * }</pre>
-        */
-       public WatermarkStrategies<T> 
withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
-               checkNotNull(timestampAssigner, "timestampAssigner");
-               this.timestampAssignerSupplier = 
TimestampAssignerSupplier.of(timestampAssigner);
-               return this;
-       }
-
-       /**
-        * Build the watermark strategy.
-        */
-       public WatermarkStrategy<T> build() {
-               WatermarkStrategy<T> strategy = this.baseStrategy;
-
-               if (idleTimeout != null) {
-                       strategy = new WithIdlenessStrategy<>(strategy, 
idleTimeout);
-               }
-
-               if (timestampAssignerSupplier != null) {
-                       strategy = new WithTimestampAssigner<>(strategy, 
timestampAssignerSupplier);
-               }
-
-               return strategy;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  builder entry points
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Starts building a watermark strategy for situations with 
monotonously ascending
-        * timestamps.
-        *
-        * <p>The watermarks are generated periodically and tightly follow the 
latest
-        * timestamp in the data. The delay introduced by this strategy is 
mainly the periodic
-        * interval in which the watermarks are generated.
-        *
-        * @see AscendingTimestampsWatermarks
-        */
-       public static <T> WatermarkStrategies<T> forMonotonousTimestamps() {
-               return new WatermarkStrategies<>((ctx) -> new 
AscendingTimestampsWatermarks<>());
-       }
-
-       /**
-        * Starts building a watermark strategy for situations where records 
are out of order, but
-        * you can place an upper bound on how far the events are out of order.
-        * An out-of-order bound B means that once the an event with timestamp 
T was encountered, no
-        * events older than {@code T - B} will follow any more.
-        *
-        * <p>The watermarks are generated periodically. The delay introduced 
by this watermark strategy
-        * is the periodic interval length, plus the out of orderness bound.
-        *
-        * @see BoundedOutOfOrdernessWatermarks
-        */
-       public static <T> WatermarkStrategies<T> 
forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
-               return new WatermarkStrategies<>((ctx) -> new 
BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness));
-       }
-
-       /**
-        * Starts building a watermark strategy based on an existing {@code 
WatermarkStrategy}.
-        */
-       public static <T> WatermarkStrategies<T> 
forStrategy(WatermarkStrategy<T> strategy) {
-               return new WatermarkStrategies<>(strategy);
-       }
-
-       /**
-        * Starts building a watermark strategy based on an existing {@link 
WatermarkGeneratorSupplier}.
-        */
-       public static <T> WatermarkStrategies<T> 
forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
-               return new WatermarkStrategies<>(new 
FromWatermarkGeneratorSupplier<>(generatorSupplier));
-       }
-
-       /**
-        * Starts building a watermark strategy that generates no watermarks at 
all.
-        * This may be useful in scenarios that do pure processing-time based 
stream processing.
-        */
-       public static <T> WatermarkStrategies<T> noWatermarks() {
-               return new WatermarkStrategies<>((ctx) -> new 
NoWatermarksGenerator<>());
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static final class FromWatermarkGeneratorSupplier<T> implements 
WatermarkStrategy<T> {
-               private static final long serialVersionUID = 1L;
-
-               private final WatermarkGeneratorSupplier<T> generatorSupplier;
-
-               private 
FromWatermarkGeneratorSupplier(WatermarkGeneratorSupplier<T> generatorSupplier) 
{
-                       this.generatorSupplier = generatorSupplier;
-               }
-
-               @Override
-               public WatermarkGenerator<T> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
-                       return 
generatorSupplier.createWatermarkGenerator(context);
-               }
-       }
-
-       /**
-        * A {@link WatermarkStrategy} that overrides the {@link 
TimestampAssigner} of the given base
-        * {@link WatermarkStrategy}.
-        */
-       private static final class WithTimestampAssigner<T> implements 
WatermarkStrategy<T> {
-               private static final long serialVersionUID = 1L;
-
-               private final WatermarkStrategy<T> baseStrategy;
-               private final TimestampAssignerSupplier<T> timestampAssigner;
-
-               private WithTimestampAssigner(WatermarkStrategy<T> 
baseStrategy, TimestampAssignerSupplier<T> timestampAssigner) {
-                       this.baseStrategy = baseStrategy;
-                       this.timestampAssigner = timestampAssigner;
-               }
-
-               @Override
-               public TimestampAssigner<T> 
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
-                       return 
timestampAssigner.createTimestampAssigner(context);
-               }
-
-               @Override
-               public WatermarkGenerator<T> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
-                       return baseStrategy.createWatermarkGenerator(context);
-               }
-       }
-
-       private static final class WithIdlenessStrategy<T> implements 
WatermarkStrategy<T> {
-               private static final long serialVersionUID = 1L;
-
-               private final WatermarkStrategy<T> baseStrategy;
-               private final Duration idlenessTimeout;
-
-               private WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy, 
Duration idlenessTimeout) {
-                       this.baseStrategy = baseStrategy;
-                       this.idlenessTimeout = idlenessTimeout;
-               }
-
-               @Override
-               public TimestampAssigner<T> 
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
-                       return baseStrategy.createTimestampAssigner(context);
-               }
-
-               @Override
-               public WatermarkGenerator<T> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
-                       return new 
WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context), 
idlenessTimeout);
-               }
-       }
-}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
index 254afc1..cf11e47 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
@@ -21,17 +21,48 @@ package org.apache.flink.api.common.eventtime;
 import org.apache.flink.annotation.Public;
 
 import java.io.Serializable;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The WatermarkStrategy defines how to generate {@link Watermark}s in the 
stream sources. The
  * WatermarkStrategy is a builder/factory for the {@link WatermarkGenerator} 
that generates the
  * watermarks and the {@link TimestampAssigner} which assigns the internal 
timestamp of a record.
  *
+ * <p>This interface is split into three parts: 1) methods that an implementor 
of this interface
+ * needs to implement, 2) builder methods for building a {@code 
WatermarkStrategy} on a base
+ * strategy, 3) convenience methods for constructing a {code 
WatermarkStrategy} for common built-in
+ * strategies or based on a {@link WatermarkGeneratorSupplier}
+ *
+ * <p>Implementors of this interface need only implement {@link 
#createWatermarkGenerator(WatermarkGeneratorSupplier.Context)}.
+ * Optionally, you can implement {@link 
#createTimestampAssigner(TimestampAssignerSupplier.Context)}.
+ *
+ * <p>The builder methods, like {@link #withIdleness(Duration)} or {@link
+ * #createTimestampAssigner(TimestampAssignerSupplier.Context)} create a new 
{@code
+ * WatermarkStrategy} that wraps and enriches a base strategy. The strategy on 
which the method is
+ * called is the base strategy.
+ *
+ * <p>The convenience methods, for example {@link 
#forBoundedOutOfOrderness(Duration)}, create a
+ * {@code WatermarkStrategy} for common built in strategies.
+ *
  * <p>This interface is {@link Serializable} because watermark strategies may 
be shipped
  * to workers during distributed execution.
  */
 @Public
-public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, 
WatermarkGeneratorSupplier<T> {
+public interface WatermarkStrategy<T> extends
+               TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
+
+       // 
------------------------------------------------------------------------
+       //  Methods that implementors need to implement.
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Instantiates a WatermarkGenerator that generates watermarks 
according to this strategy.
+        */
+       @Override
+       WatermarkGenerator<T> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
 
        /**
         * Instantiates a {@link TimestampAssigner} for assigning timestamps 
according to this
@@ -44,9 +75,111 @@ public interface WatermarkStrategy<T> extends 
TimestampAssignerSupplier<T>, Wate
                return new RecordTimestampAssigner<>();
        }
 
+       // 
------------------------------------------------------------------------
+       //  Builder methods for enriching a base WatermarkStrategy
+       // 
------------------------------------------------------------------------
+
        /**
-        * Instantiates a WatermarkGenerator that generates watermarks 
according to this strategy.
+        * Creates a new {@code WatermarkStrategy} that wraps this strategy but 
instead uses the given
+        * {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}).
+        *
+        * <p>You can use this when a {@link TimestampAssigner} needs 
additional context, for example
+        * access to the metrics system.
+        *
+        * <pre>
+        * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
+        *   .forMonotonousTimestamps()
+        *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
+        * }</pre>
         */
-       @Override
-       WatermarkGenerator<T> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+       default WatermarkStrategy<T> 
withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
+               checkNotNull(timestampAssigner, "timestampAssigner");
+               return new WatermarkStrategyWithTimestampAssigner<>(this, 
timestampAssigner);
+       }
+
+       /**
+        * Creates a new {@code WatermarkStrategy} that wraps this strategy but 
instead uses the given
+        * {@link SerializableTimestampAssigner}.
+        *
+        * <p>You can use this in case you want to specify a {@link 
TimestampAssigner} via a lambda
+        * function.
+        *
+        * <pre>
+        * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
+        *   .forMonotonousTimestamps()
+        *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
+        * }</pre>
+        */
+       default WatermarkStrategy<T> 
withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
+               checkNotNull(timestampAssigner, "timestampAssigner");
+               return new WatermarkStrategyWithTimestampAssigner<>(this,
+                               
TimestampAssignerSupplier.of(timestampAssigner));
+       }
+
+       /**
+        * Creates a new enriched {@link WatermarkStrategy} that also does 
idleness detection in the
+        * created {@link WatermarkGenerator}.
+        *
+        * <p>Add an idle timeout to the watermark strategy. If no records flow 
in a partition of a
+        * stream for that amount of time, then that partition is considered 
"idle" and will not hold
+        * back the progress of watermarks in downstream operators.
+        *
+        * <p>Idleness can be important if some partitions have little data and 
might not have events
+        * during some periods. Without idleness, these streams can stall the 
overall event time
+        * progress of the application.
+        */
+       default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
+               checkNotNull(idleTimeout, "idleTimeout");
+               checkArgument(!(idleTimeout.isZero() || 
idleTimeout.isNegative()),
+                               "idleTimeout must be greater than zero");
+               return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Convenience methods for common watermark strategies
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a watermark strategy for situations with monotonously 
ascending timestamps.
+        *
+        * <p>The watermarks are generated periodically and tightly follow the 
latest
+        * timestamp in the data. The delay introduced by this strategy is 
mainly the periodic interval
+        * in which the watermarks are generated.
+        *
+        * @see AscendingTimestampsWatermarks
+        */
+       static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
+               return (ctx) -> new AscendingTimestampsWatermarks<>();
+       }
+
+       /**
+        * Creates a watermark strategy for situations where records are out of 
order, but you can place
+        * an upper bound on how far the events are out of order. An 
out-of-order bound B means that
+        * once the an event with timestamp T was encountered, no events older 
than {@code T - B} will
+        * follow any more.
+        *
+        * <p>The watermarks are generated periodically. The delay introduced 
by this watermark
+        * strategy is the periodic interval length, plus the out of orderness 
bound.
+        *
+        * @see BoundedOutOfOrdernessWatermarks
+        */
+       static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration 
maxOutOfOrderness) {
+               return (ctx) -> new 
BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
+       }
+
+       /**
+        * Creates a watermark strategy based on an existing {@link 
WatermarkGeneratorSupplier}.
+        */
+       static <T> WatermarkStrategy<T> 
forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
+               return generatorSupplier::createWatermarkGenerator;
+       }
+
+       /**
+        * Creates a watermark strategy that generates no watermarks at all. 
This may be useful in
+        * scenarios that do pure processing-time based stream processing.
+        */
+       static <T> WatermarkStrategy<T> noWatermarks() {
+               return (ctx) -> new NoWatermarksGenerator<>();
+       }
+
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
new file mode 100644
index 0000000..98c3374
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import java.time.Duration;
+
+/**
+ * A {@link WatermarkStrategy} that adds idleness detection on top of the 
wrapped strategy.
+ */
+final class WatermarkStrategyWithIdleness<T> implements WatermarkStrategy<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<T> baseStrategy;
+       private final Duration idlenessTimeout;
+
+       WatermarkStrategyWithIdleness(WatermarkStrategy<T> baseStrategy, 
Duration idlenessTimeout) {
+               this.baseStrategy = baseStrategy;
+               this.idlenessTimeout = idlenessTimeout;
+       }
+
+       @Override
+       public TimestampAssigner<T> 
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+               return baseStrategy.createTimestampAssigner(context);
+       }
+
+       @Override
+       public WatermarkGenerator<T> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
+               return new 
WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context),
+                               idlenessTimeout);
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java
new file mode 100644
index 0000000..f83779a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+/**
+ * A {@link WatermarkStrategy} that overrides the {@link TimestampAssigner} of 
the given base {@link
+ * WatermarkStrategy}.
+ */
+final class WatermarkStrategyWithTimestampAssigner<T> implements 
WatermarkStrategy<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<T> baseStrategy;
+       private final TimestampAssignerSupplier<T> timestampAssigner;
+
+       WatermarkStrategyWithTimestampAssigner(
+                       WatermarkStrategy<T> baseStrategy,
+                       TimestampAssignerSupplier<T> timestampAssigner) {
+               this.baseStrategy = baseStrategy;
+               this.timestampAssigner = timestampAssigner;
+       }
+
+       @Override
+       public TimestampAssigner<T> 
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+               return timestampAssigner.createTimestampAssigner(context);
+       }
+
+       @Override
+       public WatermarkGenerator<T> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
+               return baseStrategy.createWatermarkGenerator(context);
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
similarity index 72%
rename from 
flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java
rename to 
flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
index 6136bb7..857b994 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -37,97 +38,115 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 /**
- * Test for the {@link WatermarkStrategies} class.
+ * Test for the {@link WatermarkStrategy} class.
  */
-public class WatermarkStrategiesTest {
+public class WatermarkStrategyTest {
 
        @Test
        public void testDefaultTimeStampAssigner() {
-               WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
-                               .forMonotonousTimestamps()
-                               .build();
+               WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
+                               .forMonotonousTimestamps();
+
                // ensure that the closure can be cleaned through the 
WatermarkStategies
                ClosureCleaner.clean(wmStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
 
-               
assertThat(wmStrategy.createTimestampAssigner(assignerContext()), 
instanceOf(RecordTimestampAssigner.class));
+               
assertThat(wmStrategy.createTimestampAssigner(assignerContext()),
+                               instanceOf(RecordTimestampAssigner.class));
        }
 
        @Test
        public void testLambdaTimestampAssigner() {
-               WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+               WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
                                .forMonotonousTimestamps()
-                               .withTimestampAssigner((event, timestamp) -> 
42L)
-                               .build();
+                               .withTimestampAssigner((event, timestamp) -> 
42L);
+
                // ensure that the closure can be cleaned through the 
WatermarkStategies
                ClosureCleaner.clean(wmStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
 
-               TimestampAssigner<Object> timestampAssigner = 
wmStrategy.createTimestampAssigner(assignerContext());
+               TimestampAssigner<Object> timestampAssigner = wmStrategy
+                               .createTimestampAssigner(assignerContext());
 
                assertThat(timestampAssigner.extractTimestamp(null, 13L), 
is(42L));
        }
 
        @Test
        public void testLambdaTimestampAssignerSupplier() {
-               WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+               WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
                                .forMonotonousTimestamps()
-                               
.withTimestampAssigner(TimestampAssignerSupplier.of((event, timestamp) -> 42L))
-                               .build();
+                               
.withTimestampAssigner(TimestampAssignerSupplier.of((event, timestamp) -> 42L));
                // ensure that the closure can be cleaned through the 
WatermarkStategies
                ClosureCleaner.clean(wmStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
 
-               TimestampAssigner<Object> timestampAssigner = 
wmStrategy.createTimestampAssigner(assignerContext());
+               TimestampAssigner<Object> timestampAssigner = wmStrategy
+                               .createTimestampAssigner(assignerContext());
 
                assertThat(timestampAssigner.extractTimestamp(null, 13L), 
is(42L));
        }
 
        @Test
        public void testAnonymousInnerTimestampAssigner() {
-               WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+               WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
                                .forMonotonousTimestamps()
                                .withTimestampAssigner(new 
SerializableTimestampAssigner<Object>() {
                                        @Override
                                        public long extractTimestamp(Object 
element, long recordTimestamp) {
                                                return 42;
                                        }
-                               })
-                               .build();
+                               });
                // ensure that the closure can be cleaned through the 
WatermarkStategies
                ClosureCleaner.clean(wmStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
 
-               TimestampAssigner<Object> timestampAssigner = 
wmStrategy.createTimestampAssigner(assignerContext());
+               TimestampAssigner<Object> timestampAssigner = wmStrategy
+                               .createTimestampAssigner(assignerContext());
 
                assertThat(timestampAssigner.extractTimestamp(null, 13L), 
is(42L));
        }
 
        @Test
        public void testClassTimestampAssigner() {
-               WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+               WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
                                .forMonotonousTimestamps()
-                               .withTimestampAssigner((ctx) -> new 
TestTimestampAssigner())
-                               .build();
+                               .withTimestampAssigner((ctx) -> new 
TestTimestampAssigner());
                // ensure that the closure can be cleaned through the 
WatermarkStategies
                ClosureCleaner.clean(wmStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
 
-               TimestampAssigner<Object> timestampAssigner = 
wmStrategy.createTimestampAssigner(assignerContext());
+               TimestampAssigner<Object> timestampAssigner = wmStrategy
+                               .createTimestampAssigner(assignerContext());
 
                assertThat(timestampAssigner.extractTimestamp(null, 13L), 
is(42L));
        }
 
        @Test
        public void testClassTimestampAssignerUsingSupplier() {
-               WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+               WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
                                .forMonotonousTimestamps()
-                               .withTimestampAssigner((context) -> new 
TestTimestampAssigner())
-                               .build();
+                               .withTimestampAssigner((context) -> new 
TestTimestampAssigner());
                // ensure that the closure can be cleaned through the 
WatermarkStategies
                ClosureCleaner.clean(wmStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
 
-               TimestampAssigner<Object> timestampAssigner = 
wmStrategy.createTimestampAssigner(assignerContext());
+               TimestampAssigner<Object> timestampAssigner = wmStrategy
+                               .createTimestampAssigner(assignerContext());
 
                assertThat(timestampAssigner.extractTimestamp(null, 13L), 
is(42L));
        }
 
+       @Test
+       public void testWithIdlenessHelper() {
+               WatermarkStrategy<String> wmStrategy = WatermarkStrategy
+                               .<String>forMonotonousTimestamps()
+                               .withIdleness(Duration.ofDays(7));
+
+               // ensure that the closure can be cleaned
+               ClosureCleaner.clean(wmStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+
+               
assertThat(wmStrategy.createTimestampAssigner(assignerContext()),
+                               instanceOf(RecordTimestampAssigner.class));
+               
assertThat(wmStrategy.createWatermarkGenerator(generatorContext()),
+                               instanceOf(WatermarksWithIdleness.class));
+       }
+
        static class TestTimestampAssigner implements 
TimestampAssigner<Object>, Serializable {
+
                @Override
                public long extractTimestamp(Object element, long 
recordTimestamp) {
                        return 42L;
@@ -135,13 +154,29 @@ public class WatermarkStrategiesTest {
        }
 
        static TimestampAssignerSupplier.Context assignerContext() {
-               return DummyMetricGroup::new;
+               return new TimestampAssignerSupplier.Context() {
+                       @Override
+                       public MetricGroup getMetricGroup() {
+                               return new DummyMetricGroup();
+                       }
+               };
+       }
+
+       static WatermarkGeneratorSupplier.Context generatorContext() {
+               return new WatermarkGeneratorSupplier.Context() {
+                       @Override
+                       public MetricGroup getMetricGroup() {
+                               return new DummyMetricGroup();
+                       }
+               };
        }
 
        /**
-        * A dummy {@link MetricGroup} to be used when a group is required as 
an argument but not actually used.
+        * A dummy {@link MetricGroup} to be used when a group is required as 
an argument but not
+        * actually used.
         */
        public static class DummyMetricGroup implements MetricGroup {
+
                @Override
                public Counter counter(int name) {
                        return null;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8911f45..5a54ea2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -888,8 +888,8 @@ public class DataStream<T> {
         * <p>Periodically (defined by the {@link 
ExecutionConfig#getAutoWatermarkInterval()}), the
         * {@link WatermarkGenerator#onPeriodicEmit(WatermarkOutput)} method 
will be called.
         *
-        * <p>Common watermark generation patterns can be found in the
-        * {@link org.apache.flink.api.common.eventtime.WatermarkStrategies} 
class.
+        * <p>Common watermark generation patterns can be found as static 
methods in the
+        * {@link org.apache.flink.api.common.eventtime.WatermarkStrategy} 
class.
         *
         * @param watermarkStrategy The strategy to generate watermarks based 
on event timestamps.
         * @return The stream after the transformation, with assigned 
timestamps and watermarks.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index a174a6e..788385b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -92,6 +93,7 @@ import org.junit.rules.ExpectedException;
 import javax.annotation.Nullable;
 
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -110,6 +112,27 @@ public class DataStreamTest extends TestLogger {
        public ExpectedException expectedException = ExpectedException.none();
 
        /**
+        * Ensure that WatermarkStrategy is easy to use in the API, without 
superfluous generics.
+        */
+       @Test
+       public void testErgonomicWatermarkStrategy() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> input = env.fromElements("bonjour");
+
+               // as soon as you have a chain of methods the first call needs 
a generic
+               input.assignTimestampsAndWatermarks(
+                               WatermarkStrategy
+                                               
.forBoundedOutOfOrderness(Duration.ofMillis(10)));
+
+               // as soon as you have a chain of methods the first call needs 
to specify the generic type
+               input.assignTimestampsAndWatermarks(
+                               WatermarkStrategy
+                                               
.<String>forBoundedOutOfOrderness(Duration.ofMillis(10))
+                                               .withTimestampAssigner((event, 
timestamp) -> 42L));
+       }
+
+       /**
         * Tests union functionality. This ensures that self-unions and unions 
of streams
         * with differing parallelism work.
         *
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index fec7b0d..eca6883 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -253,7 +253,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                DataStream<Integer> stream = env.continuousSource(
                                new MockSource(Boundedness.BOUNDED, 1),
-                               
WatermarkStrategies.<Integer>noWatermarks().build(),
+                               WatermarkStrategy.noWatermarks(),
                                "TestingSource");
 
                OneInputTransformation<Integer, Integer> resultTransform = new 
OneInputTransformation<Integer, Integer>(
@@ -463,7 +463,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                DataStream<Integer> source = env.continuousSource(
                                new MockSource(Boundedness.BOUNDED, 1),
-                               
WatermarkStrategies.<Integer>noWatermarks().build(),
+                               WatermarkStrategy.noWatermarks(),
                                "TestSource");
                source.addSink(new DiscardingSink<>());
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index 347975c..8d05336 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators.source;
 
 import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.ReaderOutput;
@@ -57,9 +56,9 @@ public class SourceOperatorEventTimeTest {
 
        @Test
        public void testMainOutputPeriodicWatermarks() throws Exception {
-               final WatermarkStrategy<Integer> watermarkStrategy = 
WatermarkStrategies
-                       .<Integer>forGenerator((ctx) -> new 
OnPeriodicTestWatermarkGenerator<>())
-                       .build();
+               final WatermarkStrategy<Integer> watermarkStrategy =
+                               WatermarkStrategy
+                                               .forGenerator((ctx) -> new 
OnPeriodicTestWatermarkGenerator<>());
 
                final List<Watermark> result = 
testSequenceOfWatermarks(watermarkStrategy,
                        (output) -> output.collect(0, 100L),
@@ -75,9 +74,9 @@ public class SourceOperatorEventTimeTest {
 
        @Test
        public void testMainOutputEventWatermarks() throws Exception {
-               final WatermarkStrategy<Integer> watermarkStrategy = 
WatermarkStrategies
-                       .<Integer>forGenerator((ctx) -> new 
OnEventTestWatermarkGenerator<>())
-                       .build();
+               final WatermarkStrategy<Integer> watermarkStrategy =
+                               WatermarkStrategy
+                                               .forGenerator((ctx) -> new 
OnEventTestWatermarkGenerator<>());
 
                final List<Watermark> result = 
testSequenceOfWatermarks(watermarkStrategy,
                        (output) -> output.collect(0, 100L),
@@ -93,9 +92,9 @@ public class SourceOperatorEventTimeTest {
 
        @Test
        public void testPerSplitOutputPeriodicWatermarks() throws Exception {
-               final WatermarkStrategy<Integer> watermarkStrategy = 
WatermarkStrategies
-                       .<Integer>forGenerator((ctx) -> new 
OnPeriodicTestWatermarkGenerator<>())
-                       .build();
+               final WatermarkStrategy<Integer> watermarkStrategy =
+                               WatermarkStrategy
+                                               .forGenerator((ctx) -> new 
OnPeriodicTestWatermarkGenerator<>());
 
                final List<Watermark> result = 
testSequenceOfWatermarks(watermarkStrategy,
                        (output) -> {
@@ -118,9 +117,9 @@ public class SourceOperatorEventTimeTest {
 
        @Test
        public void testPerSplitOutputEventWatermarks() throws Exception {
-               final WatermarkStrategy<Integer> watermarkStrategy = 
WatermarkStrategies
-                               .<Integer>forGenerator((ctx) -> new 
OnEventTestWatermarkGenerator<>())
-                               .build();
+               final WatermarkStrategy<Integer> watermarkStrategy =
+                               WatermarkStrategy
+                                               .forGenerator((ctx) -> new 
OnEventTestWatermarkGenerator<>());
 
                final List<Watermark> result = 
testSequenceOfWatermarks(watermarkStrategy,
                        (output) -> {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 43992dc..038a211 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators.source;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
@@ -56,7 +55,7 @@ public class TestingSourceOperator<T>  extends 
SourceOperator<T, MockSourceSplit
                        OperatorEventGateway eventGateway,
                        int subtaskIndex) {
 
-               this(reader, WatermarkStrategies.<T>noWatermarks().build(), new 
TestProcessingTimeService(), eventGateway, subtaskIndex, 5);
+               this(reader, WatermarkStrategy.noWatermarks(), new 
TestProcessingTimeService(), eventGateway, subtaskIndex, 5);
        }
 
        public TestingSourceOperator(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index 4c149ba..995a8ca 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -47,7 +47,7 @@ public class TimestampsAndWatermarksOperatorTest {
        @Test
        public void inputWatermarksAreNotForwarded() throws Exception {
                OneInputStreamOperatorTestHarness<Long, Long> testHarness = 
createTestHarness(
-                               WatermarkStrategies
+                               WatermarkStrategy
                                                .forGenerator((ctx) -> new 
PeriodicWatermarkGenerator())
                                                .withTimestampAssigner((ctx) -> 
new LongExtractor()));
 
@@ -60,7 +60,7 @@ public class TimestampsAndWatermarksOperatorTest {
        @Test
        public void longMaxInputWatermarkIsForwarded() throws Exception {
                OneInputStreamOperatorTestHarness<Long, Long> testHarness = 
createTestHarness(
-                               WatermarkStrategies
+                               WatermarkStrategy
                                                .forGenerator((ctx) -> new 
PeriodicWatermarkGenerator())
                                                .withTimestampAssigner((ctx) -> 
new LongExtractor()));
 
@@ -72,7 +72,7 @@ public class TimestampsAndWatermarksOperatorTest {
        @Test
        public void periodicWatermarksEmitOnPeriodicEmit() throws Exception {
                OneInputStreamOperatorTestHarness<Long, Long> testHarness = 
createTestHarness(
-                               WatermarkStrategies
+                               WatermarkStrategy
                                                .forGenerator((ctx) -> new 
PeriodicWatermarkGenerator())
                                                .withTimestampAssigner((ctx) -> 
new LongExtractor()));
 
@@ -92,7 +92,7 @@ public class TimestampsAndWatermarksOperatorTest {
        @Test
        public void periodicWatermarksOnlyEmitOnPeriodicEmit() throws Exception 
{
                OneInputStreamOperatorTestHarness<Long, Long> testHarness = 
createTestHarness(
-                               WatermarkStrategies
+                               WatermarkStrategy
                                                .forGenerator((ctx) -> new 
PeriodicWatermarkGenerator())
                                                .withTimestampAssigner((ctx) -> 
new LongExtractor()));
 
@@ -105,7 +105,7 @@ public class TimestampsAndWatermarksOperatorTest {
        @Test
        public void periodicWatermarksDoNotRegress() throws Exception {
                OneInputStreamOperatorTestHarness<Long, Long> testHarness = 
createTestHarness(
-                               WatermarkStrategies
+                               WatermarkStrategy
                                                .forGenerator((ctx) -> new 
PeriodicWatermarkGenerator())
                                                .withTimestampAssigner((ctx) -> 
new LongExtractor()));
 
@@ -125,7 +125,7 @@ public class TimestampsAndWatermarksOperatorTest {
        @Test
        public void punctuatedWatermarksEmitImmediately() throws Exception {
                OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>, 
Tuple2<Boolean, Long>> testHarness = createTestHarness(
-                               WatermarkStrategies
+                               WatermarkStrategy
                                                .forGenerator((ctx) -> new 
PunctuatedWatermarkGenerator())
                                                .withTimestampAssigner((ctx) -> 
new TupleExtractor()));
 
@@ -143,7 +143,7 @@ public class TimestampsAndWatermarksOperatorTest {
        @Test
        public void punctuatedWatermarksDoNotRegress() throws Exception {
                OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>, 
Tuple2<Boolean, Long>> testHarness = createTestHarness(
-                               WatermarkStrategies
+                               WatermarkStrategy
                                                .forGenerator((ctx) -> new 
PunctuatedWatermarkGenerator())
                                                .withTimestampAssigner((ctx) -> 
new TupleExtractor()));
 
@@ -165,7 +165,7 @@ public class TimestampsAndWatermarksOperatorTest {
        public void testNegativeTimestamps() throws Exception {
 
                OneInputStreamOperatorTestHarness<Long, Long> testHarness = 
createTestHarness(
-                               WatermarkStrategies
+                               WatermarkStrategy
                                                .forGenerator((ctx) -> new 
NeverWatermarkGenerator())
                                                .withTimestampAssigner((ctx) -> 
new LongExtractor()));
 
@@ -181,10 +181,10 @@ public class TimestampsAndWatermarksOperatorTest {
        }
 
        private static <T> OneInputStreamOperatorTestHarness<T, T> 
createTestHarness(
-                       WatermarkStrategies<T> watermarkStrategy) throws 
Exception {
+                       WatermarkStrategy<T> watermarkStrategy) throws 
Exception {
 
                final TimestampsAndWatermarksOperator<T> operator =
-                               new 
TimestampsAndWatermarksOperator<>(watermarkStrategy.build());
+                               new 
TimestampsAndWatermarksOperator<>(watermarkStrategy);
 
                OneInputStreamOperatorTestHarness<T, T> testHarness =
                                new 
OneInputStreamOperatorTestHarness<>(operator);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 515bd51..0f2e99d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
@@ -132,7 +132,7 @@ public class SourceOperatorStreamTaskTest {
                // get a source operator.
                SourceOperatorFactory<Integer> sourceOperatorFactory = new 
SourceOperatorFactory<>(
                                new MockSource(Boundedness.BOUNDED, 1),
-                               
WatermarkStrategies.<Integer>noWatermarks().build());
+                               WatermarkStrategy.noWatermarks());
 
                // build a test harness.
                MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fe2184e..0608c53 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -828,8 +828,8 @@ class DataStream[T](stream: JavaStream[T]) {
    * Periodically (defined by the 
[[ExecutionConfig#getAutoWatermarkInterval()]]), the
    * [[WatermarkGenerator#onPeriodicEmit(WatermarkOutput)]] method will be 
called.
    *
-   * Common watermark generation patterns can be found in the
-   * [[org.apache.flink.api.common.eventtime.WatermarkStrategies]] class.
+   * Common watermark generation patterns can be found as static methods in the
+   * [[org.apache.flink.api.common.eventtime.WatermarkStrategy]] class.
    */
   def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): 
DataStream[T] = {
     val cleanedStrategy = clean(watermarkStrategy)
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
index f493497..fa503e0 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
@@ -18,12 +18,11 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.common.eventtime.WatermarkStrategies
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.connector.source.Boundedness
 import org.apache.flink.api.connector.source.mocks.MockSource
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
-
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
@@ -43,7 +42,7 @@ class StreamExecutionEnvironmentTest {
 
     val stream = env.continuousSource(
       new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
-      WatermarkStrategies.noWatermarks[Integer]().build(),
+      WatermarkStrategy.noWatermarks(),
       "test source")
 
     assertEquals(typeInfo, stream.dataType)

Reply via email to