[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533686#comment-16533686 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5342


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
> Issue Type: Sub-task
> Reporter: Florian Schmidt
> Assignee: Florian Schmidt
> Priority: Major
> Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533681#comment-16533681 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5342
  
Thanks for the work @florianschmidt1994 ! Merging this.




[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511091#comment-16511091 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5342
  
Thanks @florianschmidt1994. I will, but maybe not today.




[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511086#comment-16511086 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5342
  
@kl0u I made some changes on how I handle events on either side of the 
stream. By introducing some generic methods we can now reuse large parts of the 
code for either input stream and remove a lot of code duplication. Could you 
have another look at this?
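The comment above describes sharing one code path between the two inputs. One way to do that is to express the join condition relative to the element being processed and to invert the bounds for the right side. A left element at t1 matches a right element at t2 when t2 - t1 ∈ [lowerBound, upperBound], which is equivalent to t1 - t2 ∈ [-upperBound, -lowerBound]. The operator diff quoted further down declares `inverseLowerBound` and `inverseUpperBound` fields, which suggests a similar idea. The following is only a hedged sketch and not the PR's code. `SymmetricIntervalJoin` and `Timestamped` are hypothetical names. The sketch assumes a single key, inclusive bounds, plain in-memory lists instead of Flink keyed state, and no cleanup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/**
 * Hypothetical sketch (not Flink code): one generic method handles elements from
 * either input. Single key, inclusive bounds, in-memory buffers, no state cleanup.
 */
final class SymmetricIntervalJoin<L, R> {

    /** An element together with its event-time timestamp. */
    static final class Timestamped<T> {
        final T value;
        final long ts;

        Timestamped(T value, long ts) {
            this.value = value;
            this.ts = ts;
        }
    }

    private final long lowerBound;
    private final long upperBound;
    private final List<Timestamped<L>> leftBuffer = new ArrayList<>();
    private final List<Timestamped<R>> rightBuffer = new ArrayList<>();
    private final BiConsumer<L, R> output;

    SymmetricIntervalJoin(long lowerBound, long upperBound, BiConsumer<L, R> output) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.output = output;
    }

    void processLeft(L value, long ts) {
        // A left element at t1 joins right elements with t2 - t1 in [lowerBound, upperBound].
        processElement(new Timestamped<>(value, ts), leftBuffer, rightBuffer,
                lowerBound, upperBound, output);
    }

    void processRight(R value, long ts) {
        // Seen from the right side the bounds are inverted: t1 - t2 in [-upperBound, -lowerBound].
        // The emitter swaps its arguments so pairs are always emitted as (left, right).
        processElement(new Timestamped<>(value, ts), rightBuffer, leftBuffer,
                -upperBound, -lowerBound, (r, l) -> output.accept(l, r));
    }

    private static <OUR, OTHER> void processElement(
            Timestamped<OUR> ours,
            List<Timestamped<OUR>> ourBuffer,
            List<Timestamped<OTHER>> otherBuffer,
            long relativeLower,
            long relativeUpper,
            BiConsumer<OUR, OTHER> emit) {
        // Buffer the new element so later arrivals on the other side can find it.
        ourBuffer.add(ours);
        // Join it against everything already buffered on the other side.
        for (Timestamped<OTHER> other : otherBuffer) {
            long diff = other.ts - ours.ts;
            if (diff >= relativeLower && diff <= relativeUpper) {
                emit.accept(ours.value, other.value);
            }
        }
    }
}
```

Because matching runs whenever either side receives an element, each matching pair is produced exactly once, by whichever of its two elements arrives second.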




[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391037#comment-16391037 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5342
  
Changes look good to me! I will let it run on Travis and then merge.




[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363732#comment-16363732 ]

ASF GitHub Bot commented on FLINK-8479:
---

GitHub user florianschmidt1994 opened a pull request:

https://github.com/apache/flink/pull/5482

[Flink-8480][DataStream] Add Java API for timebounded stream join

## What is the purpose of the change

* Add a Java API to the DataStream API to join two streams based on user-defined time boundaries
* Design doc can be found here 
https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6

## Brief change log
* Add option `.between(Time, Time)` to streams that are already joined and 
have their key selectors `where` and `equalTo` defined
* Add new inner class `TimeBounded` to `JoinedStreams`, which exposes 
`process(TimeBoundedJoinFunction)` as well as optional 
`upperBoundExclusive(boolean)` and `lowerBoundExclusive(boolean)` to the user
* Add new integration test `TimeboundedJoinITCase`
* **Depends on [FLINK-8479] being merged first**

Full example usage:

```java
streamOne
.join(streamTwo)
.where(new MyKeySelector())
.equalTo(new MyKeySelector())
.between(Time.milliseconds(-1), Time.milliseconds(1))
.process(new UdfTimeBoundedJoinFunction())
.addSink(new ResultSink());
```
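The bounds passed to `.between(lower, upper)` describe, for each element of `streamOne`, the range of `streamTwo` timestamps it joins with. This matches the operator Javadoc quoted below: t2.ts ∈ [t1.ts + lowerBound, t1.ts + upperBound]. A minimal sketch of that predicate in plain Java follows. `JoinBounds` is a hypothetical helper, not part of Flink. It assumes both bounds are inclusive unless marked exclusive, as the optional `lowerBoundExclusive(boolean)`/`upperBoundExclusive(boolean)` setters in the change log suggest.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not Flink API) illustrating the time-bounded join condition:
 * a right element at rightTs joins a left element at leftTs iff
 * rightTs lies in [leftTs + lowerBound, leftTs + upperBound], each bound optionally exclusive.
 */
final class JoinBounds {
    private final long lowerBound;
    private final long upperBound;
    private final boolean lowerBoundInclusive;
    private final boolean upperBoundInclusive;

    JoinBounds(Duration lower, Duration upper, boolean lowerInclusive, boolean upperInclusive) {
        if (lower.compareTo(upper) > 0) {
            throw new IllegalArgumentException("lower bound must not exceed upper bound");
        }
        this.lowerBound = lower.toMillis();
        this.upperBound = upper.toMillis();
        this.lowerBoundInclusive = lowerInclusive;
        this.upperBoundInclusive = upperInclusive;
    }

    /** Returns true if the pair (left, right) should be emitted. */
    boolean matches(long leftTs, long rightTs) {
        long low = leftTs + lowerBound;
        long high = leftTs + upperBound;
        boolean aboveLow = lowerBoundInclusive ? rightTs >= low : rightTs > low;
        boolean belowHigh = upperBoundInclusive ? rightTs <= high : rightTs < high;
        return aboveLow && belowHigh;
    }

    public static void main(String[] args) {
        // Same bounds as the usage example above: between(-1 ms, +1 ms), both inclusive.
        JoinBounds bounds = new JoinBounds(Duration.ofMillis(-1), Duration.ofMillis(1), true, true);
        System.out.println(bounds.matches(100, 99));  // true: 99 is in [99, 101]
        System.out.println(bounds.matches(100, 102)); // false: 102 is above 101
    }
}
```

Because both bounds are relative to the left element, a negative lower bound also lets `streamTwo` elements that are slightly older than the `streamOne` element join.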

## Verifying this change
This change added tests and can be verified as follows: 
- Added integration tests in `TimeboundedJoinITCase` that validate 
parameter translation and execution

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/florianschmidt1994/flink flink-8480-timebounded-join-java-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5482


commit 34451540116d8bdd284fd01016a4cc74d8564d37
Author: Florian Schmidt 
Date:   2018-01-18T14:47:14Z

[FLINK-8479] Implement TimeBoundedStreamJoinOperator

This operator is the basis for performing an inner join on two
streams using a time criterion defined by a lower and an upper bound.

commit fe65b1ead0511b0df5d640c728f5ce9e273d7ed5
Author: Florian Schmidt 
Date:   2018-02-13T14:48:40Z

[FLINK-8480][DataStream] Add java api for timebounded stream joins

This commit adds a Java implementation of time-bounded stream joins.
The usage looks roughly like the following:

```java
streamOne
.join(streamTwo)
.where(new Tuple2KeyExtractor())
.equalTo(new Tuple2KeyExtractor())
.between(Time.milliseconds(0), Time.milliseconds(1))
.process(new CombineToStringJoinFunction())
.addSink(new ResultSink());
```

This change adds the functionality in JoinedStreams.java and adds
integration tests in TimeboundedJoinITCase.java






[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345220#comment-16345220 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164776439
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound, this operator emits exactly those pairs
+ * (t1, t2) where t2.ts ∈ [t1.ts + lowerBound, t1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined, they are passed to a user-defined {@link JoinedProcessFunction}
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the right element.
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> leftBuffer;
+   private transient MapState>> rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   
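The fields above (a `bucketGranularity` of 1, one `MapState` buffer per side, and `LAST_CLEANUP_*` state names) suggest that buffered elements are grouped by timestamp bucket and pruned as the watermark advances. The quoted diff stops before that logic, so the following is only a sketch under stated assumptions. A `TreeMap` stands in for Flink's keyed `MapState`, `BucketedBuffer` is a hypothetical name, and no future element carries a timestamp below the current watermark. Under those assumptions a left element at t1 can be dropped once t1 + upperBound < watermark, and a right element at t2 once t2 - lowerBound < watermark. The left buffer therefore uses a horizon of `upperBound` and the right buffer a horizon of `-lowerBound`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical per-key join buffer (not Flink code): elements are grouped by timestamp
 * bucket, with a TreeMap standing in for keyed MapState, and whole buckets are dropped
 * once the watermark guarantees they can no longer join.
 */
final class BucketedBuffer<T> {
    private final long bucketGranularity;
    private final TreeMap<Long, List<T>> buckets = new TreeMap<>();

    BucketedBuffer(long bucketGranularity) {
        if (bucketGranularity <= 0) {
            throw new IllegalArgumentException("bucketGranularity must be positive");
        }
        this.bucketGranularity = bucketGranularity;
    }

    /** Bucket key: the timestamp rounded down to a multiple of the granularity. */
    long bucketOf(long ts) {
        return Math.floorDiv(ts, bucketGranularity) * bucketGranularity;
    }

    void add(long ts, T value) {
        buckets.computeIfAbsent(bucketOf(ts), k -> new ArrayList<>()).add(value);
    }

    /**
     * Removes every bucket whose newest possible timestamp plus the horizon is below the
     * watermark (horizon = upperBound for the left buffer, -lowerBound for the right buffer).
     */
    void cleanUp(long watermark, long horizon) {
        Iterator<Map.Entry<Long, List<T>>> it = buckets.entrySet().iterator();
        while (it.hasNext()) {
            long newestTsInBucket = it.next().getKey() + bucketGranularity - 1;
            if (newestTsInBucket + horizon < watermark) {
                it.remove();
            } else {
                // Keys are iterated in ascending order, so all remaining buckets are newer.
                break;
            }
        }
    }

    /** Total number of buffered elements across all buckets. */
    int size() {
        return buckets.values().stream().mapToInt(List::size).sum();
    }
}
```

With a granularity of 1 each bucket holds a single timestamp. A coarser granularity, which the TODO in the diff hints at, would mean fewer map entries, at the cost of keeping the older elements in a bucket somewhat longer than strictly necessary.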

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345206#comment-16345206 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164773927
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345166#comment-16345166 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164766198
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345140#comment-16345140 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164763618
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345056#comment-16345056 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164747531
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344882#comment-16344882 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164708594
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where T2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the right element.
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> leftBuffer;
+   private transient MapState>> rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
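The join condition stated in the Javadoc of this diff (a pair is emitted iff T2.ts lies in [T1.ts + lowerBound, T1.ts + upperBound], with each end independently inclusive or exclusive) can be illustrated with a small standalone check. This is a sketch, not code from the PR: the class and method names are hypothetical, and reading inverseLowerBound/inverseUpperBound as (-upperBound, -lowerBound), i.e. the same window seen from a right element, is an assumption inferred from the condition itself.

```java
/**
 * Hypothetical helper (not part of the PR) illustrating the join condition:
 *   T1.ts + lowerBound <= T2.ts <= T1.ts + upperBound,
 * where either end may instead be exclusive (< rather than <=).
 */
final class TimeBoundCheck {

    private TimeBoundCheck() {}

    /** Returns true if rightTs lies within the bounds relative to leftTs. */
    static boolean isInBounds(long leftTs, long rightTs,
                              long lowerBound, long upperBound,
                              boolean lowerInclusive, boolean upperInclusive) {
        long lo = leftTs + lowerBound;
        long hi = leftTs + upperBound;
        boolean aboveLower = lowerInclusive ? rightTs >= lo : rightTs > lo;
        boolean belowUpper = upperInclusive ? rightTs <= hi : rightTs < hi;
        return aboveLower && belowUpper;
    }

    /**
     * The same condition evaluated from a right element: leftTs must lie in
     * [rightTs - upperBound, rightTs - lowerBound]. The relative bounds become
     * (-upperBound, -lowerBound) and the inclusiveness flags swap ends.
     * ASSUMPTION: that this is what inverseLowerBound/inverseUpperBound hold.
     */
    static boolean isInBoundsFromRight(long rightTs, long leftTs,
                                       long lowerBound, long upperBound,
                                       boolean lowerInclusive, boolean upperInclusive) {
        return isInBounds(rightTs, leftTs, -upperBound, -lowerBound,
                upperInclusive, lowerInclusive);
    }
}
```

For example, with lowerBound = -2 and upperBound = 3 (both inclusive), a left element at timestamp 10 matches right elements with timestamps 8 through 13, and the "from right" form gives the same answer for every such pair, which is why a symmetric operator only needs one pair of bounds plus their inverses.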
 

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344862#comment-16344862
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5342
  
@hequn8128 Thank you for the review. Regarding your concern about delaying 
the watermark, I added some sketches and a description of my thought process to 
the design document.


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> 
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344501#comment-16344501
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164639453
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344500#comment-16344500
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164639439
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1636#comment-1636
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631861
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1638#comment-1638
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631884
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1637#comment-1637
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631864
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16343044#comment-16343044
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5342
  
@bowenli86 The document should now be public for everyone to comment on. 
Yes, it caches data on both sides. For each incoming element it looks up 
eligible records from the other side, then joins and emits those that fulfil 
the user criteria. Entries are removed from the cache once they are too old to 
be joined, which is determined by a combination of the current watermark and 
the time boundary defined by the user.
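A rough sketch of the buffering scheme described in this comment, under several simplifying assumptions that are not taken from the PR: a single key, inclusive bounds, lowerBound <= upperBound, no late elements (everything arriving after watermark w has a timestamp greater than w), and plain in-memory TreeMaps instead of Flink keyed MapState. The class and method names are hypothetical; this illustrates the idea, not the operator's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Hypothetical single-key sketch of a time-bounded inner join with watermark cleanup. */
final class BufferedIntervalJoin<L, R> {
    private final long lowerBound;
    private final long upperBound;
    // Buffers keyed by event timestamp; several elements may share a timestamp.
    private final NavigableMap<Long, List<L>> leftBuffer = new TreeMap<>();
    private final NavigableMap<Long, List<R>> rightBuffer = new TreeMap<>();
    private final BiConsumer<L, R> emit;

    BufferedIntervalJoin(long lowerBound, long upperBound, BiConsumer<L, R> emit) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must be <= upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.emit = emit;
    }

    void processLeft(L value, long ts) {
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // Matching right elements have rightTs in [ts + lowerBound, ts + upperBound].
        for (List<R> rights : rightBuffer.subMap(ts + lowerBound, true, ts + upperBound, true).values()) {
            for (R r : rights) {
                emit.accept(value, r);
            }
        }
    }

    void processRight(R value, long ts) {
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // Matching left elements have leftTs in [ts - upperBound, ts - lowerBound].
        for (List<L> lefts : leftBuffer.subMap(ts - upperBound, true, ts - lowerBound, true).values()) {
            for (L l : lefts) {
                emit.accept(l, value);
            }
        }
    }

    /** Evicts entries that no future (non-late) element can join with. */
    void onWatermark(long watermark) {
        // A left element at t can still match a future right (ts > watermark)
        // only if t + upperBound > watermark, so drop keys <= watermark - upperBound.
        leftBuffer.headMap(watermark - upperBound, true).clear();
        // A right element at t can still match a future left only if
        // t - lowerBound > watermark, so drop keys <= watermark + lowerBound.
        rightBuffer.headMap(watermark + lowerBound, true).clear();
    }

    int leftTimestamps() { return leftBuffer.size(); }

    int rightTimestamps() { return rightBuffer.size(); }
}
```

Each pair is emitted exactly once, by whichever of its two elements arrives second, because that element probes the other side's buffer, which already holds the first. The watermark arithmetic assumes ordinary finite watermark values; a real operator would also need to guard against overflow near Long.MIN_VALUE and decide how to treat late elements.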


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> 
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341638#comment-16341638
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5342
  
Very interesting! Two things:
1. Can you make the Google Doc publicly viewable? I cannot access it right 
now.
2. How does it handle event-time window joins of two streams where data in 
one stream always arrives considerably later than in the other? For example, 
say we are joining streams A and B on 10-minute event-time tumbling windows 
(12:00-12:10, 12:10-12:20, ...), and data in stream B always arrives 30 minutes 
later than the data in stream A. How does the operator handle that? Does it 
cache A's data until B's data arrives, do the join, and then remove A's data 
from the cache? (I haven't read the code in detail; I'm just trying to get a 
general idea of the overall design.)


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> 
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337354#comment-16337354
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5342
  
Yeah, it seems I made a typo there. It's actually 
https://issues.apache.org/jira/browse/FLINK-8479; I just fixed the title. 


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> 
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)