[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542120#comment-16542120 ] ASF GitHub Bot commented on FLINK-8480: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5482 > Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator > Key: FLINK-8480 > URL: https://issues.apache.org/jira/browse/FLINK-8480 > Project: Flink > Issue Type: Sub-task > Reporter: Florian Schmidt > Assignee: Florian Schmidt > Priority: Major > Labels: pull-request-available -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449930#comment-16449930 ] ASF GitHub Bot commented on FLINK-8480: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183745456 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java --- @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } + + /** +* Specifies the time boundaries over which the join operation works, so that +* leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound +* By default both the lower and the upper bound are inclusive. This can be configured +* with {@link TimeBounded#lowerBoundExclusive(boolean)} and +* {@link TimeBounded#upperBoundExclusive(boolean)} +* +* @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound +* @param upperBound The upper bound. 
Needs to be bigger than or equal to the lowerBound +*/ + public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + input1.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new RuntimeException("Time-bounded stream joins are only supported in event time"); + } + + checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join"); + checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); + return new TimeBounded<>( + input1, + input2, + lowerBound.toMilliseconds(), + upperBound.toMilliseconds(), + true, + true, + keySelector1, + keySelector2 + ); + } + } + } + + /** +* Joined streams that have keys for both sides as well as the time boundaries over which +* elements should be joined defined. +* +* @param <T1> Input type of elements from the first stream +* @param <T2> Input type of elements from the second stream +* @param <KEY> The type of the key +*/ + public static class TimeBounded<T1, T2, KEY> { + + private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin"; + + private final DataStream<T1> left; + private final DataStream<T2> right; + + private final long lowerBound; + private final long upperBound; + + private final KeySelector<T1, KEY> keySelector1; + private final KeySelector<T2, KEY> keySelector2; + + private boolean lowerBoundInclusive; + private boolean upperBoundInclusive; + + public TimeBounded( + DataStream<T1> left, + DataStream<T2> right, + long lowerBound, + long upperBound, + boolean lowerBoundInclusive, + boolean upperBoundInclusive, + KeySelector<T1, KEY> keySelector1, + KeySelector<T2, KEY> keySelector2) { + + this.left = Preconditions.checkNotNull(left); + this.right = Preconditions.checkNotNull(right); + + this.lowerBound = lowerBound; + this.upperBound = upperBound; + + this.lowerBoundInclusive = lowerBoundInclusive; + this.upperBoundInclusive = upperBoundInclusive; + + 
this.keySelector1 = Preconditions.checkNotNull(keySelector1); + this.keySelector2 =
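The join condition stated in the `between` Javadoc can be expressed as a small predicate. Below is a minimal plain-Java sketch, not Flink code. `TimeBoundCheck` and `matches` are hypothetical names. Bounds are plain milliseconds, as produced by `Time.toMilliseconds()` in the diff, and the two flags mirror the `lowerBoundInclusive`/`upperBoundInclusive` fields of `TimeBounded`.

```java
// Hypothetical helper, not part of Flink: evaluates the time-bounded join condition
// leftTs + lowerBound <= rightTs <= leftTs + upperBound, with each end optionally exclusive.
public final class TimeBoundCheck {

    private TimeBoundCheck() {}

    /** Returns true if a right element at rightTs may be joined with a left element at leftTs. */
    public static boolean matches(long leftTs, long rightTs,
                                  long lowerBound, long upperBound,
                                  boolean lowerInclusive, boolean upperInclusive) {
        long low = leftTs + lowerBound;
        long high = leftTs + upperBound;
        boolean aboveLow = lowerInclusive ? rightTs >= low : rightTs > low;
        boolean belowHigh = upperInclusive ? rightTs <= high : rightTs < high;
        return aboveLow && belowHigh;
    }

    public static void main(String[] args) {
        // Left element at t=10 with bounds [-2, 2]: the matching window is [8, 12].
        System.out.println(matches(10, 8, -2, 2, true, true));   // true: both ends inclusive
        System.out.println(matches(10, 8, -2, 2, false, true));  // false: lower end excluded
        System.out.println(matches(10, 13, -2, 2, true, true));  // false: beyond the upper bound
    }
}
```

With both flags set to `true` this matches the default described in the Javadoc. Switching either flag to `false` only removes the single timestamp sitting exactly on that boundary.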
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449836#comment-16449836 ] ASF GitHub Bot commented on FLINK-8480: --- Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183727245 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449763#comment-16449763 ] ASF GitHub Bot commented on FLINK-8480: --- Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/5482 Thanks for your comments, @bowenli86, and sorry for the delay! I added some replies, and there are a few points I'll be addressing right away.
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449760#comment-16449760 ] ASF GitHub Bot commented on FLINK-8480: --- Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183712409 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive. 
+ * + * As soon as elements are joined they are passed to a user-defined {@link TimeBoundedJoinFunction}, + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element + * + * The basic idea of this implementation is as follows: Whenever we receive an element at + * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer. + * We then check the right buffer to see whether there are any elements that can be joined. If + * there are, they are joined and passed to a user-defined {@link TimeBoundedJoinFunction}. + * The same happens the other way around when receiving an element on the right side. + * + * In some cases the watermark needs to be delayed. This for example can happen if + * if t2.ts ∈ [t1.ts + 1, t1.ts + 2] and elements from t1 arrive earlier than elements from t2 and + * therefore get added to the left buffer. When an element now arrives on the right side, the + * watermark might have already progressed. The right element now gets joined with an + * older element from the left side, where the timestamp of the left element is lower than the + * current watermark, which would make this element late. This can be avoided by holding back the + * watermarks. + * + * The left and right buffers are cleared from unused values periodically + * (triggered by watermarks) in
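The buffering approach described in the operator's Javadoc can be illustrated with a minimal in-memory sketch that handles a single key. It assumes inclusive bounds and no late elements, and it treats a watermark `W` as a promise that no further element with a timestamp `<= W` will arrive. `BufferedIntervalJoin`, `processLeft`, `processRight`, and `onWatermark` are hypothetical names, and a `BiConsumer` stands in for the `TimeBoundedJoinFunction`. The sketch does not model keyed state, timers, or the watermark hold-back the Javadoc mentions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical in-memory sketch of a symmetric buffering interval join (single key, inclusive bounds).
// Not Flink's TimeBoundedStreamJoinOperator: no keyed state, timers, or watermark hold-back.
public final class BufferedIntervalJoin<L, R> {

    // An element paired with its event timestamp.
    private static final class Timestamped<T> {
        final T value;
        final long ts;

        Timestamped(T value, long ts) {
            this.value = value;
            this.ts = ts;
        }
    }

    private final long lowerBound;
    private final long upperBound;
    private final BiConsumer<L, R> joinFunction;
    private final List<Timestamped<L>> leftBuffer = new ArrayList<>();
    private final List<Timestamped<R>> rightBuffer = new ArrayList<>();

    public BufferedIntervalJoin(long lowerBound, long upperBound, BiConsumer<L, R> joinFunction) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.joinFunction = joinFunction;
    }

    // leftTs + lowerBound <= rightTs <= leftTs + upperBound
    private boolean inBounds(long leftTs, long rightTs) {
        return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
    }

    // Left side: add to the left buffer, then probe the right buffer for partners.
    public void processLeft(L value, long ts) {
        leftBuffer.add(new Timestamped<>(value, ts));
        for (Timestamped<R> right : rightBuffer) {
            if (inBounds(ts, right.ts)) {
                joinFunction.accept(value, right.value);
            }
        }
    }

    // Right side: the same, the other way around.
    public void processRight(R value, long ts) {
        rightBuffer.add(new Timestamped<>(value, ts));
        for (Timestamped<L> left : leftBuffer) {
            if (inBounds(left.ts, ts)) {
                joinFunction.accept(left.value, value);
            }
        }
    }

    // Drop elements that no future element (timestamp > watermark) can still match.
    public void onWatermark(long watermark) {
        leftBuffer.removeIf(left -> left.ts + upperBound <= watermark);
        rightBuffer.removeIf(right -> right.ts - lowerBound <= watermark);
    }

    public int bufferedElements() {
        return leftBuffer.size() + rightBuffer.size();
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        BufferedIntervalJoin<String, String> join =
                new BufferedIntervalJoin<>(-2, 2, (l, r) -> out.add(l + "," + r));
        join.processLeft("a", 10);   // no right elements yet
        join.processRight("x", 11);  // 11 is in [8, 12]: joins with a
        join.processRight("y", 13);  // 13 is outside [8, 12]: no match
        join.processLeft("b", 12);   // [10, 14] contains 11 and 13: joins with x and y
        System.out.println(out);     // prints [a,x, b,x, b,y]
    }
}
```

The cleanup rule follows directly from the bounds. A left element at `ts` can only match right elements up to `ts + upperBound`. A right element at `ts` can only match left elements up to `ts - lowerBound`. Each element can therefore be dropped once the watermark reaches that point.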
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449733#comment-16449733 ] ASF GitHub Bot commented on FLINK-8480: --- Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183706512 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive. 
--- End diff -- You mean the spelling? I can address this in the PR for the TimeBoundedStreamJoinOperator.
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449726#comment-16449726 ] ASF GitHub Bot commented on FLINK-8480: --- Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183705882 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449718#comment-16449718 ] ASF GitHub Bot commented on FLINK-8480: --- Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183704705 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java --- @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } + + /** +* Specifies the time boundaries over which the join operation works, so that +* leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound +* By default both the lower and the upper bound are inclusive. This can be configured +* with {@link TimeBounded#lowerBoundExclusive(boolean)} and +* {@link TimeBounded#upperBoundExclusive(boolean)} +* +* @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound +* @param upperBound The upper bound. 
Needs to be bigger than or equal to the lowerBound +*/ + public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + input1.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new RuntimeException("Time-bounded stream joins are only supported in event time"); + } + + checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join"); + checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); + return new TimeBounded<>( + input1, + input2, + lowerBound.toMilliseconds(), + upperBound.toMilliseconds(), + true, + true, + keySelector1, + keySelector2 + ); + } + } + } + + /** +* Joined streams that have keys for both sides as well as the time boundaries over which +* elements should be joined defined. +* +* @param <T1> Input type of elements from the first stream +* @param <T2> Input type of elements from the second stream +* @param <KEY> The type of the key +*/ + public static class TimeBounded<T1, T2, KEY> { + + private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin"; --- End diff -- It looks like there is only one other case in the datastream package where we have a `static final ...` constant, and it's inlined there as well. I'd propose keeping it here for the time being and reconsidering a config class if more cases come up.
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449713#comment-16449713 ] ASF GitHub Bot commented on FLINK-8480: --- Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183703916 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java --- @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } + + /** +* Specifies the time boundaries over which the join operation works, so that +* leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound +* By default both the lower and the upper bound are inclusive. This can be configured +* with {@link TimeBounded#lowerBoundExclusive(boolean)} and +* {@link TimeBounded#upperBoundExclusive(boolean)} +* +* @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound +* @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound +*/ + public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + input1.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new RuntimeException("Time-bounded stream joins are only supported in event time"); --- End diff -- I'm not sure about the `IllegalStateException`. Its description says: > Signals that a method has been invoked at an illegal or inappropriate time. In other words, the Java environment or Java application is not in an appropriate state for the requested operation. 
But I'm not opposed to the idea of creating a custom exception for all cases where an operation is not supported with the current time characteristic. I'll see if I can think of something.
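One option raised in this thread is a dedicated exception for operations that the current time characteristic does not support. Below is a minimal plain-Java sketch of that kind of validation for `between`. `UnsupportedTimeCharacteristicException` is a hypothetical name, and `Characteristic` is a local stand-in for Flink's `TimeCharacteristic`. Boxed `Long` millisecond values stand in for Flink's `Time` arguments. The sketch also enforces the `lowerBound <= upperBound` contract stated in the Javadoc.

```java
import java.util.Objects;

// Hypothetical sketch of argument validation for a between(lower, upper) call; not Flink code.
public final class BetweenValidation {

    // Local stand-in for Flink's TimeCharacteristic so the example compiles on its own.
    public enum Characteristic { ProcessingTime, IngestionTime, EventTime }

    // Hypothetical dedicated exception for operations the time characteristic does not support.
    public static final class UnsupportedTimeCharacteristicException extends RuntimeException {
        public UnsupportedTimeCharacteristicException(String message) {
            super(message);
        }
    }

    private BetweenValidation() {}

    /** Validates the inputs and returns {lowerMs, upperMs}. */
    public static long[] validate(Characteristic characteristic, Long lowerMs, Long upperMs) {
        if (characteristic != Characteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException(
                    "Time-bounded stream joins are only supported in event time");
        }
        Objects.requireNonNull(lowerMs, "A lower bound needs to be provided for a time-bounded join");
        Objects.requireNonNull(upperMs, "An upper bound needs to be provided for a time-bounded join");
        // Javadoc contract: the lower bound must not exceed the upper bound.
        if (lowerMs > upperMs) {
            throw new IllegalArgumentException("lowerBound must be smaller than or equal to upperBound");
        }
        return new long[] {lowerMs, upperMs};
    }

    public static void main(String[] args) {
        long[] bounds = validate(Characteristic.EventTime, -2L, 2L);
        System.out.println(bounds[0] + " " + bounds[1]); // prints "-2 2"
    }
}
```

Extending `RuntimeException` keeps the exception unchecked, like the `RuntimeException` thrown in the current diff. A dedicated type also lets callers distinguish a wrong time characteristic from invalid bound values.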
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386643#comment-16386643 ] ASF GitHub Bot commented on FLINK-8480: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r172303424 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386642#comment-16386642 ] ASF GitHub Bot commented on FLINK-8480: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r172306197 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive. 
+ * + * As soon as elements are joined, they are passed to a user-defined {@link TimeBoundedJoinFunction}, + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element. + * + * The basic idea of this implementation is as follows: Whenever we receive an element at + * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer. + * We then check the right buffer to see whether there are any elements that can be joined. If + * there are, they are joined and passed to a user-defined {@link TimeBoundedJoinFunction}. + * The same happens the other way around when receiving an element on the right side. + * + * In some cases the watermark needs to be delayed. This can happen, for example, + * if t2.ts ∈ [t1.ts + 1, t1.ts + 2] and elements from t1 arrive earlier than elements from t2 and + * therefore get added to the left buffer. When an element now arrives on the right side, the + * watermark might have already progressed. The right element now gets joined with an + * older element from the left side, where the timestamp of the left element is lower than the + * current watermark, which would make this element late. This can be avoided by holding back the + * watermarks. + * + * The left and right buffers are periodically cleared of unused values + * (triggered by watermarks) in order not
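The buffering scheme described in the operator's javadoc can be illustrated with a minimal, single-key sketch. It is not the operator from the pull request: it assumes inclusive bounds, keeps both buffers in plain `ArrayList`s instead of Flink `MapState`, and omits timers, buffer cleanup and watermark holding. The class `BufferedJoinSketch`, its nested `Element` type and the `demo()` method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical single-key sketch of the buffer-and-probe idea: every element is
 * added to its own side's buffer and then probed against the other side's buffer.
 * Inclusive bounds only; no state backend, timers, cleanup or watermark handling.
 */
final class BufferedJoinSketch {

    /** Stand-in for a timestamped StreamRecord. */
    static final class Element {
        final String value;
        final long ts;

        Element(String value, long ts) {
            this.value = value;
            this.ts = ts;
        }
    }

    private final long lowerBound;
    private final long upperBound;
    private final List<Element> leftBuffer = new ArrayList<>();
    private final List<Element> rightBuffer = new ArrayList<>();
    private final List<String> joined = new ArrayList<>();

    BufferedJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** left.ts + lowerBound <= right.ts <= left.ts + upperBound */
    private boolean inBounds(Element left, Element right) {
        return right.ts >= left.ts + lowerBound && right.ts <= left.ts + upperBound;
    }

    /** Plays the role of processElement1: buffer on the left, probe the right buffer. */
    void processLeft(Element left) {
        leftBuffer.add(left);
        for (Element right : rightBuffer) {
            if (inBounds(left, right)) {
                joined.add("(" + left.value + "," + right.value + ")");
            }
        }
    }

    /** The same the other way around: buffer on the right, probe the left buffer. */
    void processRight(Element right) {
        rightBuffer.add(right);
        for (Element left : leftBuffer) {
            if (inBounds(left, right)) {
                joined.add("(" + left.value + "," + right.value + ")");
            }
        }
    }

    List<String> joined() {
        return joined;
    }

    /** Bounds [1, 2]; returns the joined (left,right) pairs in emission order. */
    static List<String> demo() {
        BufferedJoinSketch join = new BufferedJoinSketch(1, 2);
        join.processLeft(new Element("a", 10));
        join.processRight(new Element("x", 11)); // 11 in [11, 12] -> (a,x)
        join.processRight(new Element("y", 13)); // 13 not in [11, 12] -> buffered only
        join.processLeft(new Element("b", 12));  // window [13, 14] contains y -> (b,y)
        return join.joined();
    }
}
```

With bounds [1, 2], `demo()` yields `(a,x)` and then `(b,y)`: the right element `y@13` falls outside `a@10`'s window but stays buffered, so the later left element `b@12` still finds it. Because nothing is ever evicted here, the sketch also shows why the real operator needs watermark-triggered cleanup of both buffers.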
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386641#comment-16386641 ] ASF GitHub Bot commented on FLINK-8480: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r172302583 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java --- @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelectorkeySelector) { public WithWindow window(WindowAssigner, W> assigner) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } + + /** +* Specifies the time boundaries over which the join operation works, so that +* leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound +* By default both the lower and the upper bound are inclusive. This can be configured +* with {@link TimeBounded#lowerBoundExclusive(boolean)} and +* {@link TimeBounded#upperBoundExclusive(boolean)} +* +* @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound +* @param upperBound The upper bound. 
Needs to be bigger than or equal to the lowerBound +*/ + public TimeBounded between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + input1.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new RuntimeException("Time-bounded stream joins are only supported in event time"); + } + + checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join"); + checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); + return new TimeBounded<>( + input1, + input2, + lowerBound.toMilliseconds(), + upperBound.toMilliseconds(), + true, + true, + keySelector1, + keySelector2 + ); + } + } + } + + /** +* Joined streams that have keys for both sides as well as the time boundaries over which +* elements should be joined defined. +* +* @param Input type of elements from the first stream +* @param Input type of elements from the second stream +* @param The type of the key +*/ + public static class TimeBounded { + + private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin"; --- End diff -- hmm... this might not be very relevant, but I'd prefer a single config class that holds all functions' names, rather than having them scattered all over the code base. > Implement Java API to expose join functionality of > TimeBoundedStreamJoinOperator > > > Key: FLINK-8480 > URL: https://issues.apache.org/jira/browse/FLINK-8480 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
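The condition in the `between()` javadoc, `leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound`, together with the `lowerBoundExclusive`/`upperBoundExclusive` switches, can be sketched as a small predicate. This is an assumption about one possible approach, not the code under review: since timestamps are whole milliseconds, an exclusive bound is normalized to an inclusive one by shifting it 1 ms inward. `TimeBoundsSketch` is a hypothetical name, and the `lowerBound <= upperBound` check is added here; the quoted `between()` only checks for `null`.

```java
/**
 * Hypothetical predicate for the time-bounded join condition
 * left.ts + lowerBound <= right.ts <= left.ts + upperBound,
 * where either bound may be configured as exclusive.
 */
final class TimeBoundsSketch {

    private final long lowerBound; // normalized to an inclusive bound, in ms
    private final long upperBound; // normalized to an inclusive bound, in ms

    TimeBoundsSketch(long lowerBound, long upperBound,
                     boolean lowerInclusive, boolean upperInclusive) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException(
                "lowerBound (" + lowerBound + ") must be <= upperBound (" + upperBound + ")");
        }
        // Timestamps are whole milliseconds, so "> x" is the same as ">= x + 1"
        // and "< y" is the same as "<= y - 1".
        this.lowerBound = lowerInclusive ? lowerBound : lowerBound + 1;
        this.upperBound = upperInclusive ? upperBound : upperBound - 1;
    }

    /** True if the right timestamp lies within the left element's (normalized) window. */
    boolean matches(long leftTs, long rightTs) {
        return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
    }
}
```

With both bounds exclusive, an interval such as (1, 2) contains no whole millisecond, so the normalized window is empty and no pair is ever emitted; a real implementation might choose to reject that configuration up front.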
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386640#comment-16386640 ] ASF GitHub Bot commented on FLINK-8480: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r172303671 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,480 @@
+/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive.
--- End diff -- bound**s**
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386639#comment-16386639 ] ASF GitHub Bot commented on FLINK-8480: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r172302147 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java --- @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelectorkeySelector) { public WithWindow window(WindowAssigner, W> assigner) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } + + /** +* Specifies the time boundaries over which the join operation works, so that +* leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound +* By default both the lower and the upper bound are inclusive. This can be configured +* with {@link TimeBounded#lowerBoundExclusive(boolean)} and +* {@link TimeBounded#upperBoundExclusive(boolean)} +* +* @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound +* @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound +*/ + public TimeBounded between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + input1.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new RuntimeException("Time-bounded stream joins are only supported in event time"); --- End diff -- Should use `IllegalStateException`. Or, even better, shall we create a Flink-specific exception?
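The reviewer's two options can both be served by one hypothetical exception type that extends `IllegalStateException`: callers that catch `IllegalStateException` keep working, while the type name states the actual problem. In the sketch below, `UnsupportedTimeCharacteristicException` and `TimeCharacteristicCheck` are invented names, and the local `TimeCharacteristic` enum is only a stand-in that mirrors the constants of Flink's enum so the example compiles without Flink on the classpath.

```java
/** Stand-in mirroring the constants of Flink's TimeCharacteristic enum (not the real class). */
enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

/**
 * Hypothetical Flink-specific exception. Extending IllegalStateException keeps it
 * catchable as the standard "invalid configuration state" exception.
 */
class UnsupportedTimeCharacteristicException extends IllegalStateException {
    private static final long serialVersionUID = 1L;

    UnsupportedTimeCharacteristicException(String message) {
        super(message);
    }
}

final class TimeCharacteristicCheck {

    private TimeCharacteristicCheck() {}

    /** Throws unless the job is configured for event time; the message names the actual setting. */
    static void requireEventTime(TimeCharacteristic actual) {
        if (actual != TimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException(
                "Time-bounded stream joins are only supported in event time, but the time characteristic is "
                    + actual);
        }
    }
}
```

Including the configured characteristic in the message is a small addition over the quoted `RuntimeException`, which only states the requirement.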