[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-07-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5342


---


[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164776439
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
+   

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164773927
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
+

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164766198
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
+   

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164763618
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
+

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread florianschmidt1994
Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164747531
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp ma

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread florianschmidt1994
Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164708594
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp ma

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-29 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164639453
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
   

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-29 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164639439
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
   

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-29 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631861
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
   

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-29 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631864
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
   

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-29 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631884
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState lastCleanupRightBuffer;
+   private transient ValueState lastCleanupLeftBuffer;
+
+   private transient MapState>> 
leftBuffer;
+   private transient MapState>> 
rightBuffer;
+
+   private final TypeSerializer leftTypeSerializer;
+   private final TypeSerializer rightTypeSerializer;
+
+   private transient TimestampedCollector collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for evaluating if 
elements should be joined
+* @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches