[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4625 ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r14387 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143487841 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143448956 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143445293 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143417404 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143412087 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143401988 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -102,5 +117,154 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() } + /** test rowtime inner join **/ + @Test + def testRowTimeInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 join T2 as t2 ON +| t1.a = t2.a AND +| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String, Long)] --- End diff -- Yes, you understood the problem correctly. Without PR #4732, join keys are mapped to `Tuple`s, which do not support null fields unless the field type itself supports null (which is the case for String but not for primitive types). With #4732, keys are mapped to `Row`, which supports null fields but treats `null == null` as true. Therefore, we need to add these predicates to the code-generated conditions so that they are evaluated correctly according to three-valued logic. After #4732, all types will support null-valued keys. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143401295 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143400792 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143400052 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +229,50 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow], + joinFunctionName: String, + joinFunctionCode: String, + leftKeys: Array[Int], + rightKeys: Array[Int]): DataStream[CRow] = { + +val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness = 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) --- End diff -- That's a good point. The motivation for this restriction is to prevent nested-loop joins in batch execution. In the streaming window join case, having no equi-join keys would result in single-threaded execution, which is not efficient either (it could be parallelized by broadcasting one of the inputs, but without statistics this is quite risky). Still, it is not as bad as a full nested-loop join, because the window predicates bound the computation. We could add a boolean flag to the constructor of `FlinkLogicalJoinConverter` to allow cross joins. Right now the set of logical optimization rules is shared by DataSet and DataStream. I'd keep the shared rule set for now and just add an additional rule by overriding `getLogicalOptRuleSet` in `StreamTableEnvironment`. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143357544 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -102,5 +117,154 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() } + /** test rowtime inner join **/ + @Test + def testRowTimeInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 join T2 as t2 ON +| t1.a = t2.a AND +| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String, Long)] --- End diff -- Currently, not all types support `null` keys, so I only use the `String` type in the test. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143231150 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -102,5 +117,154 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() } + /** test rowtime inner join **/ + @Test + def testRowTimeInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 join T2 as t2 ON +| t1.a = t2.a AND +| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String, Long)] --- End diff -- If I understand correctly, this problem was caused by the different semantics of `Null` in SQL and in general-purpose languages (i.e., whether `Null = Null` holds). We transform the equi-conditions into a `keyBy` operation, so rows with `Null` keys may be mapped to the same group and thus be treated as equal? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143222605 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +229,50 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow], + joinFunctionName: String, + joinFunctionCode: String, + leftKeys: Array[Int], + rightKeys: Array[Int]): DataStream[CRow] = { + +val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness = 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) --- End diff -- Actually, I was quite confused by the condition `!leftKeys.isEmpty`. Since queries without equi-conditions are blocked in `FlinkLogicalJoin.scala`, when would this condition not hold? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143211000 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143208392 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142891284 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; --- End diff -- Forgive me for the repeated silly sequelae ð¤¦ââï¸ ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142762761 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142762460 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142699858 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream.
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142710061 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream.
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142703252 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; --- End diff -- rm `;` ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142691841 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +final class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx) { + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +leftOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() + else 0L +rightOperatorTime = --- End diff -- just use `leftOperatorTime` to avoid the additional method calls and condition? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142685896 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +229,50 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow], + joinFunctionName: String, + joinFunctionCode: String, + leftKeys: Array[Int], + rightKeys: Array[Int]): DataStream[CRow] = { + +val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness = 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) --- End diff -- we need to make sure to include the fixes of #4732 ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142705409 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream.
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142714462 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream.
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142762915 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream.
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142706610 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142694926 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142697408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142705185 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142689196 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +229,50 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow], + joinFunctionName: String, + joinFunctionCode: String, + leftKeys: Array[Int], + rightKeys: Array[Int]): DataStream[CRow] = { + +val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness = 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) +.transform( + "InnerRowtimeWindowJoin", + returnTypeInfo, + new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow]( +rowTimeInnerJoinFunc, +rowTimeInnerJoinFunc.getMaxOutputDelay) +) +} else { + leftDataStream.connect(rightDataStream) +.keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow]) +.transform( + "InnerRowtimeWindowJoin", + returnTypeInfo, + new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow]( --- End diff -- The `KEY` type parameter here should be `Byte` (the key produced by `NullByteKeySelector`), not `CRow`. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142682283 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -152,19 +176,40 @@ class DataStreamWindowJoin( } } - def createProcTimeInnerJoinFunction( + def createEmptyInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow]) = { +leftDataStream.connect(rightDataStream).process( + new CoProcessFunction[CRow, CRow, CRow] { +override def processElement1( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]) = { + //Do nothing. +} +override def processElement2( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]) = { + //Do nothing. +} + }) --- End diff -- Add a `returns(returnTypeInfo)` call to ensure that the resulting stream has the correct type. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142769397 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -102,5 +117,154 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() } + /** test rowtime inner join **/ + @Test + def testRowTimeInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 join T2 as t2 ON +| t1.a = t2.a AND +| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String, Long)] --- End diff -- Add two rows with null keys on both sides within join window boundaries to test that join predicates on null values are not evaluated to true. For this to work we need to also fix the `keyBy()` calls to support partitioning of null keys (see #4732) ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142681690 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -152,19 +176,40 @@ class DataStreamWindowJoin( } } - def createProcTimeInnerJoinFunction( + def createEmptyInnerJoin( --- End diff -- Please declare the method's return type explicitly as `DataStream[CRow]`. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142681890 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -152,19 +176,40 @@ class DataStreamWindowJoin( } } - def createProcTimeInnerJoinFunction( + def createEmptyInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow]) = { +leftDataStream.connect(rightDataStream).process( + new CoProcessFunction[CRow, CRow, CRow] { +override def processElement1( --- End diff -- Add explicit `Unit` return types to both `processElement` methods. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142679463 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala --- @@ -105,6 +104,8 @@ class DataStreamWindowJoinRule windowBounds.get.isEventTime, windowBounds.get.leftLowerBound, windowBounds.get.leftUpperBound, + windowBounds.get.leftTimeIdx, + windowBounds.get.rightTimeIdx, remainCondition, --- End diff -- The `remainCondition` must include the equi-join predicates to ensure that the join condition is correctly evaluated for `null` values (see FLINK-7755 for details). To solve this, I'd suggest calling `WindowJoinUtil.extractWindowBoundsFromPredicate` with `join.getCondition` instead of `joinInfo.getRemaining(join.getCluster.getRexBuilder)`. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142688996 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +229,50 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow], + joinFunctionName: String, + joinFunctionCode: String, + leftKeys: Array[Int], + rightKeys: Array[Int]): DataStream[CRow] = { + +val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness = 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) +.transform( + "InnerRowtimeWindowJoin", + returnTypeInfo, + new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow]( --- End diff -- In the current implementation the `KEY` type would be a `Tuple`, but I think we can just pass `_` here. When we adopt #4732, the key will be `Row`. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142692599 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, --- End diff -- The time indices (`leftTimeIdx`, `rightTimeIdx`) are only needed by `RowTimeBoundedStreamInnerJoin`, so they can be removed from this base class. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r141993041 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r141831721 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140645274 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140266297 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140263201 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function name of other non-equi conditions + * @param genJoinFuncCode the function code of other non-equi conditions + * @param timeIndicator indicates whether joining on proctime or rowtime + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream.
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + //For delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140262200 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140255052 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function name of other non-equi conditions + * @param genJoinFuncCode the function code of other non-equi conditions + * @param timeIndicator indicates whether joining on proctime or rowtime + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream.
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + //For delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140251765 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140201035 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function name of other non-equi conditions + * @param genJoinFuncCode the function code of other non-equi conditions + * @param timeIndicator indicates whether joining on proctime or rowtime + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream.
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + //For delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140197925 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139986193 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7)) + CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7)) expectedOutput.add(new StreamRecord( - CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) + CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12)) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } + /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/ + @Test + def testCommonRowTimeJoin() { --- End diff -- Oh, sorry, I missed this part. Will add it soon. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139983961 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139849018 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- Totally understand 🙂 ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139728162 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- OK, let's keep it here. Changing the value of watermarks won't be possible as it is built into the DataStream API and some users rely on the current behavior. The curse of public APIs ;-) ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139727416 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. 
+ */ +class ProcTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx = -1, + rightTimeIdx = -1, + JoinTimeIndicator.PROCTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = ctx.timerService().currentProcessingTime() +leftOperatorTime = ctx.timerService().currentProcessingTime() + } + + override def getTimeForLeftStream( + context: CoProcessFunction[CRow, CRow, CRow]#Context, + row: CRow): Long = { +context.timerService().currentProcessingTime() --- End diff -- To be honest, I would not put too much effort into the processing time case, especially not if it affects the performance of event-time processing. Processing time is non-deterministic anyway. I brought this up because I wasn't sure of the side effects if the row proctime > operator time. If this is not an issue, we can keep it like this. Otherwise, the easiest solution would be to just add a comment to the invocations of `updateOperatorTime` that this call must be the first call in all processing methods (`processElement()`, `onTimer()`). Since this is just an internal API, this should be fine. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139723927 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139723011 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139722338 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139633281 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139631978 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- After checking the code, I found that the number of watermark usages is even larger than the number of updates (i.e., each update is followed by at least one usage). Considering that the number of usages may increase in the future, I suggest keeping this check in the update method. Of course, it would be better if we could use a "safer" initial value for the watermark. What do you think? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139628583 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139611419 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139591493 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- Yes, you are right. I'll move this check to places where we actually use the watermark. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139585456 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. 
+ */ +class ProcTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx = -1, + rightTimeIdx = -1, + JoinTimeIndicator.PROCTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = ctx.timerService().currentProcessingTime() +leftOperatorTime = ctx.timerService().currentProcessingTime() + } + + override def getTimeForLeftStream( + context: CoProcessFunction[CRow, CRow, CRow]#Context, + row: CRow): Long = { +context.timerService().currentProcessingTime() --- End diff -- Yes, you are right. To keep them identical, we should return the `leftOperatorTime` here. However, this couples `updateOperatorTime` and `getTimeForLeftStream`, i.e., `updateOperatorTime` must be invoked before `getTimeForLeftStream`. Is that acceptable? I've got an idea about the processing time. How about temporarily caching the machine time for the same `StreamRecord` instead of invoking `System.currentTimeMillis()` each time? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139439684 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- We need this check to avoid an underflow in case the current watermark is `Long.MIN_VALUE`, correct? I see that it is nice to encapsulate the check here, but would we need fewer checks if we performed this check every time we actually use the watermark? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139427057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139400962 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7)) + CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7)) expectedOutput.add(new StreamRecord( - CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) + CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12)) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } + /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/ + @Test + def testCommonRowTimeJoin() { + +val joinProcessFunc = new RowTimeBoundedStreamInnerJoin( + -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0) + +val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] = + new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc) +val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = + new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]( +operator, +new TupleRowKeySelector[String](1), +new TupleRowKeySelector[String](1), +BasicTypeInfo.STRING_TYPE_INFO, +1, 1, 0) + +testHarness.open() + +// Advance +testHarness.processWatermark1(new Watermark(1)) +testHarness.processWatermark2(new Watermark(1)) + +// Test late data +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(1L: JLong, "k1"), true), 0)) + +assertEquals(0, testHarness.numEventTimeTimers()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(2L: JLong, "k1"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(2L: JLong, "k1"), true), 0)) + +assertEquals(2, testHarness.numEventTimeTimers()) +assertEquals(4, 
testHarness.numKeyedStateEntries()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(5L: JLong, "k1"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(15L: JLong, "k1"), true), 0)) + +testHarness.processWatermark1(new Watermark(20)) +testHarness.processWatermark2(new Watermark(20)) + +assertEquals(4, testHarness.numKeyedStateEntries()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(35L: JLong, "k1"), true), 0)) + +testHarness.processWatermark1(new Watermark(38)) +testHarness.processWatermark2(new Watermark(38)) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(40L: JLong, "k2"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(39L: JLong, "k2"), true), 0)) + +assertEquals(6, testHarness.numKeyedStateEntries()) + +testHarness.processWatermark1(new Watermark(61)) +testHarness.processWatermark2(new Watermark(61)) + +assertEquals(4, testHarness.numKeyedStateEntries()) + +val expectedOutput = new ConcurrentLinkedQueue[Object]() +expectedOutput.add(new StreamRecord( + CRow(Row.of(2L: JLong, "k1", 2L: JLong, "k1"), true), 0)) +expectedOutput.add(new StreamRecord( + CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), true), 0)) +expectedOutput.add(new StreamRecord( + CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), true), 0)) +expectedOutput.add(new StreamRecord( + CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), true), 0)) +expectedOutput.add(new StreamRecord( + CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), true), 0)) + +val result = testHarness.getOutput +verify(expectedOutput, result, new RowResultSortComparator()) +testHarness.close() + } + + /** a.rowtime >= b.rowtime - 10 and a.rowtime <= b.rowtime - 7 **/ + @Test + def testNegativeRowTimeJoin() { + +val joinProcessFunc = new RowTimeBoundedStreamInnerJoin( + -10, -7, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0) + +val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] = + new 
KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc) +val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = + new KeyedTwoInputStreamOperatorTestHarness[String, CRow,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139402125 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7)) + CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7)) expectedOutput.add(new StreamRecord( - CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) + CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12)) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } + /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/ + @Test + def testCommonRowTimeJoin() { --- End diff -- These tests are a good start. We also need a few (2 +/- 1) end-to-end tests (see `org.apache.flink.table.runtime.stream.sql.JoinITCase`) that check the correct translation and execution (incl. result validation). ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139437709 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. + */ +class RowTimeBoundedStreamInnerJoin( --- End diff -- make class `final` ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139446305 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139410699 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139426723 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139423665 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139401341 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7)) + CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7)) expectedOutput.add(new StreamRecord( - CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) + CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12)) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } + /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/ + @Test + def testCommonRowTimeJoin() { + +val joinProcessFunc = new RowTimeBoundedStreamInnerJoin( + -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0) + +val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] = + new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc) +val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = + new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]( +operator, +new TupleRowKeySelector[String](1), +new TupleRowKeySelector[String](1), +BasicTypeInfo.STRING_TYPE_INFO, +1, 1, 0) + +testHarness.open() + +// Advance +testHarness.processWatermark1(new Watermark(1)) +testHarness.processWatermark2(new Watermark(1)) + +// Test late data +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(1L: JLong, "k1"), true), 0)) + +assertEquals(0, testHarness.numEventTimeTimers()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(2L: JLong, "k1"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(2L: JLong, "k1"), true), 0)) + +assertEquals(2, testHarness.numEventTimeTimers()) +assertEquals(4, 
testHarness.numKeyedStateEntries()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(5L: JLong, "k1"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(15L: JLong, "k1"), true), 0)) + +testHarness.processWatermark1(new Watermark(20)) +testHarness.processWatermark2(new Watermark(20)) + +assertEquals(4, testHarness.numKeyedStateEntries()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(35L: JLong, "k1"), true), 0)) + +testHarness.processWatermark1(new Watermark(38)) +testHarness.processWatermark2(new Watermark(38)) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(40L: JLong, "k2"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(39L: JLong, "k2"), true), 0)) + +assertEquals(6, testHarness.numKeyedStateEntries()) + +testHarness.processWatermark1(new Watermark(61)) +testHarness.processWatermark2(new Watermark(61)) + +assertEquals(4, testHarness.numKeyedStateEntries()) + +val expectedOutput = new ConcurrentLinkedQueue[Object]() --- End diff -- We should test the boundaries of all join windows (left/right, lower/upper) to ensure we don't have off-by-one bugs. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139435309 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139435951 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. 
+ */ +class ProcTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx = -1, + rightTimeIdx = -1, + JoinTimeIndicator.PROCTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = ctx.timerService().currentProcessingTime() +leftOperatorTime = ctx.timerService().currentProcessingTime() + } + + override def getTimeForLeftStream( + context: CoProcessFunction[CRow, CRow, CRow]#Context, + row: CRow): Long = { +context.timerService().currentProcessingTime() --- End diff -- should be `leftOperatorTime` to ensure that both are the same? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139399103 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -166,16 +185,16 @@ class DataStreamWindowJoin( def createProcTimeInnerJoinFunction( leftDataStream: DataStream[CRow], rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow], joinFunctionName: String, joinFunctionCode: String, leftKeys: Array[Int], rightKeys: Array[Int]): DataStream[CRow] = { -val returnTypeInfo = CRowTypeInfo(schema.typeInfo) - -val procInnerJoinFunc = new ProcTimeWindowInnerJoin( --- End diff -- We should remove the previous implementation if we don't use it anymore. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139385852 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -57,6 +61,8 @@ class DataStreamWindowJoin( with CommonJoin with DataStreamRel { + lazy val LOG: Logger = LoggerFactory.getLogger(getClass) --- End diff -- Use the `Logging` trait instead of creating the logger yourself. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139401824 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7)) + CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7)) expectedOutput.add(new StreamRecord( - CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) + CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12)) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } + /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/ + @Test + def testCommonRowTimeJoin() { + +val joinProcessFunc = new RowTimeBoundedStreamInnerJoin( + -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0) + +val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] = + new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc) +val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = + new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]( +operator, +new TupleRowKeySelector[String](1), +new TupleRowKeySelector[String](1), +BasicTypeInfo.STRING_TYPE_INFO, +1, 1, 0) + +testHarness.open() + +// Advance +testHarness.processWatermark1(new Watermark(1)) +testHarness.processWatermark2(new Watermark(1)) + +// Test late data +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(1L: JLong, "k1"), true), 0)) + +assertEquals(0, testHarness.numEventTimeTimers()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(2L: JLong, "k1"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(2L: JLong, "k1"), true), 0)) + +assertEquals(2, testHarness.numEventTimeTimers()) +assertEquals(4, 
testHarness.numKeyedStateEntries()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(5L: JLong, "k1"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(15L: JLong, "k1"), true), 0)) + +testHarness.processWatermark1(new Watermark(20)) +testHarness.processWatermark2(new Watermark(20)) + +assertEquals(4, testHarness.numKeyedStateEntries()) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(35L: JLong, "k1"), true), 0)) + +testHarness.processWatermark1(new Watermark(38)) +testHarness.processWatermark2(new Watermark(38)) + +testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(40L: JLong, "k2"), true), 0)) +testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(39L: JLong, "k2"), true), 0)) + +assertEquals(6, testHarness.numKeyedStateEntries()) + +testHarness.processWatermark1(new Watermark(61)) +testHarness.processWatermark2(new Watermark(61)) + +assertEquals(4, testHarness.numKeyedStateEntries()) + +val expectedOutput = new ConcurrentLinkedQueue[Object]() --- End diff -- Add multiple rows for the same key and time to validate that this case is correctly handled. It might make sense to add another string field to the data with a unique value ("left1", ...) to make the input and output records easier to compare. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139407531 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -55,18 +49,18 @@ import org.apache.flink.util.Collector * @param timeIndicator indicate whether joining on proctime or rowtime * */ -class TimeBoundedStreamInnerJoin( - private val leftLowerBound: Long, - private val leftUpperBound: Long, - private val allowedLateness: Long, - private val leftType: TypeInformation[Row], - private val rightType: TypeInformation[Row], - private val genJoinFuncName: String, - private val genJoinFuncCode: String, - private val leftTimeIdx: Int, - private val rightTimeIdx: Int, - private val timeIndicator: JoinTimeIndicator) - extends CoProcessFunction[CRow, CRow, CRow] +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) --- End diff -- We can remove `timeIndicator` and `JoinTimeIndicator`. They are only used for the state names which do not need to distinguish row and processing time. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139387007 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -166,16 +185,16 @@ class DataStreamWindowJoin( def createProcTimeInnerJoinFunction( --- End diff -- Rename the method to `createProcTimeInnerJoin()`, since it returns a joined stream rather than the join function. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139386887 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -196,53 +215,69 @@ class DataStreamWindowJoin( } } + def createEmptyInnerJoinFunction( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow]) = { +leftDataStream.connect(rightDataStream).process( + new CoProcessFunction[CRow, CRow, CRow] { +override def processElement1( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]) = { + //Do nothing. +} +override def processElement2( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]) = { + //Do nothing. +} + }) + } def createRowTimeInnerJoinFunction( --- End diff -- rename method to `createRowTimeInnerJoin()` as it does not return the function but a joined stream. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139436010 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. 
+ */ +class ProcTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx = -1, + rightTimeIdx = -1, + JoinTimeIndicator.PROCTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = ctx.timerService().currentProcessingTime() +leftOperatorTime = ctx.timerService().currentProcessingTime() + } + + override def getTimeForLeftStream( + context: CoProcessFunction[CRow, CRow, CRow]#Context, + row: CRow): Long = { +context.timerService().currentProcessingTime() + } + + override def getTimeForRightStream( + context: CoProcessFunction[CRow, CRow, CRow]#Context, + row: CRow): Long = { +context.timerService().currentProcessingTime() --- End diff -- should be `rightOperatorTime` to ensure that both are the same? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139434334 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139436599 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. + */ +class ProcTimeBoundedStreamInnerJoin( --- End diff -- make class `final` to support inlining of method. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139424544 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139386347 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -196,53 +215,69 @@ class DataStreamWindowJoin( } } + def createEmptyInnerJoinFunction( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow]) = { +leftDataStream.connect(rightDataStream).process( + new CoProcessFunction[CRow, CRow, CRow] { +override def processElement1( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]) = { + //Do nothing. +} +override def processElement2( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]) = { + //Do nothing. +} + }) + } def createRowTimeInnerJoinFunction( --- End diff -- Add newline ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139386829 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -196,53 +215,69 @@ class DataStreamWindowJoin( } } + def createEmptyInnerJoinFunction( --- End diff -- Rename the method to `createEmptyInnerJoin()`, as it does not return a function but a joined stream. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139387143 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -196,53 +215,69 @@ class DataStreamWindowJoin( } } + def createEmptyInnerJoinFunction( --- End diff -- Move method above `createProcTimeInnerJoinFunction()` ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139385974 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -117,6 +123,11 @@ class DataStreamWindowJoin( val leftKeys = joinInfo.leftKeys.toIntArray val rightKeys = joinInfo.rightKeys.toIntArray +val relativeWindowSize = leftUpperBound - leftLowerBound + +val returnTypeInfo = CRowTypeInfo(schema.typeInfo) + --- End diff -- Remove the empty line. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139430727 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. 
+ */ +class ProcTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx = -1, + rightTimeIdx = -1, + JoinTimeIndicator.PROCTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = ctx.timerService().currentProcessingTime() +leftOperatorTime = ctx.timerService().currentProcessingTime() --- End diff -- `leftOperatorTime = rightOperatorTime` to ensure that both sides have the same time ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139400726 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7)) + CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7)) expectedOutput.add(new StreamRecord( - CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) + CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12)) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } + /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/ + @Test + def testCommonRowTimeJoin() { + +val joinProcessFunc = new RowTimeBoundedStreamInnerJoin( + -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0) + +val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] = + new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc) +val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = + new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]( +operator, +new TupleRowKeySelector[String](1), +new TupleRowKeySelector[String](1), +BasicTypeInfo.STRING_TYPE_INFO, +1, 1, 0) + +testHarness.open() + +// Advance +testHarness.processWatermark1(new Watermark(1)) +testHarness.processWatermark2(new Watermark(1)) + +// Test late data --- End diff -- A few more comments like this would be good ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139407845 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], --- End diff -- change to `Types.LONG.asInstanceOf[TypeInformation[Long]]` ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137272749 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.text.SimpleDateFormat +import java.util +import java.util.Map.Entry +import java.util.{Date, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * + * Sample criteria: + * + * L.time between R.time + X and R.time + Y + * or AND R.time between L.time - Y and L.time - X + * + * @param leftLowerBound X + * @param leftUpperBound Y + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +class TimeBoundedStreamInnerJoin( + private val leftLowerBound: Long, + private val leftUpperBound: Long, + private val allowedLateness: Long, + private val leftType: TypeInformation[Row], + private val rightType: TypeInformation[Row], + private val genJoinFuncName: String, + private val genJoinFuncCode: String, + private val leftTimeIdx: Int, + private val rightTimeIdx: Int, + private val timeIndicator: JoinTimeIndicator) + extends 
CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store the left stream records + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store right stream records + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private val relativeWindowSize = rightRelativeSize + leftRelativeSize + + private var leftOperatorTime: Long = 0L + private var rightOperatorTime: Long = 0L + + private var backPressureSuggestion: Long = 0L + + if (relativeWindowSize <= 0) { +LOG.warn("The relative window size is non-positive, please check the join conditions.") + } + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + + /** +* For holding back watermarks. +* +* @return the maximum delay
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137229663 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.text.SimpleDateFormat +import java.util +import java.util.Map.Entry +import java.util.{Date, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * + * Sample criteria: + * + * L.time between R.time + X and R.time + Y + * or AND R.time between L.time - Y and L.time - X + * + * @param leftLowerBound X + * @param leftUpperBound Y + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +class TimeBoundedStreamInnerJoin( + private val leftLowerBound: Long, + private val leftUpperBound: Long, + private val allowedLateness: Long, + private val leftType: TypeInformation[Row], + private val rightType: TypeInformation[Row], + private val genJoinFuncName: String, + private val genJoinFuncCode: String, + private val leftTimeIdx: Int, + private val rightTimeIdx: Int, + private val timeIndicator: JoinTimeIndicator) + extends 
CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store the left stream records + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store right stream records + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private val relativeWindowSize = rightRelativeSize + leftRelativeSize + + private var leftOperatorTime: Long = 0L + private var rightOperatorTime: Long = 0L + + private var backPressureSuggestion: Long = 0L + + if (relativeWindowSize <= 0) { +LOG.warn("The relative window size is non-positive, please check the join conditions.") + } + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + + /** +* For holding back watermarks. +* +* @return the maximum delay
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137229356 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.text.SimpleDateFormat +import java.util +import java.util.Map.Entry +import java.util.{Date, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * + * Sample criteria: + * + * L.time between R.time + X and R.time + Y + * or AND R.time between L.time - Y and L.time - X + * + * @param leftLowerBound X + * @param leftUpperBound Y + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +class TimeBoundedStreamInnerJoin( + private val leftLowerBound: Long, + private val leftUpperBound: Long, + private val allowedLateness: Long, + private val leftType: TypeInformation[Row], + private val rightType: TypeInformation[Row], + private val genJoinFuncName: String, + private val genJoinFuncCode: String, + private val leftTimeIdx: Int, + private val rightTimeIdx: Int, + private val timeIndicator: JoinTimeIndicator) + extends 
CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store the left stream records + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store right stream records + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private val relativeWindowSize = rightRelativeSize + leftRelativeSize + + private var leftOperatorTime: Long = 0L + private var rightOperatorTime: Long = 0L + + private var backPressureSuggestion: Long = 0L + + if (relativeWindowSize <= 0) { +LOG.warn("The relative window size is non-positive, please check the join conditions.") + } + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + + /** +* For holding back watermarks. +* +* @return the maximum delay
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137228034 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.text.SimpleDateFormat +import java.util +import java.util.Map.Entry +import java.util.{Date, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * + * Sample criteria: + * + * L.time between R.time + X and R.time + Y + * or AND R.time between L.time - Y and L.time - X + * + * @param leftLowerBound X + * @param leftUpperBound Y + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +class TimeBoundedStreamInnerJoin( + private val leftLowerBound: Long, + private val leftUpperBound: Long, + private val allowedLateness: Long, + private val leftType: TypeInformation[Row], + private val rightType: TypeInformation[Row], + private val genJoinFuncName: String, + private val genJoinFuncCode: String, + private val leftTimeIdx: Int, + private val rightTimeIdx: Int, + private val timeIndicator: JoinTimeIndicator) + extends 
CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store the left stream records + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store right stream records + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private val relativeWindowSize = rightRelativeSize + leftRelativeSize + + private var leftOperatorTime: Long = 0L + private var rightOperatorTime: Long = 0L + + private var backPressureSuggestion: Long = 0L + + if (relativeWindowSize <= 0) { +LOG.warn("The relative window size is non-positive, please check the join conditions.") + } + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + + /** +* For holding back watermarks. +* +* @return the maximum delay
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137227871 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.text.SimpleDateFormat +import java.util +import java.util.Map.Entry +import java.util.{Date, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * + * Sample criteria: + * + * L.time between R.time + X and R.time + Y + * or AND R.time between L.time - Y and L.time - X + * + * @param leftLowerBound X + * @param leftUpperBound Y + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +class TimeBoundedStreamInnerJoin( + private val leftLowerBound: Long, + private val leftUpperBound: Long, + private val allowedLateness: Long, + private val leftType: TypeInformation[Row], + private val rightType: TypeInformation[Row], + private val genJoinFuncName: String, + private val genJoinFuncCode: String, + private val leftTimeIdx: Int, + private val rightTimeIdx: Int, + private val timeIndicator: JoinTimeIndicator) + extends 
CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store the left stream records + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store right stream records + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private val relativeWindowSize = rightRelativeSize + leftRelativeSize + + private var leftOperatorTime: Long = 0L + private var rightOperatorTime: Long = 0L + + private var backPressureSuggestion: Long = 0L + + if (relativeWindowSize <= 0) { +LOG.warn("The relative window size is non-positive, please check the join conditions.") + } + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + + /** +* For holding back watermarks. +* +* @return the maximum delay
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137227455 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.text.SimpleDateFormat +import java.util +import java.util.Map.Entry +import java.util.{Date, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * + * Sample criteria: + * + * L.time between R.time + X and R.time + Y + * or AND R.time between L.time - Y and L.time - X + * + * @param leftLowerBound X + * @param leftUpperBound Y + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +class TimeBoundedStreamInnerJoin( + private val leftLowerBound: Long, + private val leftUpperBound: Long, + private val allowedLateness: Long, + private val leftType: TypeInformation[Row], + private val rightType: TypeInformation[Row], + private val genJoinFuncName: String, + private val genJoinFuncCode: String, + private val leftTimeIdx: Int, + private val rightTimeIdx: Int, + private val timeIndicator: JoinTimeIndicator) + extends 
CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store the left stream records + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store right stream records + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private val relativeWindowSize = rightRelativeSize + leftRelativeSize + + private var leftOperatorTime: Long = 0L + private var rightOperatorTime: Long = 0L + + private var backPressureSuggestion: Long = 0L --- End diff -- AFAIK, there are no plans for a feature like that. This would also need to be integrated with the DataStream API and all connectors which means it is a major change and requires careful design. I would remove the code for now. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137227189 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +195,54 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoinFunction( +leftDataStream: DataStream[CRow], +rightDataStream: DataStream[CRow], +joinFunctionName: String, +joinFunctionCode: String, +leftKeys: Array[Int], +rightKeys: Array[Int]): DataStream[CRow] = { + +val returnTypeInfo = CRowTypeInfo(schema.typeInfo) + +val rowTimeInnerJoinFunc = new TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME +) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) +.transform( + "rowTimeInnerJoinFunc", + returnTypeInfo, + new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow]( +rowTimeInnerJoinFunc, +rowTimeInnerJoinFunc.getMaxOutputDelay) --- End diff -- OK, let's keep `getMaxOutputDelay()` but we should improve the documentation of the method and make clear that this returns the maximum interval between receiving a row and emitting it (as part of a joined row). ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137225735 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +195,54 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoinFunction( +leftDataStream: DataStream[CRow], +rightDataStream: DataStream[CRow], +joinFunctionName: String, +joinFunctionCode: String, +leftKeys: Array[Int], +rightKeys: Array[Int]): DataStream[CRow] = { + +val returnTypeInfo = CRowTypeInfo(schema.typeInfo) + +val rowTimeInnerJoinFunc = new TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME +) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) +.transform( + "rowTimeInnerJoinFunc", --- End diff -- I think it is fine to call it a window join. The join predicate effectively defines, for each row, a join window on the opposite stream. In fact, these windows are more similar to `OVER` windows than to group windows such as `TUMBLE` or `HOP`. ---