This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new c0eb1e3 [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic c0eb1e3 is described below commit c0eb1e3ad3fceda61e470c584319bddccdf660dd Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Wed May 27 22:45:26 2020 +0200 [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic This removes WatermarkStrategies and instead moves the convenience entrypoint methods for strategies directly to WatermarmStrategy. WatermarkStrategy is now also itself the builder for more complex strategies instead of WatermarkStrategies. --- docs/dev/connectors/kafka.md | 10 +- docs/dev/event_timestamp_extractors.md | 17 +- docs/dev/event_timestamps_watermarks.md | 43 ++-- .../source/reader/CoordinatedSourceITCase.java | 8 +- .../connectors/kafka/FlinkKafkaConsumerBase.java | 4 +- .../internals/AbstractFetcherWatermarksTest.java | 4 +- .../api/common/eventtime/WatermarkStrategies.java | 250 --------------------- .../api/common/eventtime/WatermarkStrategy.java | 141 +++++++++++- .../eventtime/WatermarkStrategyWithIdleness.java | 48 ++++ .../WatermarkStrategyWithTimestampAssigner.java | 48 ++++ ...ategiesTest.java => WatermarkStrategyTest.java} | 91 +++++--- .../flink/streaming/api/datastream/DataStream.java | 4 +- .../apache/flink/streaming/api/DataStreamTest.java | 23 ++ .../api/graph/StreamingJobGraphGeneratorTest.java | 6 +- .../source/SourceOperatorEventTimeTest.java | 25 +-- .../operators/source/TestingSourceOperator.java | 3 +- .../TimestampsAndWatermarksOperatorTest.java | 22 +- .../tasks/SourceOperatorStreamTaskTest.java | 4 +- .../flink/streaming/api/scala/DataStream.scala | 4 +- .../api/scala/StreamExecutionEnvironmentTest.scala | 5 +- 20 files changed, 392 insertions(+), 368 deletions(-) diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 6f21ff7..27f18f7 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ 
-372,9 +372,8 @@ properties.setProperty("group.id", "test"); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties); myConsumer.assignTimestampsAndWatermarks( - WatermarkStrategies. - .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)) - .build()); + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(20))); DataStream<String> stream = env.addSource(myConsumer); {% endhighlight %} @@ -388,9 +387,8 @@ properties.setProperty("group.id", "test") val myConsumer = new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties); myConsumer.assignTimestampsAndWatermarks( - WatermarkStrategies. - .forBoundedOutOfOrderness[String](Duration.ofSeconds(20)) - .build()) + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(20))) val stream = env.addSource(myConsumer) {% endhighlight %} diff --git a/docs/dev/event_timestamp_extractors.md b/docs/dev/event_timestamp_extractors.md index a80181d..417658c 100644 --- a/docs/dev/event_timestamp_extractors.md +++ b/docs/dev/event_timestamp_extractors.md @@ -53,16 +53,12 @@ unioned, connected, or merged. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -WatermarkStrategies - .<MyType>forMonotonousTimestamps() - .build(); +WatermarkStrategy.forMonotonousTimestamps(); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -WatermarkStrategies - .forMonotonousTimestamps[MyType]() - .build() +WatermarkStrategy.forMonotonousTimestamps() {% endhighlight %} </div> </div> @@ -89,16 +85,13 @@ about working with late elements.
<div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -WatermarkStrategies - .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10)) - .build(); +WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -WatermarkStrategies - .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(10)) - .build() +WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(10)) {% endhighlight %} </div> </div> diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md index 58f78c3..3b8d766 100644 --- a/docs/dev/event_timestamps_watermarks.md +++ b/docs/dev/event_timestamps_watermarks.md @@ -41,8 +41,11 @@ Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about progress in event time. You can configure this by specifying a `WatermarkGenerator`. -The Flink API expects a `WatermarkStrategy` that contains both a `TimestampAssigner` and `WatermarkGenerator`. -A number of common strategies out of the box, available in the `WatermarkStrategies` helper, but users can also build their own strategies when required. +The Flink API expects a `WatermarkStrategy` that contains both a +`TimestampAssigner` and `WatermarkGenerator`. A number of common strategies +are available out of the box as static methods on `WatermarkStrategy`, but +users can also build their own strategies when required. + Here is the interface for completeness' sake: {% highlight java %} @@ -64,26 +67,26 @@ public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, Wate {% endhighlight %} As mentioned, you usually don't implement this interface yourself but use the -`WatermarkStrategies` helper for using common watermark strategies or to bundle -together a custom `TimestampAssigner` with a `WatermarkGenerator`. 
For example, to use bounded-of-orderness watermarks and a lambda function as a timestamp assigner you use this: +static helper methods on `WatermarkStrategy` for common watermark strategies or +to bundle together a custom `TimestampAssigner` with a `WatermarkGenerator`. +For example, to use bounded-out-of-orderness watermarks and a lambda function as a +timestamp assigner you use this: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -WatermarkStrategies +WatermarkStrategy .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)) - .withTimestampAssigner((event, timestamp) -> event.f0) - .build(); + .withTimestampAssigner((event, timestamp) -> event.f0); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -WatermarkStrategies +WatermarkStrategy .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)) .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] { override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1 }) - .build() {% endhighlight %} (Using Scala Lambdas here currently doesn't work because Scala is stupid and it's hard to support this. #fus) @@ -176,23 +179,23 @@ This is a problem because it can happen that some of your partitions do still carry events. In that case, the watermark will be held back, because it is computed as the minimum over all the different parallel watermarks. -To deal with this, you can use a `WatermarkStrategy` that will detect idleness and mark an input as idle. `WatermarkStrategies` provides a convenience helper for this: +To deal with this, you can use a `WatermarkStrategy` that will detect idleness +and mark an input as idle. 
`WatermarkStrategy` provides a convenience helper +for this: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -WatermarkStrategies +WatermarkStrategy .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)) - .withIdleness(Duration.ofMinutes(1)) - .build(); + .withIdleness(Duration.ofMinutes(1)); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -WatermarkStrategies +WatermarkStrategy .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)) .withIdleness(Duration.ofMinutes(1)) - .build() {% endhighlight %} </div> </div> @@ -434,9 +437,8 @@ case. {% highlight java %} FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props); kafkaSource.assignTimestampsAndWatermarks( - WatermarkStrategies. - .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20)) - .build()); + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(20))); DataStream<MyType> stream = env.addSource(kafkaSource); {% endhighlight %} @@ -445,9 +447,8 @@ DataStream<MyType> stream = env.addSource(kafkaSource); {% highlight scala %} val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props) kafkaSource.assignTimestampsAndWatermarks( - WatermarkStrategies - .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20)) - .build()) + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(20))) val stream: DataStream[MyType] = env.addSource(kafkaSource) {% endhighlight %} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java index a7f4c22..6582210 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java +++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.common.accumulators.ListAccumulator; -import org.apache.flink.api.common.eventtime.WatermarkStrategies; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.configuration.Configuration; @@ -47,7 +47,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase { MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED); DataStream<Integer> stream = env.continuousSource( source, - WatermarkStrategies.<Integer>noWatermarks().build(), + WatermarkStrategy.noWatermarks(), "TestingSource"); executeAndVerify(env, stream, 20); } @@ -59,11 +59,11 @@ public class CoordinatedSourceITCase extends AbstractTestBase { MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED); DataStream<Integer> stream1 = env.continuousSource( source1, - WatermarkStrategies.<Integer>noWatermarks().build(), + WatermarkStrategy.noWatermarks(), "TestingSource1"); DataStream<Integer> stream2 = env.continuousSource( source2, - WatermarkStrategies.<Integer>noWatermarks().build(), + WatermarkStrategy.noWatermarks(), "TestingSource2"); executeAndVerify(env, stream1.union(stream2), 40); } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 0e8c261..67d65fb 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -280,8 +280,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti * strictly ascending per Kafka partition, they will not be strictly ascending in the resulting * Flink DataStream, if the parallel source subtask reads more than one partition. * - * <p>Common watermark generation patterns can be found in the - * {@link org.apache.flink.api.common.eventtime.WatermarkStrategies} class. + * <p>Common watermark generation patterns can be found as static methods in the + * {@link org.apache.flink.api.common.eventtime.WatermarkStrategy} class. * * @return The consumer object, to allow function chaining. */ diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java index e33ea4a..83a4f68 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka.internals; import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkOutput; -import org.apache.flink.api.common.eventtime.WatermarkStrategies; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; @@ -72,10 +71,9 @@ public class AbstractFetcherWatermarksTest { public static 
Collection<WatermarkStrategy<Long>> getParams() { return Arrays.asList( new AssignerWithPeriodicWatermarksAdapter.Strategy<>(new PeriodicTestExtractor()), - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new PeriodicTestWatermarkGenerator()) .withTimestampAssigner((event, previousTimestamp) -> event) - .build() ); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java deleted file mode 100644 index 880edc7..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.eventtime; - -import org.apache.flink.annotation.Public; - -import javax.annotation.Nullable; - -import java.time.Duration; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * WatermarkStrategies is a simply way to build a {@link WatermarkStrategy} by configuring - * common strategies. 
- */ -@Public -public final class WatermarkStrategies<T> { - - /** - * The {@link TimestampAssigner} to use. This can be {@code null} for cases where records come - * out of a source with valid timestamps, for example from Kafka. - */ - @Nullable - private TimestampAssignerSupplier<T> timestampAssignerSupplier = null; - - /** The base strategy for watermark generation. Starting point, is always set. */ - private final WatermarkStrategy<T> baseStrategy; - - /** Optional idle timeout for watermarks. */ - @Nullable - private Duration idleTimeout; - - private WatermarkStrategies(WatermarkStrategy<T> baseStrategy) { - this.baseStrategy = baseStrategy; - } - - // ------------------------------------------------------------------------ - // builder methods - // ------------------------------------------------------------------------ - - /** - * Add an idle timeout to the watermark strategy. - * If no records flow in a partition of a stream for that amount of time, then that partition - * is considered "idle" and will not hold back the progress of watermarks in downstream operators. - * - * <p>Idleness can be important if some partitions have little data and might not have events during - * some periods. Without idleness, these streams can stall the overall event time progress of the - * application. - */ - public WatermarkStrategies<T> withIdleness(Duration idleTimeout) { - checkNotNull(idleTimeout, "idleTimeout"); - checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()), "idleTimeout must be greater than zero"); - this.idleTimeout = idleTimeout; - return this; - } - - /** - * Adds the given {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}) to this - * {@link WatermarkStrategies}. - * - * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example - * access to the metrics system. 
- * - * <pre> - * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategies - * .forMonotonousTimestamps() - * .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx)) - * .build(); - * }</pre> - */ - public WatermarkStrategies<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) { - checkNotNull(timestampAssigner, "timestampAssigner"); - this.timestampAssignerSupplier = timestampAssigner; - return this; - } - - /** - * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}. - * - * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda - * function. - * - * <pre> - * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategies - * .<CustomObject>forMonotonousTimestamps() - * .withTimestampAssigner((event, timestamp) -> event.getTimestamp()) - * .build(); - * }</pre> - */ - public WatermarkStrategies<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) { - checkNotNull(timestampAssigner, "timestampAssigner"); - this.timestampAssignerSupplier = TimestampAssignerSupplier.of(timestampAssigner); - return this; - } - - /** - * Build the watermark strategy. - */ - public WatermarkStrategy<T> build() { - WatermarkStrategy<T> strategy = this.baseStrategy; - - if (idleTimeout != null) { - strategy = new WithIdlenessStrategy<>(strategy, idleTimeout); - } - - if (timestampAssignerSupplier != null) { - strategy = new WithTimestampAssigner<>(strategy, timestampAssignerSupplier); - } - - return strategy; - } - - // ------------------------------------------------------------------------ - // builder entry points - // ------------------------------------------------------------------------ - - /** - * Starts building a watermark strategy for situations with monotonously ascending - * timestamps. - * - * <p>The watermarks are generated periodically and tightly follow the latest - * timestamp in the data. 
The delay introduced by this strategy is mainly the periodic - * interval in which the watermarks are generated. - * - * @see AscendingTimestampsWatermarks - */ - public static <T> WatermarkStrategies<T> forMonotonousTimestamps() { - return new WatermarkStrategies<>((ctx) -> new AscendingTimestampsWatermarks<>()); - } - - /** - * Starts building a watermark strategy for situations where records are out of order, but - * you can place an upper bound on how far the events are out of order. - * An out-of-order bound B means that once the an event with timestamp T was encountered, no - * events older than {@code T - B} will follow any more. - * - * <p>The watermarks are generated periodically. The delay introduced by this watermark strategy - * is the periodic interval length, plus the out of orderness bound. - * - * @see BoundedOutOfOrdernessWatermarks - */ - public static <T> WatermarkStrategies<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) { - return new WatermarkStrategies<>((ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness)); - } - - /** - * Starts building a watermark strategy based on an existing {@code WatermarkStrategy}. - */ - public static <T> WatermarkStrategies<T> forStrategy(WatermarkStrategy<T> strategy) { - return new WatermarkStrategies<>(strategy); - } - - /** - * Starts building a watermark strategy based on an existing {@link WatermarkGeneratorSupplier}. - */ - public static <T> WatermarkStrategies<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) { - return new WatermarkStrategies<>(new FromWatermarkGeneratorSupplier<>(generatorSupplier)); - } - - /** - * Starts building a watermark strategy that generates no watermarks at all. - * This may be useful in scenarios that do pure processing-time based stream processing. 
- */ - public static <T> WatermarkStrategies<T> noWatermarks() { - return new WatermarkStrategies<>((ctx) -> new NoWatermarksGenerator<>()); - } - - // ------------------------------------------------------------------------ - - private static final class FromWatermarkGeneratorSupplier<T> implements WatermarkStrategy<T> { - private static final long serialVersionUID = 1L; - - private final WatermarkGeneratorSupplier<T> generatorSupplier; - - private FromWatermarkGeneratorSupplier(WatermarkGeneratorSupplier<T> generatorSupplier) { - this.generatorSupplier = generatorSupplier; - } - - @Override - public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { - return generatorSupplier.createWatermarkGenerator(context); - } - } - - /** - * A {@link WatermarkStrategy} that overrides the {@link TimestampAssigner} of the given base - * {@link WatermarkStrategy}. - */ - private static final class WithTimestampAssigner<T> implements WatermarkStrategy<T> { - private static final long serialVersionUID = 1L; - - private final WatermarkStrategy<T> baseStrategy; - private final TimestampAssignerSupplier<T> timestampAssigner; - - private WithTimestampAssigner(WatermarkStrategy<T> baseStrategy, TimestampAssignerSupplier<T> timestampAssigner) { - this.baseStrategy = baseStrategy; - this.timestampAssigner = timestampAssigner; - } - - @Override - public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) { - return timestampAssigner.createTimestampAssigner(context); - } - - @Override - public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { - return baseStrategy.createWatermarkGenerator(context); - } - } - - private static final class WithIdlenessStrategy<T> implements WatermarkStrategy<T> { - private static final long serialVersionUID = 1L; - - private final WatermarkStrategy<T> baseStrategy; - private final Duration idlenessTimeout; - - private 
WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) { - this.baseStrategy = baseStrategy; - this.idlenessTimeout = idlenessTimeout; - } - - @Override - public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) { - return baseStrategy.createTimestampAssigner(context); - } - - @Override - public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { - return new WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context), idlenessTimeout); - } - } -} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java index 254afc1..cf11e47 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java @@ -21,17 +21,48 @@ package org.apache.flink.api.common.eventtime; import org.apache.flink.annotation.Public; import java.io.Serializable; +import java.time.Duration; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * The WatermarkStrategy defines how to generate {@link Watermark}s in the stream sources. The * WatermarkStrategy is a builder/factory for the {@link WatermarkGenerator} that generates the * watermarks and the {@link TimestampAssigner} which assigns the internal timestamp of a record. 
 * + * <p>This interface is split into three parts: 1) methods that an implementor of this interface + * needs to implement, 2) builder methods for building a {@code WatermarkStrategy} on a base + * strategy, 3) convenience methods for constructing a {@code WatermarkStrategy} for common built-in + * strategies or based on a {@link WatermarkGeneratorSupplier}. + * + * <p>Implementors of this interface need only implement {@link #createWatermarkGenerator(WatermarkGeneratorSupplier.Context)}. + * Optionally, you can implement {@link #createTimestampAssigner(TimestampAssignerSupplier.Context)}. + * + * <p>The builder methods, like {@link #withIdleness(Duration)} or {@link + * #withTimestampAssigner(SerializableTimestampAssigner)}, create a new {@code + * WatermarkStrategy} that wraps and enriches a base strategy. The strategy on which the method is + * called is the base strategy. + * + * <p>The convenience methods, for example {@link #forBoundedOutOfOrderness(Duration)}, create a + * {@code WatermarkStrategy} for common built-in strategies. + * * <p>This interface is {@link Serializable} because watermark strategies may be shipped * to workers during distributed execution. */ @Public -public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> { +public interface WatermarkStrategy<T> extends + TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> { + + // ------------------------------------------------------------------------ + // Methods that implementors need to implement. + // ------------------------------------------------------------------------ + + /** + * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
+ */ + @Override + WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); /** * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this @@ -44,9 +75,111 @@ public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, Wate return new RecordTimestampAssigner<>(); } + // ------------------------------------------------------------------------ + // Builder methods for enriching a base WatermarkStrategy + // ------------------------------------------------------------------------ + /** - * Instantiates a WatermarkGenerator that generates watermarks according to this strategy. + * Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given + * {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}). + * + * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example + * access to the metrics system. + * + * <pre> + * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy + * .forMonotonousTimestamps() + * .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx)); + * }</pre> */ - @Override - WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); + default WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) { + checkNotNull(timestampAssigner, "timestampAssigner"); + return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner); + } + + /** + * Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given + * {@link SerializableTimestampAssigner}. + * + * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda + * function. 
+ * + * <pre> + * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy + * .forMonotonousTimestamps() + * .withTimestampAssigner((event, timestamp) -> event.getTimestamp()); + * }</pre> + */ + default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) { + checkNotNull(timestampAssigner, "timestampAssigner"); + return new WatermarkStrategyWithTimestampAssigner<>(this, + TimestampAssignerSupplier.of(timestampAssigner)); + } + + /** + * Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the + * created {@link WatermarkGenerator}. + * + * <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a + * stream for that amount of time, then that partition is considered "idle" and will not hold + * back the progress of watermarks in downstream operators. + * + * <p>Idleness can be important if some partitions have little data and might not have events + * during some periods. Without idleness, these streams can stall the overall event time + * progress of the application. + */ + default WatermarkStrategy<T> withIdleness(Duration idleTimeout) { + checkNotNull(idleTimeout, "idleTimeout"); + checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()), + "idleTimeout must be greater than zero"); + return new WatermarkStrategyWithIdleness<>(this, idleTimeout); + } + + // ------------------------------------------------------------------------ + // Convenience methods for common watermark strategies + // ------------------------------------------------------------------------ + + /** + * Creates a watermark strategy for situations with monotonously ascending timestamps. + * + * <p>The watermarks are generated periodically and tightly follow the latest + * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval + * in which the watermarks are generated. 
+ * @see AscendingTimestampsWatermarks + */ + static <T> WatermarkStrategy<T> forMonotonousTimestamps() { + return (ctx) -> new AscendingTimestampsWatermarks<>(); + } + + /** + * Creates a watermark strategy for situations where records are out of order, but you can place + * an upper bound on how far the events are out of order. An out-of-order bound B means that + * once an event with timestamp T was encountered, no events older than {@code T - B} will + * follow any more. + * + * <p>The watermarks are generated periodically. The delay introduced by this watermark + * strategy is the periodic interval length, plus the out-of-orderness bound. + * + * @see BoundedOutOfOrdernessWatermarks + */ + static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) { + return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness); + } + + /** + * Creates a watermark strategy based on an existing {@link WatermarkGeneratorSupplier}. + */ + static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) { + return generatorSupplier::createWatermarkGenerator; + } + + /** + * Creates a watermark strategy that generates no watermarks at all. This may be useful in + * scenarios that do pure processing-time based stream processing. + */ + static <T> WatermarkStrategy<T> noWatermarks() { + return (ctx) -> new NoWatermarksGenerator<>(); + } + } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java new file mode 100644 index 0000000..98c3374 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.eventtime; + +import java.time.Duration; + +/** + * A {@link WatermarkStrategy} that adds idleness detection on top of the wrapped strategy. + */ +final class WatermarkStrategyWithIdleness<T> implements WatermarkStrategy<T> { + + private static final long serialVersionUID = 1L; + + private final WatermarkStrategy<T> baseStrategy; + private final Duration idlenessTimeout; + + WatermarkStrategyWithIdleness(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) { + this.baseStrategy = baseStrategy; + this.idlenessTimeout = idlenessTimeout; + } + + @Override + public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) { + return baseStrategy.createTimestampAssigner(context); + } + + @Override + public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { + return new WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context), + idlenessTimeout); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java new file mode 100644 index 0000000..f83779a --- /dev/null +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.eventtime; + +/** + * A {@link WatermarkStrategy} that overrides the {@link TimestampAssigner} of the given base {@link + * WatermarkStrategy}. 
+ */ +final class WatermarkStrategyWithTimestampAssigner<T> implements WatermarkStrategy<T> { + + private static final long serialVersionUID = 1L; + + private final WatermarkStrategy<T> baseStrategy; + private final TimestampAssignerSupplier<T> timestampAssigner; + + WatermarkStrategyWithTimestampAssigner( + WatermarkStrategy<T> baseStrategy, + TimestampAssignerSupplier<T> timestampAssigner) { + this.baseStrategy = baseStrategy; + this.timestampAssigner = timestampAssigner; + } + + @Override + public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) { + return timestampAssigner.createTimestampAssigner(context); + } + + @Override + public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { + return baseStrategy.createWatermarkGenerator(context); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java similarity index 72% rename from flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java index 6136bb7..857b994 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java @@ -30,6 +30,7 @@ import org.apache.flink.metrics.MetricGroup; import org.junit.Test; import java.io.Serializable; +import java.time.Duration; import java.util.Map; import static org.hamcrest.CoreMatchers.instanceOf; @@ -37,97 +38,115 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; /** - * Test for the {@link WatermarkStrategies} class. + * Test for the {@link WatermarkStrategy} class. 
*/ -public class WatermarkStrategiesTest { +public class WatermarkStrategyTest { @Test public void testDefaultTimeStampAssigner() { - WatermarkStrategy<Object> wmStrategy = WatermarkStrategies - .forMonotonousTimestamps() - .build(); + WatermarkStrategy<Object> wmStrategy = WatermarkStrategy + .forMonotonousTimestamps(); + // ensure that the closure can be cleaned through the WatermarkStategies ClosureCleaner.clean(wmStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); - assertThat(wmStrategy.createTimestampAssigner(assignerContext()), instanceOf(RecordTimestampAssigner.class)); + assertThat(wmStrategy.createTimestampAssigner(assignerContext()), + instanceOf(RecordTimestampAssigner.class)); } @Test public void testLambdaTimestampAssigner() { - WatermarkStrategy<Object> wmStrategy = WatermarkStrategies + WatermarkStrategy<Object> wmStrategy = WatermarkStrategy .forMonotonousTimestamps() - .withTimestampAssigner((event, timestamp) -> 42L) - .build(); + .withTimestampAssigner((event, timestamp) -> 42L); + // ensure that the closure can be cleaned through the WatermarkStategies ClosureCleaner.clean(wmStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); - TimestampAssigner<Object> timestampAssigner = wmStrategy.createTimestampAssigner(assignerContext()); + TimestampAssigner<Object> timestampAssigner = wmStrategy + .createTimestampAssigner(assignerContext()); assertThat(timestampAssigner.extractTimestamp(null, 13L), is(42L)); } @Test public void testLambdaTimestampAssignerSupplier() { - WatermarkStrategy<Object> wmStrategy = WatermarkStrategies + WatermarkStrategy<Object> wmStrategy = WatermarkStrategy .forMonotonousTimestamps() - .withTimestampAssigner(TimestampAssignerSupplier.of((event, timestamp) -> 42L)) - .build(); + .withTimestampAssigner(TimestampAssignerSupplier.of((event, timestamp) -> 42L)); // ensure that the closure can be cleaned through the WatermarkStategies ClosureCleaner.clean(wmStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); - TimestampAssigner<Object> timestampAssigner = wmStrategy.createTimestampAssigner(assignerContext()); + TimestampAssigner<Object> timestampAssigner = wmStrategy + .createTimestampAssigner(assignerContext()); assertThat(timestampAssigner.extractTimestamp(null, 13L), is(42L)); } @Test public void testAnonymousInnerTimestampAssigner() { - WatermarkStrategy<Object> wmStrategy = WatermarkStrategies + WatermarkStrategy<Object> wmStrategy = WatermarkStrategy .forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Object>() { @Override public long extractTimestamp(Object element, long recordTimestamp) { return 42; } - }) - .build(); + }); // ensure that the closure can be cleaned through the WatermarkStategies ClosureCleaner.clean(wmStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); - TimestampAssigner<Object> timestampAssigner = wmStrategy.createTimestampAssigner(assignerContext()); + TimestampAssigner<Object> timestampAssigner = wmStrategy + .createTimestampAssigner(assignerContext()); assertThat(timestampAssigner.extractTimestamp(null, 13L), is(42L)); } @Test public void testClassTimestampAssigner() { - WatermarkStrategy<Object> wmStrategy = WatermarkStrategies + WatermarkStrategy<Object> wmStrategy = WatermarkStrategy .forMonotonousTimestamps() - .withTimestampAssigner((ctx) -> new TestTimestampAssigner()) - .build(); + .withTimestampAssigner((ctx) -> new TestTimestampAssigner()); // ensure that the closure can be cleaned through the WatermarkStategies ClosureCleaner.clean(wmStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); - TimestampAssigner<Object> timestampAssigner = wmStrategy.createTimestampAssigner(assignerContext()); + TimestampAssigner<Object> timestampAssigner = wmStrategy + .createTimestampAssigner(assignerContext()); assertThat(timestampAssigner.extractTimestamp(null, 13L), is(42L)); } @Test public void testClassTimestampAssignerUsingSupplier() { - 
WatermarkStrategy<Object> wmStrategy = WatermarkStrategies + WatermarkStrategy<Object> wmStrategy = WatermarkStrategy .forMonotonousTimestamps() - .withTimestampAssigner((context) -> new TestTimestampAssigner()) - .build(); + .withTimestampAssigner((context) -> new TestTimestampAssigner()); // ensure that the closure can be cleaned through the WatermarkStategies ClosureCleaner.clean(wmStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); - TimestampAssigner<Object> timestampAssigner = wmStrategy.createTimestampAssigner(assignerContext()); + TimestampAssigner<Object> timestampAssigner = wmStrategy + .createTimestampAssigner(assignerContext()); assertThat(timestampAssigner.extractTimestamp(null, 13L), is(42L)); } + @Test + public void testWithIdlenessHelper() { + WatermarkStrategy<String> wmStrategy = WatermarkStrategy + .<String>forMonotonousTimestamps() + .withIdleness(Duration.ofDays(7)); + + // ensure that the closure can be cleaned + ClosureCleaner.clean(wmStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + + assertThat(wmStrategy.createTimestampAssigner(assignerContext()), + instanceOf(RecordTimestampAssigner.class)); + assertThat(wmStrategy.createWatermarkGenerator(generatorContext()), + instanceOf(WatermarksWithIdleness.class)); + } + static class TestTimestampAssigner implements TimestampAssigner<Object>, Serializable { + @Override public long extractTimestamp(Object element, long recordTimestamp) { return 42L; @@ -135,13 +154,29 @@ public class WatermarkStrategiesTest { } static TimestampAssignerSupplier.Context assignerContext() { - return DummyMetricGroup::new; + return new TimestampAssignerSupplier.Context() { + @Override + public MetricGroup getMetricGroup() { + return new DummyMetricGroup(); + } + }; + } + + static WatermarkGeneratorSupplier.Context generatorContext() { + return new WatermarkGeneratorSupplier.Context() { + @Override + public MetricGroup getMetricGroup() { + return new DummyMetricGroup(); + } + }; } /** - * A 
dummy {@link MetricGroup} to be used when a group is required as an argument but not actually used. + * A dummy {@link MetricGroup} to be used when a group is required as an argument but not + * actually used. */ public static class DummyMetricGroup implements MetricGroup { + @Override public Counter counter(int name) { return null; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8911f45..5a54ea2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -888,8 +888,8 @@ public class DataStream<T> { * <p>Periodically (defined by the {@link ExecutionConfig#getAutoWatermarkInterval()}), the * {@link WatermarkGenerator#onPeriodicEmit(WatermarkOutput)} method will be called. * - * <p>Common watermark generation patterns can be found in the - * {@link org.apache.flink.api.common.eventtime.WatermarkStrategies} class. + * <p>Common watermark generation patterns can be found as static methods in the + * {@link org.apache.flink.api.common.eventtime.WatermarkStrategy} class. * * @param watermarkStrategy The strategy to generate watermarks based on event timestamps. * @return The stream after the transformation, with assigned timestamps and watermarks. 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index a174a6e..788385b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api; import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; @@ -92,6 +93,7 @@ import org.junit.rules.ExpectedException; import javax.annotation.Nullable; import java.lang.reflect.Method; +import java.time.Duration; import java.util.List; import static org.junit.Assert.assertEquals; @@ -110,6 +112,27 @@ public class DataStreamTest extends TestLogger { public ExpectedException expectedException = ExpectedException.none(); /** + * Ensure that WatermarkStrategy is easy to use in the API, without superfluous generics. + */ + @Test + public void testErgonomicWatermarkStrategy() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> input = env.fromElements("bonjour"); + + // without a chain of methods, the generic type is inferred from the target type + input.assignTimestampsAndWatermarks( + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofMillis(10))); + + // as soon as you have a chain of methods the first call needs to specify the generic type + input.assignTimestampsAndWatermarks( + WatermarkStrategy + .<String>forBoundedOutOfOrderness(Duration.ofMillis(10)) + .withTimestampAssigner((event, timestamp) -> 42L)); + } + + /** + * Tests union functionality.
This ensures that self-unions and unions of streams * with differing parallelism work. * diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index fec7b0d..eca6883 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.eventtime.WatermarkStrategies; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -253,7 +253,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> stream = env.continuousSource( new MockSource(Boundedness.BOUNDED, 1), - WatermarkStrategies.<Integer>noWatermarks().build(), + WatermarkStrategy.noWatermarks(), "TestingSource"); OneInputTransformation<Integer, Integer> resultTransform = new OneInputTransformation<Integer, Integer>( @@ -463,7 +463,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source = env.continuousSource( new MockSource(Boundedness.BOUNDED, 1), - WatermarkStrategies.<Integer>noWatermarks().build(), + WatermarkStrategy.noWatermarks(), "TestSource"); source.addSink(new DiscardingSink<>()); diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java index 347975c..8d05336 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators.source; import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.common.eventtime.WatermarkStrategies; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.source.ReaderOutput; @@ -57,9 +56,9 @@ public class SourceOperatorEventTimeTest { @Test public void testMainOutputPeriodicWatermarks() throws Exception { - final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies - .<Integer>forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>()) - .build(); + final WatermarkStrategy<Integer> watermarkStrategy = + WatermarkStrategy + .forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>()); final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy, (output) -> output.collect(0, 100L), @@ -75,9 +74,9 @@ public class SourceOperatorEventTimeTest { @Test public void testMainOutputEventWatermarks() throws Exception { - final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies - .<Integer>forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>()) - .build(); + final WatermarkStrategy<Integer> watermarkStrategy = + WatermarkStrategy + .forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>()); final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy, (output) -> output.collect(0, 100L), @@ -93,9 +92,9 
@@ public class SourceOperatorEventTimeTest { @Test public void testPerSplitOutputPeriodicWatermarks() throws Exception { - final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies - .<Integer>forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>()) - .build(); + final WatermarkStrategy<Integer> watermarkStrategy = + WatermarkStrategy + .forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>()); final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy, (output) -> { @@ -118,9 +117,9 @@ public class SourceOperatorEventTimeTest { @Test public void testPerSplitOutputEventWatermarks() throws Exception { - final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies - .<Integer>forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>()) - .build(); + final WatermarkStrategy<Integer> watermarkStrategy = + WatermarkStrategy + .forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>()); final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy, (output) -> { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index 43992dc..038a211 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators.source; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.eventtime.WatermarkStrategies; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; @@ -56,7 +55,7 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit OperatorEventGateway eventGateway, int subtaskIndex) { - this(reader, WatermarkStrategies.<T>noWatermarks().build(), new TestProcessingTimeService(), eventGateway, subtaskIndex, 5); + this(reader, WatermarkStrategy.noWatermarks(), new TestProcessingTimeService(), eventGateway, subtaskIndex, 5); } public TestingSourceOperator( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java index 4c149ba..995a8ca 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkOutput; -import org.apache.flink.api.common.eventtime.WatermarkStrategies; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -47,7 +47,7 @@ public class TimestampsAndWatermarksOperatorTest { @Test public void inputWatermarksAreNotForwarded() throws Exception { OneInputStreamOperatorTestHarness<Long, Long> testHarness = createTestHarness( - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new PeriodicWatermarkGenerator()) .withTimestampAssigner((ctx) -> new LongExtractor())); @@ -60,7 +60,7 @@ public class TimestampsAndWatermarksOperatorTest { @Test public void longMaxInputWatermarkIsForwarded() throws 
Exception { OneInputStreamOperatorTestHarness<Long, Long> testHarness = createTestHarness( - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new PeriodicWatermarkGenerator()) .withTimestampAssigner((ctx) -> new LongExtractor())); @@ -72,7 +72,7 @@ public class TimestampsAndWatermarksOperatorTest { @Test public void periodicWatermarksEmitOnPeriodicEmit() throws Exception { OneInputStreamOperatorTestHarness<Long, Long> testHarness = createTestHarness( - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new PeriodicWatermarkGenerator()) .withTimestampAssigner((ctx) -> new LongExtractor())); @@ -92,7 +92,7 @@ public class TimestampsAndWatermarksOperatorTest { @Test public void periodicWatermarksOnlyEmitOnPeriodicEmit() throws Exception { OneInputStreamOperatorTestHarness<Long, Long> testHarness = createTestHarness( - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new PeriodicWatermarkGenerator()) .withTimestampAssigner((ctx) -> new LongExtractor())); @@ -105,7 +105,7 @@ public class TimestampsAndWatermarksOperatorTest { @Test public void periodicWatermarksDoNotRegress() throws Exception { OneInputStreamOperatorTestHarness<Long, Long> testHarness = createTestHarness( - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new PeriodicWatermarkGenerator()) .withTimestampAssigner((ctx) -> new LongExtractor())); @@ -125,7 +125,7 @@ public class TimestampsAndWatermarksOperatorTest { @Test public void punctuatedWatermarksEmitImmediately() throws Exception { OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>, Tuple2<Boolean, Long>> testHarness = createTestHarness( - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new PunctuatedWatermarkGenerator()) .withTimestampAssigner((ctx) -> new TupleExtractor())); @@ -143,7 +143,7 @@ public class TimestampsAndWatermarksOperatorTest { @Test public void punctuatedWatermarksDoNotRegress() throws Exception { OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>, 
Tuple2<Boolean, Long>> testHarness = createTestHarness( - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new PunctuatedWatermarkGenerator()) .withTimestampAssigner((ctx) -> new TupleExtractor())); @@ -165,7 +165,7 @@ public class TimestampsAndWatermarksOperatorTest { public void testNegativeTimestamps() throws Exception { OneInputStreamOperatorTestHarness<Long, Long> testHarness = createTestHarness( - WatermarkStrategies + WatermarkStrategy .forGenerator((ctx) -> new NeverWatermarkGenerator()) .withTimestampAssigner((ctx) -> new LongExtractor())); @@ -181,10 +181,10 @@ public class TimestampsAndWatermarksOperatorTest { } private static <T> OneInputStreamOperatorTestHarness<T, T> createTestHarness( - WatermarkStrategies<T> watermarkStrategy) throws Exception { + WatermarkStrategy<T> watermarkStrategy) throws Exception { final TimestampsAndWatermarksOperator<T> operator = - new TimestampsAndWatermarksOperator<>(watermarkStrategy.build()); + new TimestampsAndWatermarksOperator<>(watermarkStrategy); OneInputStreamOperatorTestHarness<T, T> testHarness = new OneInputStreamOperatorTestHarness<>(operator); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index 515bd51..0f2e99d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.eventtime.TimestampAssigner; -import org.apache.flink.api.common.eventtime.WatermarkStrategies; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import 
org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.mocks.MockSource; @@ -132,7 +132,7 @@ public class SourceOperatorStreamTaskTest { // get a source operator. SourceOperatorFactory<Integer> sourceOperatorFactory = new SourceOperatorFactory<>( new MockSource(Boundedness.BOUNDED, 1), - WatermarkStrategies.<Integer>noWatermarks().build()); + WatermarkStrategy.noWatermarks()); // build a test harness. MultipleInputStreamTaskTestHarnessBuilder<Integer> builder = diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index fe2184e..0608c53 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -828,8 +828,8 @@ class DataStream[T](stream: JavaStream[T]) { * Periodically (defined by the [[ExecutionConfig#getAutoWatermarkInterval()]]), the * [[WatermarkGenerator#onPeriodicEmit(WatermarkOutput)]] method will be called. * - * Common watermark generation patterns can be found in the - * [[org.apache.flink.api.common.eventtime.WatermarkStrategies]] class. + * Common watermark generation patterns can be found as static methods in the + * [[org.apache.flink.api.common.eventtime.WatermarkStrategy]] class. 
*/ def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T] = { val cleanedStrategy = clean(watermarkStrategy) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala index f493497..fa503e0 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala @@ -18,12 +18,11 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.api.common.eventtime.WatermarkStrategies +import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.connector.source.Boundedness import org.apache.flink.api.connector.source.mocks.MockSource import org.apache.flink.api.java.typeutils.GenericTypeInfo - import org.junit.Assert.assertEquals import org.junit.Test @@ -43,7 +42,7 @@ class StreamExecutionEnvironmentTest { val stream = env.continuousSource( new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), - WatermarkStrategies.noWatermarks[Integer]().build(), + WatermarkStrategy.noWatermarks(), "test source") assertEquals(typeInfo, stream.dataType)