[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143211000 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143208392 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142891284 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; --- End diff -- Forgive me for the repeated silly sequelae 🤦‍♂️ ---
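The class comment quoted above states the join window in two forms, and the fields `leftRelativeSize`, `rightRelativeSize` and `cleanupDelay` are derived from the bounds X and Y. As a rough sketch (plain Scala with no Flink dependency; the object name `TimeBoundSketch` and the helper names are made up for illustration and are not part of the PR), the two forms can be checked against each other like this:

```scala
// Illustrative sketch only: this is not the PR's code.
object TimeBoundSketch {
  // First form: "L.time between R.time + X and R.time + Y"
  def leftInRange(l: Long, r: Long, x: Long, y: Long): Boolean =
    l >= r + x && l <= r + y

  // Second form: "R.time between L.time - Y and L.time - X"
  def rightInRange(l: Long, r: Long, x: Long, y: Long): Boolean =
    r >= l - y && r <= l - x

  def main(args: Array[String]): Unit = {
    // Assumed example bounds: X = -10, Y = 20
    val leftLowerBound = -10L
    val leftUpperBound = 20L

    // Derived exactly as in the quoted fields
    val leftRelativeSize = -leftLowerBound
    val rightRelativeSize = leftUpperBound
    val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2

    // Both forms should agree for every pair of timestamps
    for (l <- 0L to 50L; r <- 0L to 50L) {
      assert(
        leftInRange(l, r, leftLowerBound, leftUpperBound) ==
          rightInRange(l, r, leftLowerBound, leftUpperBound))
    }
    println(s"leftRelativeSize=$leftRelativeSize rightRelativeSize=$rightRelativeSize cleanupDelay=$cleanupDelay")
  }
}
```

With these assumed bounds the derived values are 10, 20 and 15. How the operator actually uses `cleanupDelay` is not visible in the quoted part of the diff, so the sketch only reproduces the arithmetic.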
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r141993041 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, +
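The updated `processElement1` and `processElement2` in the diff above compute, for each incoming row, the time range that rows from the opposite stream must fall into. A minimal sketch of just that arithmetic follows (plain Scala without Flink; `OppositeBoundsSketch`, `Bounds` and the two helper names are hypothetical and only mirror the variable names in the diff):

```scala
// Illustrative sketch only: this is not the PR's code.
object OppositeBoundsSketch {
  final case class Bounds(lower: Long, upper: Long)

  // Mirrors processElement1: range of right-stream times for a left row
  def rightWindowForLeftRow(
      rowTime: Long,
      leftRelativeSize: Long,
      rightRelativeSize: Long): Bounds =
    Bounds(rowTime - rightRelativeSize, rowTime + leftRelativeSize)

  // Mirrors processElement2: range of left-stream times for a right row
  def leftWindowForRightRow(
      rowTime: Long,
      leftRelativeSize: Long,
      rightRelativeSize: Long): Bounds =
    Bounds(rowTime - leftRelativeSize, rowTime + rightRelativeSize)

  def main(args: Array[String]): Unit = {
    // Assumed bounds X = -10, Y = 20, so the relative sizes are 10 and 20
    val leftRelativeSize = 10L
    val rightRelativeSize = 20L

    println(rightWindowForLeftRow(100L, leftRelativeSize, rightRelativeSize)) // Bounds(80,110)
    println(leftWindowForRightRow(100L, leftRelativeSize, rightRelativeSize)) // Bounds(90,120)
  }
}
```

Under these assumptions, a left row at time 100 can join right rows in [80, 110], which is the "R.time between L.time - Y and L.time - X" form of the criteria, and a right row at time 100 can join left rows in [90, 120].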
[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4625 Hi @fhueske, the PR has been updated. For now, I have kept the logic for dealing with late data, as well as the fine-grained cache. As for the late-data semantics, I think we need to rethink them and make a final decision (maybe we should consult others). As for the cache optimization, I would like to leave it as future work. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140645274 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140266297 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140255052 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + //For delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140251765 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139986193 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7)) + CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7)) expectedOutput.add(new StreamRecord( - CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) + CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12)) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } + /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/ + @Test + def testCommonRowTimeJoin() { --- End diff -- Oh, sorry, I missed this part. I will add it soon. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139983961 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139849018 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- Totally understand. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139633281 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139631978 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- After checking the code, I found that the watermark is used more often than it is updated (i.e., each update is followed by at least one usage). Since the number of usages may grow in the future, I suggest keeping this check in the update method. Of course, it would be better if we could use a "safer" initial value for the watermark. What do you think? ---
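To illustrate why the check matters, here is a minimal sketch in plain Scala, without any Flink dependency. It assumes that the watermark reads as `Long.MinValue` before the first watermark arrives and that the truncated `else` branch of the diff falls back to `0L`; the names `WatermarkGuard` and `sanitize` are hypothetical and are not part of the PR.

```scala
object WatermarkGuard {
  /** Clamp the watermark once, at update time (assumed fallback: 0L). */
  def sanitize(currentWatermark: Long): Long =
    if (currentWatermark > 0) currentWatermark else 0L

  /** Same comparison as checkRowOutOfDate in the quoted diff. */
  def isOutOfDate(rowTime: Long, operatorTime: Long, allowedLateness: Long): Boolean =
    rowTime <= operatorTime - allowedLateness
}

object WatermarkGuardDemo extends App {
  val rowTime = 5L
  val allowedLateness = 10L

  // Unclamped: Long.MinValue - 10 wraps around to a huge positive value,
  // so the row is wrongly treated as out of date.
  println(WatermarkGuard.isOutOfDate(rowTime, Long.MinValue, allowedLateness))

  // Clamped at update time: every later usage sees a safe value.
  val safe = WatermarkGuard.sanitize(Long.MinValue)
  println(WatermarkGuard.isOutOfDate(rowTime, safe, allowedLateness))
}
```

Clamping in the update method means each usage site can subtract from the operator time without repeating the guard, which is the trade-off argued for above.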
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139628583 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139611419 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, +
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139591493 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- Yes, you are right. I'll move this check to places where we actually use the watermark. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139585456 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. 
+ */ +class ProcTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx = -1, + rightTimeIdx = -1, + JoinTimeIndicator.PROCTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = ctx.timerService().currentProcessingTime() +leftOperatorTime = ctx.timerService().currentProcessingTime() + } + + override def getTimeForLeftStream( + context: CoProcessFunction[CRow, CRow, CRow]#Context, + row: CRow): Long = { +context.timerService().currentProcessingTime() --- End diff -- Yes, you are right. To keep them identical, we should return `leftOperatorTime` here. However, this couples `updateOperatorTime` and `getTimeForLeftStream`, i.e., `updateOperatorTime` must be invoked before `getTimeForLeftStream`. Can we live with this? I've got an idea about the processing time. How about temporarily caching the machine time for the same `StreamRecord` instead of invoking `System.currentTimeMillis()` each time? ---
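The caching idea can be sketched roughly as follows. This is plain Scala with an injected clock rather than Flink's timer service; `CachedProcTime` and its members are hypothetical names used only for illustration.

```scala
/** Reads the clock once per record and serves the cached value afterwards. */
final class CachedProcTime(clock: () => Long) {
  private var operatorTime: Long = 0L

  /** Must be called first for each incoming record (the coupling noted above). */
  def update(): Unit = operatorTime = clock()

  /** Both sides reuse the cached reading, so they cannot drift apart. */
  def timeForLeftStream: Long = operatorTime
  def timeForRightStream: Long = operatorTime
}

object CachedProcTimeDemo extends App {
  val cached = new CachedProcTime(() => System.currentTimeMillis())
  cached.update()
  // Always true, however much wall-clock time passes between the two reads.
  println(cached.timeForLeftStream == cached.timeForRightStream)
}
```

The cost is the ordering requirement: reading a row time before `update()` returns the value cached for the previous record.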
[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4625 Hi @fhueske, the PR has been updated. However, there are still some unfinished tasks, e.g., optimising the data caching and clean-up policies and distinguishing between the `<` and `<=` signs. I'd like to leave them as future work. What do you think? BTW, I find that most of the recent PRs fail on the build jobs with `TEST = "misc"`. I'm not sure whether there is a problem with the CI. Here is a [build log](https://s3.amazonaws.com/archive.travis-ci.org/jobs/274508987/log.txt?X-Amz-Expires=30&X-Amz-Date=20170912T130101Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170912/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=af6d9a141ef245be9a7393e69c484785340a9bb1f1030e1039d4ed44f0344a42) for this PR. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137272749 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.text.SimpleDateFormat +import java.util +import java.util.Map.Entry +import java.util.{Date, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * + * Sample criteria: + * + * L.time between R.time + X and R.time + Y + * or AND R.time between L.time - Y and L.time - X + * + * @param leftLowerBound X + * @param leftUpperBound Y + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +class TimeBoundedStreamInnerJoin( + private val leftLowerBound: Long, + private val leftUpperBound: Long, + private val allowedLateness: Long, + private val leftType: TypeInformation[Row], + private val rightType: TypeInformation[Row], + private val genJoinFuncName: String, + private val genJoinFuncCode: String, + private val leftTimeIdx: Int, + private val rightTimeIdx: Int, + private val timeIndicator: JoinTimeIndicator) + extends 
CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store the left stream records + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store right stream records + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private val relativeWindowSize = rightRelativeSize + leftRelativeSize + + private var leftOperatorTime: Long = 0L + private var rightOperatorTime: Long = 0L + + private var backPressureSuggestion: Long = 0L + + if (relativeWindowSize <= 0) { +LOG.warn("The relative window size is non-positive, please check the join conditions.") + } + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + + /** +* For holding back watermarks
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137205581 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137201317 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137168799 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137163327 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137158818 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.text.SimpleDateFormat +import java.util +import java.util.Map.Entry +import java.util.{Date, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * + * Sample criteria: + * + * L.time between R.time + X and R.time + Y + * or AND R.time between L.time - Y and L.time - X + * + * @param leftLowerBound X + * @param leftUpperBound Y + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +class TimeBoundedStreamInnerJoin( + private val leftLowerBound: Long, + private val leftUpperBound: Long, + private val allowedLateness: Long, + private val leftType: TypeInformation[Row], + private val rightType: TypeInformation[Row], + private val genJoinFuncName: String, + private val genJoinFuncCode: String, + private val leftTimeIdx: Int, + private val rightTimeIdx: Int, + private val timeIndicator: JoinTimeIndicator) + extends 
CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store the left stream records + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store right stream records + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private val relativeWindowSize = rightRelativeSize + leftRelativeSize + + private var leftOperatorTime: Long = 0L + private var rightOperatorTime: Long = 0L + + private var backPressureSuggestion: Long = 0L --- End diff -- This variable stores a suggested value for applying backpressure *in the future*. We could cache fewer records if one of the streams were held back according to this suggestion. It's just like moving the cache from Flink to upstream components (e.g., Kafka). ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137150128 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala --- @@ -115,10 +118,15 @@ object WindowJoinUtil { case _ => Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r))) } - -val bounds = Some(WindowBounds(timePreds.head.isEventTime, leftLowerBound, leftUpperBound)) - -(bounds, remainCondition) +if (timePreds.head.leftInputOnLeftSide) { --- End diff -- Should we also subtract `leftLogicalFieldCnt` from the latter index? ---
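To make the question above concrete: in a join, the fields of the right input are usually addressed after those of the left input, so an index taken from the joined row has to be shifted by the left field count before it can be used against the right input alone. The following is only a minimal sketch under that assumption. `JoinIndexSketch` and `toInputIndex` are hypothetical names that do not appear in the PR, and `leftFieldCnt` stands in for `leftLogicalFieldCnt`.

```scala
object JoinIndexSketch {
  // Which join input a field belongs to.
  sealed trait Side
  case object LeftInput extends Side
  case object RightInput extends Side

  // Assumption: the joined row is laid out as [left fields ..., right fields ...].
  // Maps an index into the joined row to (input side, index within that input).
  def toInputIndex(joinedIdx: Int, leftFieldCnt: Int): (Side, Int) = {
    require(joinedIdx >= 0 && leftFieldCnt >= 0, "indices must be non-negative")
    if (joinedIdx < leftFieldCnt) {
      (LeftInput, joinedIdx)
    } else {
      // Right-side fields start at leftFieldCnt, so subtract it.
      (RightInput, joinedIdx - leftFieldCnt)
    }
  }
}
```

With three fields on the left, joined index 4 would refer to field 1 of the right input, which is why forgetting the subtraction would point past the intended time attribute.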
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137146303 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +195,54 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoinFunction( +leftDataStream: DataStream[CRow], +rightDataStream: DataStream[CRow], +joinFunctionName: String, +joinFunctionCode: String, +leftKeys: Array[Int], +rightKeys: Array[Int]): DataStream[CRow] = { + +val returnTypeInfo = CRowTypeInfo(schema.typeInfo) + +val rowTimeInnerJoinFunc = new TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME +) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) +.transform( + "rowTimeInnerJoinFunc", + returnTypeInfo, + new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow]( +rowTimeInnerJoinFunc, +rowTimeInnerJoinFunc.getMaxOutputDelay) --- End diff -- I think the "watermark delay" is considered at the operator level, while the "output delay" is named at the function level. So how about keeping this name? ---
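The relation between the two names can be sketched as follows: the function reports how long it may delay its output, and the operator holds back the watermarks it forwards by that amount. This is a minimal illustration, not the code of `KeyedCoProcessOperatorWithWatermarkDelay`. `WatermarkDelaySketch` and `holdBack` are hypothetical names, and the plain subtraction with an underflow guard is an assumption.

```scala
object WatermarkDelaySketch {
  // Operator-level view: the watermark to forward downstream, given the
  // incoming watermark and the function-level maximum output delay.
  def holdBack(inputWatermark: Long, maxOutputDelay: Long): Long = {
    require(maxOutputDelay >= 0, "the delay must be non-negative")
    // Guard against Long underflow for very small watermarks.
    if (inputWatermark < Long.MinValue + maxOutputDelay) Long.MinValue
    else inputWatermark - maxOutputDelay
  }
}
```

For example, with an incoming watermark of 1000 and a maximum output delay of 200, downstream operators would see a watermark of 800, so rows emitted late by the join are still on time for them.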
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137144634 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +195,54 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoinFunction( +leftDataStream: DataStream[CRow], +rightDataStream: DataStream[CRow], +joinFunctionName: String, +joinFunctionCode: String, +leftKeys: Array[Int], +rightKeys: Array[Int]): DataStream[CRow] = { + +val returnTypeInfo = CRowTypeInfo(schema.typeInfo) + +val rowTimeInnerJoinFunc = new TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME +) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) +.transform( + "rowTimeInnerJoinFunc", --- End diff -- I'd like to call this kind of join "time-bounded join" instead of "window join". When referring to window join, the users may think of tumbling-window or sliding-window, while they are actually not the same. However, as the "window-join" name has been widely used, I can also accept it. Do you have any idea about that? ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137137915 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala --- @@ -55,8 +55,10 @@ class DataStreamWindowJoinRule if (windowBounds.isDefined) { if (windowBounds.get.isEventTime) { -// we cannot handle event-time window joins yet -false +val procTimeAttrInOutput = join.getRowType.getFieldList.asScala + .exists(f => FlinkTypeFactory.isProctimeIndicatorType(f.getType)) + +!remainingPredsAccessTime && !procTimeAttrInOutput --- End diff -- Shall we also keep the rowtime attributes in the outputs of proctime join? ---
[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4625 Thanks for the review, @fhueske. This PR was a little rough when I committed it. I'll address your comments and submit a refined version as soon as possible. Best, Xingcan ---
[GitHub] flink issue #4633: [FLINK-7564] [table] Fix Watermark semantics in RowTimeUn...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4633 I see. Thanks :-) ---
[GitHub] flink issue #4633: [FLINK-7564] [table] Fix Watermark semantics in RowTimeUn...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4633 Thanks for the review, @fhueske. I tried to consolidate the logic for proctime and rowtime watermark processing, but failed. That's because, when using proctime, the "watermark" (not sure if this concept exists for proctime) that triggers the timers is set to the current record's processing time, so all records would be considered late. Do you think it's necessary to unify their behaviors, or should we just treat proctime as a special case? ---
[GitHub] flink pull request #4633: [FLINK-7564] [table] Fix Watermark semantics in Ro...
GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/4633 [FLINK-7564] [table] Fix Watermark semantics in RowTimeUnboundedOver ## What is the purpose of the change This PR aims to fix the watermark boundary check problem (mentioned in [this thread](https://lists.apache.org/thread.html/3541e72ba3842192e58a487e54c2817f6b2b9d12af5fee97af83e5df@%3Cdev.flink.apache.org%3E.)) in Table API. ## Brief change log - Change the delay-condition for rows in RowTimeUnboundedOver to <= watermark. - Add some tests for late data in OverWindowHarnessTest. ## Verifying this change This change is already covered by OverWindowHarnessTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (N/A) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-7564 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4633.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4633 commit 5312ac421384436d7170d8f1c2e00aba9e670044 Author: Xingcan Cui Date: 2017-09-01T01:16:21Z [FLINK-7564] [table] Fix Watermark semantics in Table API --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
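The boundary condition from the change log above (`<= watermark`) follows from the usual watermark contract in Flink: a watermark with timestamp t states that no more elements with a timestamp less than or equal to t should arrive. A minimal sketch of the resulting lateness check is shown below. `LatenessSketch` and `isLate` are hypothetical names, and this is not the code of `RowTimeUnboundedOver`.

```scala
object LatenessSketch {
  // A row is late if its timestamp is not strictly greater than the
  // current watermark, i.e. rowTime <= currentWatermark.
  def isLate(rowTime: Long, currentWatermark: Long): Boolean =
    rowTime <= currentWatermark

  // Splits (timestamp, payload) pairs into (late, onTime) for a given watermark.
  def splitByLateness[T](
      rows: Seq[(Long, T)],
      currentWatermark: Long): (Seq[(Long, T)], Seq[(Long, T)]) =
    rows.partition(row => isLate(row._1, currentWatermark))
}
```

The interesting case is the row whose timestamp equals the watermark: with a strict `<` check it would be accepted, while under the contract above it has to be treated as late.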
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/4625 [FLINK-6233] [table] Support time-bounded stream inner join in the SQL API ## What is the purpose of the change This PR aims to add an implementation of the time-bounded stream inner join for both proctime and rowtime in the SQL API. For example, ``SELECT * from L, R WHERE L.pid = R.pid AND L.time between R.time + X and R.time + Y``. A design document for this problem can be found [here](http://goo.gl/VW5Gpd). ## Brief change log - I filled in the missing part of the compiling stage for the rowtime stream inner join. - Some logic is added to `WindowJoinUtil` to extract the rowtime indices. - A general `TimeBoundedStreamInnerJoin` is provided. - To test the new join function, I added a `TimeBoundedJoinExample` and some new tests to the `JoinHarnessTest`. ## Verifying this change This change added tests to the existing JoinHarnessTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**no**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**) ## Documentation - Does this pull request introduce a new feature? (**yes**) - If yes, how is the feature documented? 
(**not documented yet**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-6233 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4625.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4625 commit c79588b134a1270956a6d32b7a0a13ff4e3f483d Author: Xingcan Cui Date: 2017-08-30T05:57:38Z [FLINK-6233] [table] Support rowtime inner equi-join between two streams in the SQL API --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
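To make the example query in PR #4625 concrete, here is a minimal sketch of the join condition it expresses: an equality on `pid` plus an inclusive time range (SQL `BETWEEN` includes both ends). The class `TimeBoundedJoinPredicate` and its method names are hypothetical, timestamps are assumed to be epoch milliseconds, and this is not the logic of `TimeBoundedStreamInnerJoin` itself, which also has to manage state and timers.

```java
/** Hypothetical illustration of: L.pid = R.pid AND L.time BETWEEN R.time + X AND R.time + Y. */
final class TimeBoundedJoinPredicate {
    private final long lowerBound; // X in the example query, in milliseconds
    private final long upperBound; // Y in the example query, in milliseconds

    public TimeBoundedJoinPredicate(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must not exceed upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** Returns true if the left and right rows satisfy the equi-join key and the time bounds. */
    public boolean matches(long leftPid, long leftTime, long rightPid, long rightTime) {
        return leftPid == rightPid
            && leftTime >= rightTime + lowerBound   // BETWEEN is inclusive at the lower end
            && leftTime <= rightTime + upperBound;  // and at the upper end
    }

    public static void main(String[] args) {
        // X = -1 s, Y = +2 s: a left row may be up to 1 s older or 2 s newer than the right row.
        TimeBoundedJoinPredicate predicate = new TimeBoundedJoinPredicate(-1000L, 2000L);
        System.out.println(predicate.matches(1L, 5000L, 1L, 4000L));
        System.out.println(predicate.matches(1L, 7000L, 1L, 4000L));
    }
}
```

Because the bounds are finite, a row only needs to be kept while rows from the other side can still fall inside its range, which is what makes such a join feasible on unbounded streams.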
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 I totally understand the choice, @fhueske. Thanks for the refactoring.
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 Thanks for the comment, @aljoscha. IMO, making `timeServiceManager` protected would indeed minimise the impact on `AbstractStreamOperator`, but it may introduce duplicated code in the subclasses. There is a trade-off here.
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 Thanks for the comments, @fhueske. I will pay more attention to the coding style. Actually, there are many ways to implement this feature. At first, I planned to override the `processWatermark` method in the sub-class. However, the instance variable `timeServiceManager` that it needs is declared as private. I'm not sure if this can be changed.
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 @fhueske Yes, the plural is better. I should have noticed that before. This PR is updated with the new package name and an extra delay parameter added to the co-operator.
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 Thanks for the suggestion, @aljoscha. Do you think it's appropriate to add a new package `org.apache.flink.table.runtime.operator`?
[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132842811

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
 throw new TableException("Field name can not be '*'.")
 }

-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
--- End diff --

In my local environment, `toArray` also seems to be redundant.
[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132833668 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.util.Collector + +/** + * Wraps a ProcessFunction and sets a Timestamp field of a CRow as + * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp. + */ +class OutputRowtimeProcessFunction[OUT]( +function: MapFunction[CRow, OUT], +rowtimeIdx: Int) --- End diff -- It seems that this function only changes the data type of the rowtime field from Long to Timestamp. Shall we consider making the `rowtimeIdx` an array? 
Besides, as @wuchong suggested, I also think a query should keep the data type unchanged.
[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132820853

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)

+val rowtimeFields = logicalType
+ .getFieldList.asScala
+ .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+ throw new TableException(
--- End diff --

I have an idea, but I'm not sure if it's applicable. We allow multiple rowtime fields in a stream but only activate one in an operator. Since the timestamps are stored in records, the other inactive rowtime fields can just be treated as common fields. Any change to a rowtime field will render it invalid for rowtime use. IMO, there are not too many queries (maybe only over aggregate and join) depending on the rowtime, thus the optimizer may be able to deduce which rowtime field should be activated in an operator. However, some existing logic may be affected by that.
[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132817252

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)

+val rowtimeFields = logicalType
+ .getFieldList.asScala
+ .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+ throw new TableException(
--- End diff --

Thanks for the PR, @fhueske and @twalthr. I tried to rebase my rowtime join code on this branch, but encountered this exception. The test SQL is `SELECT * FROM OrderA, OrderB WHERE OrderA.productA = OrderB.productB AND OrderB.rtB BETWEEN OrderA.rtA AND OrderA.rtA + INTERVAL '2' SECOND`. What should I do to *cast all other fields to TIMESTAMP*?
[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4530

[FLINK-7245] [stream] Support holding back watermarks with static delays

## What is the purpose of the change

*This pull request aims to allow the operators to support holding back watermarks with **static** delays.*

## Brief change log

- *Introduce a new method `getWatermarkToEmit(Watermark inputWatermark)`, which allows an operator to generate a new watermark with a different timestamp before emitting it.*
- *Add two operators `KeyedProcessOperatorWithWatermarkDelay` and `KeyedCoProcessOperatorWithWatermarkDelay` that support holding back watermarks with static delays.*

## Verifying this change

This change is verified by two new test classes `KeyedProcessOperatorWithWatermarkDelayTest` and `KeyedCoProcessOperatorWithWatermarkDelayTest`. They test whether watermarks received by the two added operators can be held back with the given delays and whether the provided delays are non-negative.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (**no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
- The serializers: (**no**)
- The runtime per-record code paths (performance sensitive): (**no**)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)

## Documentation

- Does this pull request introduce a new feature? (**yes**)
- If yes, how is the feature documented? (**JavaDocs**)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-7245

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4530.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4530

commit a24a11522af54c547d014d30adbefa23997d0f8d
Author: Xingcan Cui
Date: 2017-08-09T12:54:16Z

[FLINK-7245] [stream] Support holding back watermarks with static delays

commit f730ab45c88f8bcbc27e411901e27dee84aa26b2
Author: Xingcan Cui
Date: 2017-08-11T16:15:53Z

Refine codes
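The idea of holding back a watermark by a static delay, as described in PR #4530, can be sketched in a few lines. This is only an illustration under assumptions: `WatermarkDelaySketch` is a hypothetical stand-alone class that works on raw `long` timestamps, whereas the PR's `getWatermarkToEmit` takes a Flink `Watermark` object and lives inside the operators. The subtraction is an assumed reading of "holding back"; the non-negative check mirrors what the PR's tests are said to verify.

```java
/** Hypothetical stand-alone sketch of holding back watermarks by a fixed delay. */
final class WatermarkDelaySketch {
    private final long watermarkDelay; // static delay in milliseconds

    public WatermarkDelaySketch(long watermarkDelay) {
        // The PR description states that the provided delay must be non-negative.
        if (watermarkDelay < 0) {
            throw new IllegalArgumentException("The watermark delay should be non-negative.");
        }
        this.watermarkDelay = watermarkDelay;
    }

    /**
     * Returns the timestamp of the watermark to forward downstream.
     * Assumption: holding back means subtracting the delay from the input timestamp.
     * Overflow for timestamps close to Long.MIN_VALUE is ignored in this sketch.
     */
    public long getWatermarkToEmit(long inputWatermarkTimestamp) {
        return inputWatermarkTimestamp - watermarkDelay;
    }

    public static void main(String[] args) {
        WatermarkDelaySketch operator = new WatermarkDelaySketch(30L);
        System.out.println(operator.getWatermarkToEmit(100L));
    }
}
```

Downstream operators then see event time advance more slowly than the input by exactly the configured delay, which gives them room to emit results carrying timestamps older than the incoming watermark.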
[GitHub] flink pull request #4297: [FLINK-6936] [streaming] Add multiple targets supp...
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4297

[FLINK-6936] [streaming] Add multiple targets support for custom partitioner

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-6936

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4297.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4297

commit 754a8ad7ae316a09edc2bf0ea957006aee02b595
Author: Xingcan Cui
Date: 2017-07-07T12:25:39Z

[FLINK-6936] [streaming] Add multiple targets support for custom partitioner
[GitHub] flink pull request #3768: [FLINK-6368][table] Grouping keys in stream aggreg...
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/3768

[FLINK-6368][table] Grouping keys in stream aggregations have wrong order

FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage, which mapped all grouping keys to the first n fields of a record. That's why in old versions we generated new shifted grouping keys (`val groupingKeys = grouping.indices.toArray`) from the original keys' indices. Now that the mapping has been removed, we should use the original grouping keys rather than the shifted keys. Also, a test method posted in https://issues.apache.org/jira/browse/FLINK-6368 is added to DataStreamAggregateITCase.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-6368

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3768.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3768

commit ed03570d9bfa52e634de5a13b3425a5fd21fe6c8
Author: xccui
Date: 2017-04-25T06:06:45Z

[FLINK-6368] Fix the wrong ordered keys problem
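The difference between original and shifted grouping keys described in PR #3768 can be seen in a small sketch. Everything here is hypothetical illustration code (`GroupingKeysSketch`, `shiftedKeys`, `extractKey`, and the sample record are not from the Flink code base): shifted keys `0..n-1` are only correct when a preceding stage has already moved the grouping fields to the front of each record.

```java
import java.util.Arrays;

/** Hypothetical illustration of original versus shifted grouping key indices. */
final class GroupingKeysSketch {
    private GroupingKeysSketch() {}

    /** Mirrors `grouping.indices.toArray`: the positions 0..n-1 of the grouping array itself. */
    public static int[] shiftedKeys(int[] grouping) {
        int[] shifted = new int[grouping.length];
        for (int i = 0; i < grouping.length; i++) {
            shifted[i] = i;
        }
        return shifted;
    }

    /** Projects the key fields out of a record using the given field indices. */
    public static Object[] extractKey(Object[] record, int[] keys) {
        Object[] key = new Object[keys.length];
        for (int i = 0; i < keys.length; i++) {
            key[i] = record[keys[i]];
        }
        return key;
    }

    public static void main(String[] args) {
        // Record layout (user, amount, category); the query groups by (category, user).
        Object[] record = {"alice", 3, "books"};
        int[] grouping = {2, 0};

        // Without a prepare-map stage the original indices select the intended fields ...
        System.out.println(Arrays.toString(extractKey(record, grouping)));
        // ... while the shifted indices select whatever happens to sit in fields 0 and 1.
        System.out.println(Arrays.toString(extractKey(record, shiftedKeys(grouping))));
    }
}
```

With the mapping stage gone, the shifted indices silently group by the wrong fields, which matches the symptom named in the PR title.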
[GitHub] flink pull request #3284: [FLINK-1526] [Gelly] Add Minimum Spanning Tree lib...
Github user xccui closed the pull request at: https://github.com/apache/flink/pull/3284
[GitHub] flink issue #3284: [FLINK-1526] [Gelly] Add Minimum Spanning Tree library me...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/3284 Given the present situation (for Gelly), shall I close this PR?
[GitHub] flink pull request #3284: [FLINK-1526] [Gelly] Add Minimum Spanning Tree lib...
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3284#discussion_r100455274

--- Diff: docs/dev/libs/gelly/library_methods.md ---
@@ -242,6 +242,28 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements.

+## Minimum Spanning Tree
+
+ Overview
+This is an implementation of the distributed minimum spanning tree (MST) algorithm. A minimum spanning tree for a connected and
+undirected graph is a subset of edges with minimum possible total edge weight that connects all the vertices without cycles.
+One of the possible use cases for MST could be laying out cables that connect all buildings with the minimum total cable length.
+
+ Details
+Unlike a sequential version of the algorithm, a distributed MST algorithm is based on the message-passing model.
+We use [vertex-centric iterations](#vertex-centric-iterations) to implement the Borůvka algorithm described in
+[this paper](http://ieeexplore.ieee.org/abstract/document/508073/). As there are different steps inside the iteration,
--- End diff --

Yes, Greg, you are right. A PDF is surely better than a web site. Having thought it over, I want to change the paper to https://cs.uwaterloo.ca/~kdaudjee/comparison.pdf since it fits better.
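For readers unfamiliar with the Borůvka algorithm mentioned in the quoted documentation, the following is a minimal sequential sketch using a union-find structure. It is explicitly not the vertex-centric, message-passing implementation proposed in PR #3284; it only shows the core idea that in every round each component picks its cheapest outgoing edge and the components are then merged along those edges. The class name `BoruvkaSketch` and the edge encoding `{u, v, weight}` are assumptions made for illustration.

```java
import java.util.Arrays;

/** Hypothetical sequential Borůvka sketch; edges are encoded as {u, v, weight}. */
final class BoruvkaSketch {
    private BoruvkaSketch() {}

    /** Returns the total weight of a minimum spanning tree of a connected undirected graph. */
    public static long mstWeight(int vertexCount, int[][] edges) {
        int[] parent = new int[vertexCount];
        for (int v = 0; v < vertexCount; v++) {
            parent[v] = v;
        }
        int components = vertexCount;
        long total = 0L;
        while (components > 1) {
            // cheapest[r] holds the index of the cheapest edge leaving the component rooted at r.
            int[] cheapest = new int[vertexCount];
            Arrays.fill(cheapest, -1);
            for (int i = 0; i < edges.length; i++) {
                int ru = find(parent, edges[i][0]);
                int rv = find(parent, edges[i][1]);
                if (ru == rv) {
                    continue; // edge lies inside one component
                }
                if (cheapest[ru] == -1 || better(edges, i, cheapest[ru])) {
                    cheapest[ru] = i;
                }
                if (cheapest[rv] == -1 || better(edges, i, cheapest[rv])) {
                    cheapest[rv] = i;
                }
            }
            boolean merged = false;
            for (int r = 0; r < vertexCount; r++) {
                int i = cheapest[r];
                if (i == -1) {
                    continue;
                }
                int ru = find(parent, edges[i][0]);
                int rv = find(parent, edges[i][1]);
                if (ru == rv) {
                    continue; // both ends were already merged earlier in this round
                }
                parent[ru] = rv;
                total += edges[i][2];
                components--;
                merged = true;
            }
            if (!merged) {
                throw new IllegalArgumentException("The graph is not connected.");
            }
        }
        return total;
    }

    /** Orders edges by weight and breaks ties by edge index so the choice is deterministic. */
    private static boolean better(int[][] edges, int a, int b) {
        return edges[a][2] < edges[b][2] || (edges[a][2] == edges[b][2] && a < b);
    }

    /** Union-find lookup with path halving. */
    private static int find(int[] parent, int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]];
            v = parent[v];
        }
        return v;
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1, 1}, {1, 2, 2}, {2, 3, 3}, {0, 3, 4}, {0, 2, 5}};
        System.out.println(mstWeight(4, edges));
    }
}
```

In the cable-laying use case from the quoted text, the vertices would be buildings and the weights cable lengths. A distributed version has to replace the shared `parent` array with messages between vertices, which is where the different steps inside the iteration come from.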
[GitHub] flink pull request #3284: [FLINK-1526] [Gelly] Add Minimum Spanning Tree lib...
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/3284

[FLINK-1526] [Gelly] Add Minimum Spanning Tree library method

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3284.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3284

commit 495aa623dccecfba66124b44c7dd955b61853c94
Author: xccui
Date: 2017-02-08T04:57:21Z

[FLINK-1526] [Gelly] Add Minimum Spanning Tree library method