[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542120#comment-16542120
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5482


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449930#comment-16449930
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183745456
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param  Input type of elements from the first stream
+* @param  Input type of elements from the second stream
+* @param  The type of the key
+*/
+   public static class TimeBounded {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
+
+   private final DataStream left;
+   private final DataStream right;
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final KeySelector keySelector1;
+   private final KeySelector keySelector2;
+
+   private boolean lowerBoundInclusive;
+   private boolean upperBoundInclusive;
+
+   public TimeBounded(
+   DataStream left,
+   DataStream right,
+   long lowerBound,
+   long upperBound,
+   boolean lowerBoundInclusive,
+   boolean upperBoundInclusive,
+   KeySelector keySelector1,
+   KeySelector keySelector2) {
+
+   this.left = Preconditions.checkNotNull(left);
+   this.right = Preconditions.checkNotNull(right);
+
+   this.lowerBound = lowerBound;
+   this.upperBound = upperBound;
+
+   this.lowerBoundInclusive = lowerBoundInclusive;
+   this.upperBoundInclusive = upperBoundInclusive;
+
+   this.keySelector1 = 
Preconditions.checkNotNull(keySelector1);
+   this.keySelector2 = 

[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449836#comment-16449836
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183727245
  

[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449763#comment-16449763
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5482
  
Thanks for your comments, @bowenli86, and sorry for the delay! I've added some 
comments, and there are some things I'll be addressing right away.




[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449760#comment-16449760
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183712409
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link TimeBoundedJoinFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * The basic idea of this implementation is as follows: Whenever we 
receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add 
it to the left buffer.
+ * We then check the right buffer to see whether there are any elements 
that can be joined. If
+ * there are, they are joined and passed to a user-defined {@link 
TimeBoundedJoinFunction}.
+ * The same happens the other way around when receiving an element on the 
right side.
+ *
+ * In some cases the watermark needs to be delayed. This for example 
can happen if
+ * if t2.ts ∈ [t1.ts + 1, t1.ts + 2] and elements from t1 arrive earlier 
than elements from t2 and
+ * therefore get added to the left buffer. When an element now arrives on 
the right side, the
+ * watermark might have already progressed. The right element now gets 
joined with an
+ * older element from the left side, where the timestamp of the left 
element is lower than the
+ * current watermark, which would make this element late. This can be 
avoided by holding back the
+ * watermarks.
+ *
+ * The left and right buffers are cleared from unused values 
periodically
+ * (triggered by watermarks) in 
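
The buffering scheme described in the quoted Javadoc (add the incoming element to its own buffer, then probe the opposite buffer for join partners) can be illustrated with a small in-memory sketch. This is not the operator from the PR: it ignores keys, Flink state, timers and the watermark hold-back, handles inclusive bounds only, and every name in it (`TwoBufferJoinSketch`, `processLeft`, `processRight`, `onWatermark`) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Simplified single-key sketch of a two-buffer time-bounded join (assumption: inclusive bounds). */
final class TwoBufferJoinSketch<L, R> {

    private final long lowerBound;
    private final long upperBound;

    // timestamp -> elements seen with that timestamp, one buffer per side
    private final TreeMap<Long, List<L>> leftBuffer = new TreeMap<>();
    private final TreeMap<Long, List<R>> rightBuffer = new TreeMap<>();

    // joined pairs, rendered as strings to keep the sketch free of tuple types
    private final List<String> output = new ArrayList<>();

    TwoBufferJoinSketch(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must be <= upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** Buffers a left element, then joins it with right elements in [ts + lowerBound, ts + upperBound]. */
    void processLeft(L value, long ts) {
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (List<R> bucket : rightBuffer.subMap(ts + lowerBound, true, ts + upperBound, true).values()) {
            for (R right : bucket) {
                output.add("(" + value + ", " + right + ")");
            }
        }
    }

    /** Buffers a right element, then joins it with left elements in [ts - upperBound, ts - lowerBound]. */
    void processRight(R value, long ts) {
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (List<L> bucket : leftBuffer.subMap(ts - upperBound, true, ts - lowerBound, true).values()) {
            for (L left : bucket) {
                output.add("(" + left + ", " + value + ")");
            }
        }
    }

    /** Drops buffered elements that can no longer match anything arriving after the watermark. */
    void onWatermark(long watermark) {
        leftBuffer.headMap(watermark - upperBound, true).clear();
        rightBuffer.headMap(watermark + lowerBound, true).clear();
    }

    List<String> getOutput() {
        return output;
    }

    int leftTimestampsBuffered() {
        return leftBuffer.size();
    }

    int rightTimestampsBuffered() {
        return rightBuffer.size();
    }
}
```

The sorted maps make both the probe and the cleanup a range operation on timestamps; whether the real operator organizes its state this way is not something the quoted diff shows.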

[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449733#comment-16449733
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183706512
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
--- End diff --

You mean the spelling? I can address this in the PR for the 
TimeBoundedStreamJoinOperator.
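
For reference, the join condition from the quoted Javadoc, t2.ts ∈ [t1.ts + lowerBound, t1.ts + upperBound] with each end configurable as inclusive or exclusive, could be written as a standalone predicate along these lines. This is only a sketch: `JoinBoundsSketch` and `isWithinBounds` are hypothetical names and not taken from the PR.

```java
/** Hypothetical helper, not part of the PR: evaluates the time-bounded join condition. */
final class JoinBoundsSketch {

    private JoinBoundsSketch() {
    }

    /**
     * Returns true if rightTs lies between leftTs + lowerBound and leftTs + upperBound,
     * treating each end as inclusive or exclusive according to the two flags.
     */
    static boolean isWithinBounds(
            long leftTs,
            long rightTs,
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive) {

        long lower = leftTs + lowerBound;
        long upper = leftTs + upperBound;

        boolean aboveLower = lowerBoundInclusive ? rightTs >= lower : rightTs > lower;
        boolean belowUpper = upperBoundInclusive ? rightTs <= upper : rightTs < upper;
        return aboveLower && belowUpper;
    }
}
```

With `lowerBound = 1` and `upperBound = 2`, a left element at timestamp 10 joins right elements at timestamps 11 and 12 when both ends are inclusive, and only one of them when either end is made exclusive.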




[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449726#comment-16449726
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183705882
  

[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449718#comment-16449718
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183704705
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector keySelector)  {
             public  WithWindow window(WindowAssigner, W> assigner) {
                 return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
             }
+
+            /**
+             * Specifies the time boundaries over which the join operation works, so that
+             * leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
+             * By default both the lower and the upper bound are inclusive. This can be configured
+             * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+             * {@link TimeBounded#upperBoundExclusive(boolean)}
+             *
+             * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+             * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+             */
+            public TimeBounded between(Time lowerBound, Time upperBound) {
+
+                TimeCharacteristic timeCharacteristic =
+                    input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+                if (timeCharacteristic != TimeCharacteristic.EventTime) {
+                    throw new RuntimeException("Time-bounded stream joins are only supported in event time");
+                }
+
+                checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
+                checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
+                return new TimeBounded<>(
+                    input1,
+                    input2,
+                    lowerBound.toMilliseconds(),
+                    upperBound.toMilliseconds(),
+                    true,
+                    true,
+                    keySelector1,
+                    keySelector2
+                );
+            }
+        }
+    }
+
+    /**
+     * Joined streams that have keys for both sides as well as the time boundaries over which
+     * elements should be joined defined.
+     *
+     * @param  Input type of elements from the first stream
+     * @param  Input type of elements from the second stream
+     * @param  The type of the key
+     */
+    public static class TimeBounded {
+
+        private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin";
--- End diff --

Looks like there is only one other case in the datastream package where we 
have a `static final ...` as a constant, and it's inlined there as well. I'd 
propose to keep it here for the time being and rethink a config class if we see 
more cases coming up.




[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449713#comment-16449713
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183703916
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector keySelector)  {
             public  WithWindow window(WindowAssigner, W> assigner) {
                 return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
             }
+
+            /**
+             * Specifies the time boundaries over which the join operation works, so that
+             * leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
+             * By default both the lower and the upper bound are inclusive. This can be configured
+             * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+             * {@link TimeBounded#upperBoundExclusive(boolean)}
+             *
+             * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+             * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+             */
+            public TimeBounded between(Time lowerBound, Time upperBound) {
+
+                TimeCharacteristic timeCharacteristic =
+                    input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+                if (timeCharacteristic != TimeCharacteristic.EventTime) {
+                    throw new RuntimeException("Time-bounded stream joins are only supported in event time");
--- End diff --

I'm not sure about the `IllegalStateException`. Its description says:
> Signals that a method has been invoked at an illegal or inappropriate 
> time. In other words, the Java environment or Java application is not in an 
> appropriate state for the requested operation.

But I'm not opposed to the idea of creating a custom exception for all 
cases where an operation is not supported with the current time characteristic. 
I'll see if I can think of something.
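
One possible shape for such a custom exception is sketched below. Everything in it is an assumption for illustration: `UnsupportedTimeCharacteristicException`, `TimeCharacteristicChecks` and `requireEventTime` are hypothetical names that do not exist in Flink, and `SketchTimeCharacteristic` is a local stand-in for Flink's `TimeCharacteristic` enum so that the example compiles on its own.

```java
/** Local stand-in for Flink's TimeCharacteristic enum, so the sketch is self-contained. */
enum SketchTimeCharacteristic {
    ProcessingTime,
    IngestionTime,
    EventTime
}

/** Hypothetical exception type; the name and message format are illustrative only. */
class UnsupportedTimeCharacteristicException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    private final SketchTimeCharacteristic actual;

    UnsupportedTimeCharacteristicException(
            String operation,
            SketchTimeCharacteristic required,
            SketchTimeCharacteristic actual) {
        super(operation + " requires " + required + " but the environment uses " + actual);
        this.actual = actual;
    }

    SketchTimeCharacteristic getActual() {
        return actual;
    }
}

/** Hypothetical helper that centralizes the check for all operations that need event time. */
final class TimeCharacteristicChecks {

    private TimeCharacteristicChecks() {
    }

    static void requireEventTime(String operation, SketchTimeCharacteristic actual) {
        if (actual != SketchTimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException(
                operation, SketchTimeCharacteristic.EventTime, actual);
        }
    }
}
```

A shared helper like this would keep the error message consistent across operators that only support one time characteristic, which is the reuse argument made in the comment above.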





[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386643#comment-16386643
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172303424
  

[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386642#comment-16386642
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172306197
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined {@link TimeBoundedJoinFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the right element
+ *
+ * The basic idea of this implementation is as follows: Whenever we receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
+ * We then check the right buffer to see whether there are any elements that can be joined. If
+ * there are, they are joined and passed to a user-defined {@link TimeBoundedJoinFunction}.
+ * The same happens the other way around when receiving an element on the right side.
+ *
+ * In some cases the watermark needs to be delayed. This for example can happen if
+ * if t2.ts ∈ [t1.ts + 1, t1.ts + 2] and elements from t1 arrive earlier than elements from t2 and
+ * therefore get added to the left buffer. When an element now arrives on the right side, the
+ * watermark might have already progressed. The right element now gets joined with an
+ * older element from the left side, where the timestamp of the left element is lower than the
+ * current watermark, which would make this element late. This can be avoided by holding back the
+ * watermarks.
+ *
+ * The left and right buffers are cleared from unused values periodically
+ * (triggered by watermarks) in order not 

[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386641#comment-16386641
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172302583
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector keySelector)  {
public  WithWindow window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join operation works, so that
+* leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are inclusive. This can be configured
+* with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != TimeCharacteristic.EventTime) {
+   throw new RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time boundaries over which
+* elements should be joined defined.
+*
+* @param  Input type of elements from the first stream
+* @param  Input type of elements from the second stream
+* @param  The type of the key
+*/
+   public static class TimeBounded {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin";
--- End diff --

Hmm... this might not be very relevant, but I'd prefer a single config 
class that holds all function names, rather than having them scattered all 
over the code base. 
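
A minimal sketch of what such a central holder could look like. The class name `OperatorNames` and the constant name are hypothetical and chosen only for illustration; they are not an existing Flink class or part of this pull request. Only the string value `"TimeBoundedJoin"` is taken from the diff above.

```java
/** Hypothetical holder for operator/function names; not an existing Flink class. */
final class OperatorNames {

    /** Name used for the time-bounded join function (value taken from the diff). */
    static final String TIME_BOUNDED_JOIN = "TimeBoundedJoin";

    // Further names would be collected here instead of being declared per class.

    private OperatorNames() {
        // Constants only, no instances.
    }
}

class OperatorNamesExample {
    public static void main(String[] args) {
        // Call sites reference the shared constant instead of a local string literal.
        System.out.println(OperatorNames.TIME_BOUNDED_JOIN);
    }
}
```

The trade-off is that a shared holder makes names easy to audit in one place, at the cost of coupling otherwise unrelated classes to a common file.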


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>






[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386640#comment-16386640
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172303671
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
--- End diff --

bound**s**
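
For reference, the bound semantics described in the quoted Javadoc can be sketched as a plain predicate. This is only an illustration under stated assumptions: the class `TimeBoundsSketch`, the method `isWithinBounds`, and its parameter names are hypothetical and are not the operator's actual code, which is not fully shown in this thread.

```java
/** Hypothetical sketch of the join condition; not the operator from the pull request. */
final class TimeBoundsSketch {

    /**
     * Returns true if rightTs lies in the interval
     * [leftTs + lowerBound, leftTs + upperBound], where each end of the
     * interval is inclusive or exclusive depending on the two flags.
     */
    static boolean isWithinBounds(
            long leftTs,
            long rightTs,
            long lowerBound,
            long upperBound,
            boolean lowerInclusive,
            boolean upperInclusive) {

        long lo = leftTs + lowerBound;
        long hi = leftTs + upperBound;

        boolean aboveLower = lowerInclusive ? rightTs >= lo : rightTs > lo;
        boolean belowUpper = upperInclusive ? rightTs <= hi : rightTs < hi;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Left element at ts = 10, bounds [+1, +2], so the window is [11, 12].
        System.out.println(isWithinBounds(10, 11, 1, 2, true, true));
        // Same right element, but with an exclusive lower bound.
        System.out.println(isWithinBounds(10, 11, 1, 2, false, true));
    }
}
```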


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>






[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386639#comment-16386639
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172302147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector keySelector)  {
public  WithWindow window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join operation works, so that
+* leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are inclusive. This can be configured
+* with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != TimeCharacteristic.EventTime) {
+   throw new RuntimeException("Time-bounded stream joins are only supported in event time");
--- End diff --

Should use `IllegalStateException`. Or even better, shall we create a 
Flink-specific exception?
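
A minimal sketch of the first suggestion, assuming the check is pulled into a small helper. The class `EventTimeCheckSketch`, the method `requireEventTime`, and the nested enum are hypothetical; the enum is only a stand-in for Flink's `TimeCharacteristic` so that the example compiles on its own. The error message is the one from the diff.

```java
/** Hypothetical sketch of the suggested exception type; not code from the pull request. */
final class EventTimeCheckSketch {

    /** Stand-in for Flink's TimeCharacteristic so the sketch is self-contained. */
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    /** Rejects any time characteristic other than event time. */
    static void requireEventTime(TimeCharacteristic timeCharacteristic) {
        if (timeCharacteristic != TimeCharacteristic.EventTime) {
            // IllegalStateException signals a misconfigured environment
            // more precisely than a bare RuntimeException.
            throw new IllegalStateException(
                "Time-bounded stream joins are only supported in event time");
        }
    }

    public static void main(String[] args) {
        requireEventTime(TimeCharacteristic.EventTime); // passes silently
        try {
            requireEventTime(TimeCharacteristic.ProcessingTime);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Since `IllegalStateException` extends `RuntimeException`, callers that already catch `RuntimeException` would keep working after such a change.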


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>



