[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533686#comment-16533686 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5342

> Implement time-bounded inner join of streams as a TwoInputStreamOperator
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
> Issue Type: Sub-task
> Reporter: Florian Schmidt
> Assignee: Florian Schmidt
> Priority: Major
> Labels: pull-request-available

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533681#comment-16533681 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5342

Thanks for the work @florianschmidt1994! Merging this.
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511091#comment-16511091 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5342

Thanks @florianschmidt1994. I will, but maybe not today.
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511086#comment-16511086 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5342

@kl0u I changed how I handle events from either side of the join. By introducing some generic methods, we can now reuse large parts of the code for both input streams and remove a lot of code duplication. Could you have another look at this?
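The idea of one generic method serving both inputs can be sketched in plain Java as follows. This is only an illustration of the technique and not the code from the pull request: the class `SymmetricJoinSketch`, the helper type `Timestamped`, the in-memory list buffers, and the string output are all assumptions made for the example. It also never prunes its buffers, which a real operator would have to do.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: one generic method handles elements from either input. */
final class SymmetricJoinSketch<L, R> {

    /** A value paired with its event timestamp (illustrative helper type). */
    static final class Timestamped<T> {
        final T value;
        final long ts;

        Timestamped(T value, long ts) {
            this.value = value;
            this.ts = ts;
        }
    }

    private final long lowerBound;
    private final long upperBound;
    private final List<Timestamped<L>> leftBuffer = new ArrayList<>();
    private final List<Timestamped<R>> rightBuffer = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    SymmetricJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(L value, long ts) {
        // A left element at ts joins right elements in [ts + lowerBound, ts + upperBound].
        processElement(new Timestamped<>(value, ts), leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(R value, long ts) {
        // Seen from the right side the bounds are mirrored: [ts - upperBound, ts - lowerBound].
        processElement(new Timestamped<>(value, ts), rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    /** Shared logic: buffer the element, then join it against the opposite buffer. */
    private <A, B> void processElement(
            Timestamped<A> record,
            List<Timestamped<A>> ourBuffer,
            List<Timestamped<B>> otherBuffer,
            long relativeLower,
            long relativeUpper,
            boolean isLeft) {
        ourBuffer.add(record);
        for (Timestamped<B> other : otherBuffer) {
            if (other.ts >= record.ts + relativeLower && other.ts <= record.ts + relativeUpper) {
                // Always emit in (left|right) order, regardless of which side arrived last.
                output.add(isLeft ? record.value + "|" + other.value : other.value + "|" + record.value);
            }
        }
    }

    List<String> getOutput() {
        return output;
    }
}
```

The only side-specific parts are the two thin wrappers, which pick the buffers and mirror the bounds; everything else is shared.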
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391037#comment-16391037 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5342

Changes look good to me! I will let it run on Travis and then merge.
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363732#comment-16363732 ]

ASF GitHub Bot commented on FLINK-8479:
---

GitHub user florianschmidt1994 opened a pull request:

https://github.com/apache/flink/pull/5482

[Flink-8480][DataStream] Add Java API for timebounded stream join

## What is the purpose of the change

* Add a Java API to the DataStream API to join two streams based on user-defined time boundaries
* The design doc can be found here: https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6

## Brief change log

* Add option `.between(Time, Time)` to streams that are already joined and have their key selectors `where` and `equalTo` defined
* Add new inner class `TimeBounded` to `JoinedStreams`, which exposes `process(TimeBoundedJoinFunction)` as well as optional `upperBoundExclusive(boolean)` and `lowerBoundExclusive(boolean)` to the user
* Add new integration test `TimeboundedJoinITCase`
* **Depends on [FLINK-8479] to be merged**

Full example usage:

```java
streamOne
    .join(streamTwo)
    .where(new MyKeySelector())
    .equalTo(new MyKeySelector())
    .between(Time.milliseconds(-1), Time.milliseconds(1))
    .process(new UdfTimeBoundedJoinFunction())
    .addSink(new ResultSink());
```

## Verifying this change

This change added tests and can be verified as follows:

- Added integration tests in `TimeboundedJoinITCase` that validate parameter translation and execution

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/florianschmidt1994/flink flink-8480-timebounded-join-java-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5482.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5482

commit 34451540116d8bdd284fd01016a4cc74d8564d37
Author: Florian Schmidt
Date: 2018-01-18T14:47:14Z

[FLINK-8479] Implement TimeBoundedStreamJoinOperator

This operator is the basis for performing an inner join on two streams using a time criteria defined as a lower and upper bound

commit fe65b1ead0511b0df5d640c728f5ce9e273d7ed5
Author: Florian Schmidt
Date: 2018-02-13T14:48:40Z

[FLINK-8480][DataStream] Add java api for timebounded stream joins

This commit adds a java implementation for timebounded stream joins. The usage looks roughly like the following:

```java
streamOne
    .join(streamTwo)
    .where(new Tuple2KeyExtractor())
    .equalTo(new Tuple2KeyExtractor())
    .between(Time.milliseconds(0), Time.milliseconds(1))
    .process(new CombineToStringJoinFunction())
    .addSink(new ResultSink());
```

This change adds the functionality in JoinedStreams.java and adds integration tests in TimeboundedJoinITCase.java
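To make the meaning of `.where(...)`, `.equalTo(...)` and `.between(lower, upper)` concrete, here is a minimal brute-force sketch in plain Java. It does not use the Flink API at all: the class `NaiveTimeBoundedJoin`, the `Event` type, and the string result format are hypothetical, and both bounds are assumed to be inclusive. It only shows which pairs a keyed, time-bounded inner join would be expected to produce on finite inputs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical brute-force illustration of a keyed, time-bounded inner join. */
final class NaiveTimeBoundedJoin {

    /** A keyed element with an event timestamp in milliseconds (illustrative type). */
    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    /**
     * Emits "left:right" for every pair with equal keys where
     * right.ts lies in [left.ts + lowerMs, left.ts + upperMs] (both inclusive).
     */
    static List<String> join(List<Event> left, List<Event> right, long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inBounds = r.ts >= l.ts + lowerMs && r.ts <= l.ts + upperMs;
                if (sameKey && inBounds) {
                    out.add(l.value + ":" + r.value);
                }
            }
        }
        return out;
    }
}
```

With bounds of -1 ms and 1 ms, a left element at timestamp 10 would pair with same-key right elements at timestamps 9, 10 and 11, and with nothing outside that range.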
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345220#comment-16345220 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164776439

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the right element
+ *
+ * @param The type of the elements in the left stream
+ * @param The type of the elements in the right stream
+ * @param The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator
+    extends AbstractUdfStreamOperator >
+    implements TwoInputStreamOperator {
+
+    private final long lowerBound;
+    private final long upperBound;
+
+    private final long inverseLowerBound;
+    private final long inverseUpperBound;
+
+    private final boolean lowerBoundInclusive;
+    private final boolean upperBoundInclusive;
+
+    private final long bucketGranularity = 1;
+
+    private static final String LEFT_BUFFER = "LEFT_BUFFER";
+    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+    private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+    private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+    private transient ValueState lastCleanupRightBuffer;
+    private transient ValueState lastCleanupLeftBuffer;
+
+    private transient MapState >> leftBuffer;
+    private transient MapState >> rightBuffer;
+
+    private final TypeSerializer leftTypeSerializer;
+    private final TypeSerializer rightTypeSerializer;
+
+    private transient TimestampedCollector collector;
+
+    private ContextImpl context;
+
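The join condition described in the Javadoc above, including the inclusive and exclusive variants, can be written as a small predicate. The following is a standalone sketch and not the operator's implementation: the class `TimeBounds` and its methods are hypothetical names, and deriving the mirrored bounds as `-upperBound` and `-lowerBound` is an assumption about what fields such as `inverseLowerBound` and `inverseUpperBound` are meant to hold.

```java
/** Hypothetical sketch of the time-bound check with inclusive/exclusive bounds. */
final class TimeBounds {
    private final long lowerBound;
    private final long upperBound;
    private final boolean lowerBoundInclusive;
    private final boolean upperBoundInclusive;

    TimeBounds(long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must not exceed upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.lowerBoundInclusive = lowerBoundInclusive;
        this.upperBoundInclusive = upperBoundInclusive;
    }

    /** True if otherTs lies within the bounds relative to thisTs. */
    boolean matches(long thisTs, long otherTs) {
        long diff = otherTs - thisTs;
        boolean aboveLower = lowerBoundInclusive ? diff >= lowerBound : diff > lowerBound;
        boolean belowUpper = upperBoundInclusive ? diff <= upperBound : diff < upperBound;
        return aboveLower && belowUpper;
    }

    /**
     * The same relation seen from the other input: if right.ts is in
     * [left.ts + lowerBound, left.ts + upperBound], then left.ts is in
     * [right.ts - upperBound, right.ts - lowerBound]. Inclusiveness swaps with the bounds.
     */
    TimeBounds inverse() {
        return new TimeBounds(-upperBound, -lowerBound, upperBoundInclusive, lowerBoundInclusive);
    }
}
```

For example, with a lower bound of -1 (inclusive) and an upper bound of 1 (exclusive), a left element at timestamp 10 matches right elements at timestamps 9 and 10 but not 11, and `inverse().matches(rightTs, leftTs)` gives the same answer as `matches(leftTs, rightTs)`.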
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345206#comment-16345206 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164773927

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java ---
@@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345166#comment-16345166 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164766198

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java ---
@@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345140#comment-16345140 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164763618

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java ---
@@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345056#comment-16345056 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164747531

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java ---
@@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344882#comment-16344882 ]

ASF GitHub Bot commented on FLINK-8479:
---

Github user florianschmidt1994 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5342#discussion_r164708594

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.state.MapState;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
    +import org.apache.flink.api.common.typeutils.base.ListSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
    +
    +// TODO: Make bucket granularity adaptable
    +/**
    + * A TwoInputStreamOperator to execute time-bounded stream inner joins.
    + *
    + * By using a configurable lower and upper bound this operator will emit exactly those pairs
    + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
    + * upper bound can be configured to be either inclusive or exclusive.
    + *
    + * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction},
    + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element
    + *
    + * @param The type of the elements in the left stream
    + * @param The type of the elements in the right stream
    + * @param The output type created by the user-defined function
    + */
    +public class TimeBoundedStreamJoinOperator
    +    extends AbstractUdfStreamOperator >
    +    implements TwoInputStreamOperator {
    +
    +    private final long lowerBound;
    +    private final long upperBound;
    +
    +    private final long inverseLowerBound;
    +    private final long inverseUpperBound;
    +
    +    private final boolean lowerBoundInclusive;
    +    private final boolean upperBoundInclusive;
    +
    +    private final long bucketGranularity = 1;
    +
    +    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    +    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    +    private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
    +    private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
    +
    +    private transient ValueState lastCleanupRightBuffer;
    +    private transient ValueState lastCleanupLeftBuffer;
    +
    +    private transient MapState >> leftBuffer;
    +    private transient MapState >> rightBuffer;
    +
    +    private final TypeSerializer leftTypeSerializer;
    +    private final TypeSerializer rightTypeSerializer;
    +
    +    private transient TimestampedCollector collector;
    +
    +    private ContextImpl context;
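The Javadoc in the diff above says a pair is emitted when the right timestamp lies in [T1.ts + lowerBound, T1.ts + upperBound], with each bound configurable as inclusive or exclusive. A minimal sketch of that check in plain Java follows. The class name JoinBounds and the method isWithinBounds are hypothetical and do not appear in the pull request; the sketch also assumes the timestamps and bounds are small enough that the additions do not overflow a long.

```java
/** Hypothetical helper illustrating the bound check; not code from the pull request. */
final class JoinBounds {

    private JoinBounds() {
    }

    /**
     * Returns true if rightTs lies between leftTs + lowerBound and leftTs + upperBound,
     * where each end of the interval is included or excluded according to its flag.
     */
    static boolean isWithinBounds(
            long leftTs,
            long rightTs,
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive) {

        long lowest = leftTs + lowerBound;   // earliest joinable right timestamp
        long highest = leftTs + upperBound;  // latest joinable right timestamp

        boolean aboveLower = lowerBoundInclusive ? rightTs >= lowest : rightTs > lowest;
        boolean belowUpper = upperBoundInclusive ? rightTs <= highest : rightTs < highest;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Left element at ts = 10 with bounds [-2, 3] covers right timestamps 8..13.
        System.out.println(isWithinBounds(10, 8, -2, 3, true, true));   // prints "true"
        System.out.println(isWithinBounds(10, 8, -2, 3, false, true));  // prints "false"
        System.out.println(isWithinBounds(10, 14, -2, 3, true, true));  // prints "false"
    }
}
```

With an exclusive lower bound the element at exactly T1.ts + lowerBound is rejected, which is the only difference between the first two calls.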
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344862#comment-16344862 ] ASF GitHub Bot commented on FLINK-8479: --- Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/5342 @hequn8128 Thank you for the review. Regarding your concern about delaying the watermark I added some sketches and description about my thought process to the design document. > Implement time-bounded inner join of streams as a TwoInputStreamOperator > > > Key: FLINK-8479 > URL: https://issues.apache.org/jira/browse/FLINK-8479 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344501#comment-16344501 ] ASF GitHub Bot commented on FLINK-8479: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164639453 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344500#comment-16344500 ] ASF GitHub Bot commented on FLINK-8479: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164639439 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1636#comment-1636 ] ASF GitHub Bot commented on FLINK-8479: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164631861 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1638#comment-1638 ] ASF GitHub Bot commented on FLINK-8479: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164631884 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1637#comment-1637 ] ASF GitHub Bot commented on FLINK-8479: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164631864 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16343044#comment-16343044 ] ASF GitHub Bot commented on FLINK-8479: --- Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/5342 @bowenli86 the document should now be public for everyone to comment on. Yes, it caches data on either side, and for each incoming element it looks up eligible records from the other side, then joins and emits those that fulfil the user criteria. Entries are removed from the cache as soon as they are too old to be joined, which is determined by a combination of the current watermark and the time boundary defined by the user. > Implement time-bounded inner join of streams as a TwoInputStreamOperator > > > Key: FLINK-8479 > URL: https://issues.apache.org/jira/browse/FLINK-8479 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
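The behaviour described in the comment above (buffer both sides, probe the opposite buffer for each incoming element, evict entries once the watermark shows they can no longer be joined) can be sketched without Flink roughly as follows. This is an illustration under stated assumptions, not the operator from the pull request: it uses java.util.TreeMap in place of Flink keyed state, plain strings as elements, inclusive bounds only, and a hypothetical class BufferedIntervalJoin whose method names are invented for the example. It also assumes that no element arrives with a timestamp at or below a watermark that was already seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, Flink-free model of a time-bounded join with watermark-driven cleanup. */
final class BufferedIntervalJoin {

    private final long lowerBound;
    private final long upperBound;

    // Elements buffered per timestamp, sorted so that range lookups and eviction are cheap.
    private final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    private final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    BufferedIntervalJoin(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must not exceed upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** Buffers a left element and joins it with right elements in [ts + lower, ts + upper]. */
    void processLeft(long ts, String value) {
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (List<String> bucket
                : rightBuffer.subMap(ts + lowerBound, true, ts + upperBound, true).values()) {
            for (String right : bucket) {
                output.add(value + "|" + right);
            }
        }
    }

    /** Buffers a right element and joins it with left elements in [ts - upper, ts - lower]. */
    void processRight(long ts, String value) {
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (List<String> bucket
                : leftBuffer.subMap(ts - upperBound, true, ts - lowerBound, true).values()) {
            for (String left : bucket) {
                output.add(left + "|" + value);
            }
        }
    }

    /** Drops entries that cannot match any element arriving after the given watermark. */
    void onWatermark(long watermark) {
        // A left element can only match right timestamps up to leftTs + upperBound.
        leftBuffer.headMap(watermark - upperBound, false).clear();
        // A right element can only match left timestamps up to rightTs - lowerBound.
        rightBuffer.headMap(watermark + lowerBound, false).clear();
    }

    List<String> output() {
        return output;
    }

    int bufferedLeft() {
        return leftBuffer.values().stream().mapToInt(List::size).sum();
    }

    int bufferedRight() {
        return rightBuffer.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        BufferedIntervalJoin join = new BufferedIntervalJoin(-2, 3);
        join.processLeft(10, "a");
        join.processRight(12, "x");
        join.processRight(14, "y");
        join.processLeft(11, "b");
        System.out.println(join.output()); // prints "[a|x, b|x, b|y]"

        join.onWatermark(14);
        System.out.println(join.bufferedLeft() + " " + join.bufferedRight()); // prints "1 2"
    }
}
```

The eviction thresholds are deliberately conservative: an entry is kept whenever a future element could still fall inside its interval, so memory use depends on the distance between the two bounds and on how far the watermark lags behind the newest elements.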
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341638#comment-16341638 ] ASF GitHub Bot commented on FLINK-8479: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5342 Very interesting! Two things: 1. Can you make the Google doc publicly viewable? I cannot access it right now. 2. How does it handle event-time window joins of two streams, where data in one stream always arrives much later than data in the other? For example, we are joining streams A and B on 10 min event-time tumbling windows from 12:00 - 12:10, 12:10 - 12:20, and data in stream B always arrives 30 mins later than the data in stream A. How does the operator handle that? Does it cache A's data until B's data arrives, do the join, and remove A's data from the cache? (I haven't read the code in detail, I am just trying to get a general idea of the overall design.) > Implement time-bounded inner join of streams as a TwoInputStreamOperator > > > Key: FLINK-8479 > URL: https://issues.apache.org/jira/browse/FLINK-8479 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337354#comment-16337354 ] ASF GitHub Bot commented on FLINK-8479: --- Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/5342 Yeah seems like I made a typo there. It's actually https://issues.apache.org/jira/browse/FLINK-8479, I just fixed the title. > Implement time-bounded inner join of streams as a TwoInputStreamOperator > > > Key: FLINK-8479 > URL: https://issues.apache.org/jira/browse/FLINK-8479 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)