[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478781#comment-16478781 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5327

> Implement stream-stream non-window left outer join
> --------------------------------------------------
>
>                 Key: FLINK-8428
>                 URL: https://issues.apache.org/jira/browse/FLINK-8428
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>             Fix For: 1.6.0
>
>
> Implement stream-stream non-window left outer join for sql/table-api. A
> simple design doc can be found
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478720#comment-16478720 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5327

    Thanks for the update @hequn8128. The changes look good. I tested your implementation on a cluster with TPC-H data. The results were equal to the batch results and the state clean-up worked. I will merge this :-)
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476998#comment-16476998 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r188532278

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---
    @@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode {
          */
         def consumesRetractions: Boolean = false
     
    +  /**
    +    * Whether the [[DataStreamRel]] produces retraction messages.
    +    */
    +  def producesRetractions: Boolean = false
    --- End diff --

    Thanks for the explanation.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476992#comment-16476992 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r188531694

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -60,6 +60,9 @@ class DataStreamJoin(
     
       override def needsUpdatesAsRetraction: Boolean = true
     
    +  // outer join will generate retractions
    +  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
    --- End diff --

    Thanks, now I understand the terminology between producing and just forwarding retractions.
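The distinction between a node that produces retractions itself and one that only forwards them from its inputs can be illustrated with a tiny plan tree. This is a hypothetical, simplified model, not Flink's retraction rules: `PlanNode`, `Scan`, `Calc`, `Join`, `JoinKind` and `accMode` are made-up names, and aggregations (which consume retractions) are deliberately left out. Under those assumptions it reproduces the expected traits of `testInnerJoinWithoutAgg` (Acc) and `testLeftJoin` (AccRetract) from the `RetractionRulesTest` diff in this thread.

```scala
// Hypothetical, simplified model (not Flink's retraction rules): a plan node either
// produces retractions itself, forwards them from an input, or emits none at all.
sealed trait JoinKind
case object InnerJoin extends JoinKind
case object LeftOuterJoin extends JoinKind

sealed trait PlanNode {
  def inputs: Seq[PlanNode]
  // Default mirrors `def producesRetractions: Boolean = false` in DataStreamRel.
  def producesRetractions: Boolean = false
}

final case class Scan(table: String) extends PlanNode {
  val inputs: Seq[PlanNode] = Seq.empty
}

// A projection/filter never creates retractions, but it forwards them.
final case class Calc(input: PlanNode) extends PlanNode {
  val inputs: Seq[PlanNode] = Seq(input)
}

final case class Join(kind: JoinKind, left: PlanNode, right: PlanNode) extends PlanNode {
  val inputs: Seq[PlanNode] = Seq(left, right)
  // Mirrors `joinType != JoinRelType.INNER`: an outer join may have to retract a
  // NULL-padded row once a matching row arrives from the other side.
  override def producesRetractions: Boolean = kind != InnerJoin
}

object RetractionMode {
  // Output carries retractions if the node produces them or any input forwards them.
  def emitsRetractions(node: PlanNode): Boolean =
    node.producesRetractions || node.inputs.exists(emitsRetractions)

  def accMode(node: PlanNode): String =
    if (emitsRetractions(node)) "AccRetract" else "Acc"
}
```

For example, `RetractionMode.accMode(Calc(Join(LeftOuterJoin, Scan("l"), Scan("r"))))` yields `AccRetract` even though `Calc` never produces retractions: it only forwards what the left join produces.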
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476994#comment-16476994 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r188531785

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -176,14 +179,34 @@ class DataStreamJoin(
           body,
           returnType)
     
    -    val coMapFun =
    -      new NonWindowInnerJoin(
    -        leftSchema.typeInfo,
    -        rightSchema.typeInfo,
    -        CRowTypeInfo(returnType),
    -        genFunction.name,
    -        genFunction.code,
    -        queryConfig)
    +    val coMapFun = joinType match {
    +      case JoinRelType.INNER =>
    +        new NonWindowInnerJoin(
    +          leftSchema.typeInfo,
    +          rightSchema.typeInfo,
    +          CRowTypeInfo(returnType),
    +          genFunction.name,
    +          genFunction.code,
    +          queryConfig)
    +      case JoinRelType.LEFT if joinInfo.isEqui =>
    +        new NonWindowLeftRightJoin(
    +          leftSchema.typeInfo,
    +          rightSchema.typeInfo,
    +          CRowTypeInfo(returnType),
    +          genFunction.name,
    +          genFunction.code,
    +          joinType == JoinRelType.LEFT,
    +          queryConfig)
    +      case JoinRelType.LEFT =>
    --- End diff --

    We can also do it as part of FLINK-8429.
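The operator selection in the `DataStreamJoin` diff relies on match cases being tried top to bottom. A self-contained sketch of that dispatch, under stated assumptions: `JoinRelTypeSketch` is a hypothetical stand-in for Calcite's `JoinRelType`, operator names are returned as strings instead of constructing the real classes, and the non-equi LEFT result is a placeholder description because the quoted diff is cut off at that case.

```scala
// Hypothetical stand-in for Calcite's JoinRelType (not the real enum).
sealed trait JoinRelTypeSketch
object JoinRelTypeSketch {
  case object INNER extends JoinRelTypeSketch
  case object LEFT extends JoinRelTypeSketch
  case object RIGHT extends JoinRelTypeSketch
}

object OperatorSelection {
  import JoinRelTypeSketch._

  // Returns the name of the operator that would be chosen; only the first two
  // names appear in the diff, the third string is a placeholder.
  def select(joinType: JoinRelTypeSketch, isEqui: Boolean): String = joinType match {
    case INNER => "NonWindowInnerJoin"
    // Cases are tried in order, so the guarded equi-join case must precede the
    // unguarded LEFT case; in the opposite order it would be unreachable.
    case LEFT if isEqui => "NonWindowLeftRightJoin"
    case LEFT => "left join with non-equi predicates"
    case RIGHT =>
      throw new UnsupportedOperationException("right join is planned for FLINK-8429")
  }
}
```

Passing `joinType == JoinRelType.LEFT` as a constructor flag, as the diff does, suggests one operator class is meant to serve both left and right joins once FLINK-8429 adds the latter.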
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458620#comment-16458620 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/5327

    @twalthr Hi, thanks for your review. I have updated the PR according to your suggestions. The changes mainly include:
    - Remove the changes to UpsertSink
    - Refactor test case names and add more tests to cover more code paths
    - Add more method comments
    - Add another base class `NonWindowOuterJoinWithNonEquiPredicates` and move the corresponding variables and functions into it
    - Split `CRowWrappingMultiOutputCollector` into `CRowWrappingMultiOutputCollector` and `LazyOutputCollector`

    Best, Hequn.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458614#comment-16458614 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995939

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala ---
    @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
         expected.add("D,R-8,null")
         StreamITCase.compareWithList(expected)
       }
    +
    +  /** test non-window inner join **/
    +  @Test
    +  def testNonWindowInnerJoin(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.clear
    +
    +    val data1 = new mutable.MutableList[(Int, Long, String)]
    +    data1.+=((1, 1L, "Hi1"))
    +    data1.+=((1, 2L, "Hi2"))
    +    data1.+=((1, 2L, "Hi2"))
    +    data1.+=((1, 5L, "Hi3"))
    +    data1.+=((2, 7L, "Hi5"))
    +    data1.+=((1, 9L, "Hi6"))
    +    data1.+=((1, 8L, "Hi8"))
    +    data1.+=((3, 8L, "Hi9"))
    +
    +    val data2 = new mutable.MutableList[(Int, Long, String)]
    +    data2.+=((1, 1L, "HiHi"))
    +    data2.+=((2, 2L, "HeHe"))
    +    data2.+=((3, 2L, "HeHe"))
    +
    +    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
    +      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
    +    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
    +      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +    tEnv.registerTable("T2", t2)
    +
    +    val sqlQuery =
    +      """
    +        |SELECT t2.a, t2.c, t1.c
    +        |FROM T1 as t1 JOIN T2 as t2 ON
    +        |  t1.a = t2.a AND
    +        |  t1.b > t2.b
    +        |""".stripMargin
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,HiHi,Hi2",
    +      "1,HiHi,Hi2",
    +      "1,HiHi,Hi3",
    +      "1,HiHi,Hi6",
    +      "1,HiHi,Hi8",
    +      "2,HeHe,Hi5",
    +      "null,HeHe,Hi9")
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoin(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
    +
    +    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoinWithFilter(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
    +
    +    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("Hi,Hallo")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
    +
    +    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds2 = StreamTestData.get5
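As a sanity check on the expectation in `testNonWindowInnerJoin`, the same join can be recomputed in plain Scala, with no Flink involved. The expected row `null,HeHe,Hi9` only appears if the two NULL keys created by the `('a === 3) ? (Null(Types.INT), 'a)` projection are treated as equal, so this sketch compares keys as `Option` values, where `None == None`. That null-matching behaviour is read off the expected list in the test, not taken from the operator code; `InnerJoinCheck` and `nullify` are hypothetical names.

```scala
// Plain-Scala recomputation of the testNonWindowInnerJoin expectation (no Flink).
object InnerJoinCheck {
  // (a, b, c) with `a` nullable, modelled as Option[Int].
  type Rec = (Option[Int], Long, String)

  // Mirrors `('a === 3) ? (Null(Types.INT), 'a)`: key 3 becomes NULL (None).
  private def nullify(t: (Int, Long, String)): Rec =
    (if (t._1 == 3) None else Some(t._1), t._2, t._3)

  val t1: Seq[Rec] = Seq(
    (1, 1L, "Hi1"), (1, 2L, "Hi2"), (1, 2L, "Hi2"), (1, 5L, "Hi3"),
    (2, 7L, "Hi5"), (1, 9L, "Hi6"), (1, 8L, "Hi8"), (3, 8L, "Hi9")).map(nullify)

  val t2: Seq[Rec] = Seq((1, 1L, "HiHi"), (2, 2L, "HeHe"), (3, 2L, "HeHe")).map(nullify)

  // SELECT t2.a, t2.c, t1.c FROM T1 t1 JOIN T2 t2 ON t1.a = t2.a AND t1.b > t2.b,
  // comparing keys with Option equality, so None matches None.
  val result: Seq[String] = for {
    l <- t1
    r <- t2
    if l._1 == r._1 && l._2 > r._2
  } yield s"${r._1.map(_.toString).getOrElse("null")},${r._3},${l._3}"
}
```

With standard SQL semantics (`NULL = NULL` is unknown) the last row would be dropped, leaving six rows instead of seven.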
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458610#comment-16458610 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995596

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---
    @@ -0,0 +1,303 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.join
    +
    +import org.apache.flink.api.common.state._
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * Connect data for left stream and right stream. Base class for stream non-window outer Join.
    +  *
    +  * @param leftType        the input type of left stream
    +  * @param rightType       the input type of right stream
    +  * @param resultType      the output type of join
    +  * @param genJoinFuncName the function code of other non-equi condition
    +  * @param genJoinFuncCode the function name of other non-equi condition
    +  * @param isLeftJoin      the type of join, whether it is the type of left join
    +  * @param queryConfig     the configuration for the query to generate
    +  */
    +abstract class NonWindowOuterJoin(
    +    leftType: TypeInformation[Row],
    +    rightType: TypeInformation[Row],
    +    resultType: TypeInformation[CRow],
    +    genJoinFuncName: String,
    +    genJoinFuncCode: String,
    +    isLeftJoin: Boolean,
    +    queryConfig: StreamQueryConfig)
    +  extends NonWindowJoin(
    +    leftType,
    +    rightType,
    +    resultType,
    +    genJoinFuncName,
    +    genJoinFuncCode,
    +    queryConfig) {
    +
    +  // result row, all fields from right will be null. Used for output when there is no matched rows.
    +  protected var leftResultRow: Row = _
    +  // result row, all fields from left will be null. Used for output when there is no matched rows.
    +  protected var rightResultRow: Row = _
    +  // how many matched rows from the right table for each left row. Index 0 is used for left
    +  // stream, index 1 is used for right stream.
    +  protected var joinCntState: Array[MapState[Row, Long]] = _
    +
    +  override def open(parameters: Configuration): Unit = {
    +    super.open(parameters)
    +
    +    leftResultRow = new Row(resultType.getArity)
    +    rightResultRow = new Row(resultType.getArity)
    +
    +    joinCntState = new Array[MapState[Row, Long]](2)
    +    val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
    +      "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
    +    joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
    +    val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
    +      "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
    +    joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
    +
    +    LOG.debug(s"Instantiating NonWindowOuterJoin")
    +  }
    +
    +  /**
    +    * Join current row with other side rows. Preserve current row if there are no matched rows
    +    * from other side.
    +    */
    +  def preservedJoin(
    +      inputRow: Row,
    +      inputRowFromLeft: Boolean,
    +      otherSideState: MapState[Row, JTuple2[Long, Long]],
    +      curProcessTime: Long): Long = {
    +
    +    val otherSideIterator = otherSideState.iterator()
    +    while (otherSideIterator.hasNext) {
    +      val otherSideEntry = otherSideIterator.next()
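The core idea of `NonWindowOuterJoin` (keep, per preserved-side row, how many rows of the other side it currently matches, and emit or retract the NULL-padded result when that count moves between zero and non-zero) can be sketched in memory. This is a minimal illustration under explicit assumptions, not the operator's actual logic: rows are `String`s, the join key is an `Int`, `cond` is a hypothetical stand-in for the generated non-equi predicate, and state clean-up, `Row`/`CRow` types and the separate equi-only path are omitted. `JoinMsg` and `LeftOuterJoinSketch` are made-up names.

```scala
import scala.collection.mutable

// One output message: accumulate (acc = true) or retract (acc = false) a joined row.
// `right = None` stands for the NULL-padded result of the LEFT OUTER JOIN.
final case class JoinMsg(acc: Boolean, left: String, right: Option[String])

// Hypothetical sketch, not Flink's operator: String rows, Int join key, and an
// optional non-equi predicate `cond(leftRow, rightRow)`.
final class LeftOuterJoinSketch(cond: (String, String) => Boolean = (_, _) => true) {
  // key -> (left row -> (multiplicity, number of matching right rows))
  private val leftState = mutable.Map.empty[Int, mutable.Map[String, (Int, Int)]]
  // key -> (right row -> multiplicity)
  private val rightState = mutable.Map.empty[Int, mutable.Map[String, Int]]

  def processLeft(key: Int, row: String, acc: Boolean): Seq[JoinMsg] = {
    val out = mutable.ArrayBuffer.empty[JoinMsg]
    var matches = 0
    for ((r, n) <- rightState.getOrElse(key, mutable.Map.empty[String, Int]) if cond(row, r)) {
      matches += n
      for (_ <- 1 to n) out += JoinMsg(acc, row, Some(r))
    }
    // Preserve the left row: with no partner, emit (or retract) the NULL-padded row.
    if (matches == 0) out += JoinMsg(acc, row, None)

    val lefts = leftState.getOrElseUpdate(key, mutable.Map.empty[String, (Int, Int)])
    val mult = lefts.get(row).map(_._1).getOrElse(0) + (if (acc) 1 else -1)
    if (mult > 0) lefts(row) = (mult, matches) else lefts -= row
    out.toList
  }

  def processRight(key: Int, row: String, acc: Boolean): Seq[JoinMsg] = {
    val out = mutable.ArrayBuffer.empty[JoinMsg]
    val lefts = leftState.getOrElse(key, mutable.Map.empty[String, (Int, Int)])
    // Iterate over a copy because the match counts are updated inside the loop.
    for ((l, (mult, cnt)) <- lefts.toList if cond(l, row)) {
      if (acc) {
        // First match: the NULL-padded row emitted earlier is no longer valid.
        if (cnt == 0) for (_ <- 1 to mult) out += JoinMsg(false, l, None)
        for (_ <- 1 to mult) out += JoinMsg(true, l, Some(row))
        lefts(l) = (mult, cnt + 1)
      } else {
        for (_ <- 1 to mult) out += JoinMsg(false, l, Some(row))
        // Last match removed: the left row has to be NULL-padded again.
        if (cnt == 1) for (_ <- 1 to mult) out += JoinMsg(true, l, None)
        lefts(l) = (mult, cnt - 1)
      }
    }
    val rights = rightState.getOrElseUpdate(key, mutable.Map.empty[String, Int])
    val n = rights.getOrElse(row, 0) + (if (acc) 1 else -1)
    if (n > 0) rights(row) = n else rights -= row
    out.toList
  }
}
```

The count is kept per left row rather than per key because, once a non-equi predicate is involved, rows sharing a key can have different numbers of partners. The retract-then-accumulate pair emitted on the first match is exactly why a left join produces retractions while an inner join does not.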
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458613#comment-16458613 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995707

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -176,14 +179,34 @@ class DataStreamJoin(
           body,
           returnType)
     
    -    val coMapFun =
    -      new NonWindowInnerJoin(
    -        leftSchema.typeInfo,
    -        rightSchema.typeInfo,
    -        CRowTypeInfo(returnType),
    -        genFunction.name,
    -        genFunction.code,
    -        queryConfig)
    +    val coMapFun = joinType match {
    +      case JoinRelType.INNER =>
    +        new NonWindowInnerJoin(
    +          leftSchema.typeInfo,
    +          rightSchema.typeInfo,
    +          CRowTypeInfo(returnType),
    +          genFunction.name,
    +          genFunction.code,
    +          queryConfig)
    +      case JoinRelType.LEFT if joinInfo.isEqui =>
    +        new NonWindowLeftRightJoin(
    +          leftSchema.typeInfo,
    +          rightSchema.typeInfo,
    +          CRowTypeInfo(returnType),
    +          genFunction.name,
    +          genFunction.code,
    +          joinType == JoinRelType.LEFT,
    +          queryConfig)
    +      case JoinRelType.LEFT =>
    --- End diff --

    I planned to add right join in FLINK-8429. It's OK to add right join in this PR if you prefer.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458609#comment-16458609 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995557

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -230,8 +230,12 @@ abstract class StreamTableEnvironment(
             tableKeys match {
               case Some(keys) => upsertSink.setKeyFields(keys)
               case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
    -          case None if !isAppendOnlyTable => throw new TableException(
    -            "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
    +          case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null =>
    --- End diff --

    OK.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458612#comment-16458612 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995668

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala ---
    @@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase {
         )
         util.verifyTableTrait(resultTable, expected)
       }
    -}
     
    +  @Test
    +  def testInnerJoinWithoutAgg(): Unit = {
    +    val util = streamTestForRetractionUtil()
    +    val lTable = util.addTable[(Int, Int)]('a, 'b)
    +    val rTable = util.addTable[(Int, Int)]('bb, 'c)
    +
    +    val resultTable = lTable
    +      .join(rTable)
    +      .where('b === 'bb)
    +      .select('a, 'b, 'c)
    +
    +    val expected =
    +      unaryNode(
    +        "DataStreamCalc",
    +        binaryNode(
    +          "DataStreamJoin",
    +          "DataStreamScan(true, Acc)",
    +          "DataStreamScan(true, Acc)",
    +          "false, Acc"
    +        ),
    +        "false, Acc"
    +      )
    +    util.verifyTableTrait(resultTable, expected)
    +  }
    +
    +  @Test
    +  def testLeftJoin(): Unit = {
    +    val util = streamTestForRetractionUtil()
    +    val lTable = util.addTable[(Int, Int)]('a, 'b)
    +    val rTable = util.addTable[(Int, String)]('bb, 'c)
    +
    +    val resultTable = lTable
    +      .leftOuterJoin(rTable, 'b === 'bb)
    +      .select('a, 'b, 'c)
    +
    +    val expected =
    +      unaryNode(
    +        "DataStreamCalc",
    +        binaryNode(
    +          "DataStreamJoin",
    +          "DataStreamScan(true, Acc)",
    +          "DataStreamScan(true, Acc)",
    +          "false, AccRetract"
    +        ),
    +        "false, AccRetract"
    +      )
    +    util.verifyTableTrait(resultTable, expected)
    +  }
    +
    +  @Test
    +  def testAggFollowedWithLeftJoin(): Unit = {
    +    val util = streamTestForRetractionUtil()
    +    val lTable = util.addTable[(Int, Int)]('a, 'b)
    +    val rTable = util.addTable[(Int, String)]('bb, 'c)
    +
    +    val countDistinct = new CountDistinct
    +    val resultTable = lTable
    +      .leftOuterJoin(rTable, 'b === 'bb)
    +      .select('a, 'b, 'c)
    +      .groupBy('a)
    +      .select('a, countDistinct('c))
    +
    +    val expected =
    +      unaryNode(
    +        "DataStreamGroupAggregate",
    +        unaryNode(
    +          "DataStreamCalc",
    +          binaryNode(
    +            "DataStreamJoin",
    +            "DataStreamScan(true, Acc)",
    --- End diff --

    `testJoin()` has covered this case.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458607#comment-16458607 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995438

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---
    @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458606#comment-16458606 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995228

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---
    @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458603#comment-16458603 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184994673 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function name of other non-equi condition + * @param genJoinFuncCode the function code of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there are no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there are no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458601#comment-16458601 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184994503 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458598#comment-16458598 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184994194 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
+ // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. + protected var joinCntState: Array[MapState[Row, Long]] = _ --- End diff -- OK. I created a base class for outer joins with non-equi predicates (`NonWindowOuterJoinWithNonEquiPredicates`). > Implement stream-stream non-window left outer join -- > > Key: FLINK-8428 > URL: https://issues.apache.org/jira/browse/FLINK-8428 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Implement stream-stream non-window left outer join for sql/table-api. A > simple design doc can be found > [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
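Why a per-row match count becomes necessary once non-equi predicates are involved can be illustrated with a simplified, hypothetical model in plain Scala. The names `LeftRow`, `RightRow`, and `LeftJoinCounts` are invented for this sketch and are not Flink APIs. Rows that share a join key do not necessarily satisfy the extra predicate, so the operator presumably cannot infer "this left row has at least one match" from the mere presence of right rows with the same key; it has to count matches per left row, which is what the left entry of `joinCntState` appears to hold. The sketch only models insertions and reports the left rows whose count moves from 0 to 1, i.e. those whose null-padded result would have to be retracted.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for rows that all share one equi-join key.
final case class LeftRow(id: Int, lo: Int)
final case class RightRow(id: Int, value: Int)

// Simplified model of per-row match counting for a LEFT JOIN with an extra
// non-equi predicate. Only insertions are modelled; retractions of input rows
// and state retention are left out.
class LeftJoinCounts(pred: (LeftRow, RightRow) => Boolean) {
  // left row -> number of right rows currently satisfying `pred`
  private val counts = mutable.LinkedHashMap.empty[LeftRow, Long]

  /** Registers a left row and returns how many already-seen right rows match it. */
  def addLeft(l: LeftRow, rightsSoFar: Seq[RightRow]): Long = {
    val n = rightsSoFar.count(r => pred(l, r)).toLong
    counts(l) = n
    n
  }

  /** Registers a right row and returns the left rows that just got their first match. */
  def addRight(r: RightRow): Seq[LeftRow] = {
    val firstMatches = mutable.ArrayBuffer.empty[LeftRow]
    // Iterate over a snapshot so the map can be updated inside the loop.
    for ((l, n) <- counts.toList if pred(l, r)) {
      if (n == 0L) firstMatches += l
      counts(l) = n + 1
    }
    firstMatches.toList
  }

  def count(l: LeftRow): Long = counts.getOrElse(l, 0L)
}
```

Without a non-equi predicate, every right row with the same key matches, so the 0-to-1 transition reduces to "the right side for this key was empty"; that difference is consistent with moving the count state into a separate base class for the non-equi case.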
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458595#comment-16458595 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184993658 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -60,6 +60,9 @@ class DataStreamJoin( override def needsUpdatesAsRetraction: Boolean = true + // outer join will generate retractions + override def producesRetractions: Boolean = joinType != JoinRelType.INNER --- End diff -- An inner join doesn't produce retractions, but left/right/full joins do. For example, a left join retracts its previous non-matched output (the left row padded with nulls) when a matching row arrives from the right side.
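The retraction behaviour described in this comment can be sketched with a small, hypothetical model of a non-window LEFT JOIN on key equality only. `Msg` and `LeftJoinSketch` are illustrative names, not Flink classes; the `accumulate` flag plays the role of an insert/retract marker (similar in spirit to the change flag carried by `CRow`), and `None` stands for the null-padded right side. State retention, right-side retractions, and duplicate handling are deliberately omitted.

```scala
import scala.collection.mutable

// accumulate = true: insert message; accumulate = false: retraction message.
// right = None models the result row whose right-side fields are all null.
final case class Msg(accumulate: Boolean, left: String, right: Option[String])

class LeftJoinSketch {
  private val leftByKey = mutable.Map.empty[Int, Vector[String]]
  private val rightByKey = mutable.Map.empty[Int, Vector[String]]

  def onLeft(key: Int, l: String): Seq[Msg] = {
    leftByKey(key) = leftByKey.getOrElse(key, Vector.empty) :+ l
    val rights = rightByKey.getOrElse(key, Vector.empty)
    // No match yet: emit the left row padded with nulls.
    if (rights.isEmpty) Seq(Msg(true, l, None))
    else rights.map(r => Msg(true, l, Some(r)))
  }

  def onRight(key: Int, r: String): Seq[Msg] = {
    val before = rightByKey.getOrElse(key, Vector.empty)
    rightByKey(key) = before :+ r
    leftByKey.getOrElse(key, Vector.empty).flatMap { l =>
      // First right row for this key: retract the earlier null-padded row first.
      val retract = if (before.isEmpty) Seq(Msg(false, l, None)) else Seq.empty[Msg]
      retract :+ Msg(true, l, Some(r))
    }
  }
}
```

Feeding `L1` and then `R1` for the same key yields an insert of the null-padded row, followed by its retraction and the joined row. With insert-only inputs, an inner join in this model would only ever emit the joined row, so it never has to take back an earlier result.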
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458593#comment-16458593 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184993457 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala --- @@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode { */ def consumesRetractions: Boolean = false + /** +* Whether the [[DataStreamRel]] produces retraction messages. +*/ + def producesRetractions: Boolean = false --- End diff -- A join generates retractions if its type is left/right/full. This differs from an aggregation, which generates retractions if `sendsUpdatesAsRetraction(node) && node.producesUpdates` is true.
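The two rules contrasted in this comment can be written down side by side. This is a hypothetical sketch, not the planner code: `JoinKind` and its cases stand in for Calcite's `JoinRelType`, and the two Boolean parameters of `aggProducesRetractions` only mirror the condition `sendsUpdatesAsRetraction(node) && node.producesUpdates` quoted above.

```scala
// Hypothetical stand-in for the join types named in the discussion.
sealed trait JoinKind
case object InnerJoin extends JoinKind
case object LeftJoin extends JoinKind
case object RightJoin extends JoinKind
case object FullJoin extends JoinKind

object RetractionRules {
  // Join: retractions follow from the join type alone, because an outer join
  // may have to withdraw a null-padded row once a match shows up.
  def joinProducesRetractions(kind: JoinKind): Boolean = kind != InnerJoin

  // Aggregation: retractions only when the node emits updates and those
  // updates must be sent as retractions.
  def aggProducesRetractions(sendsUpdatesAsRetraction: Boolean, producesUpdates: Boolean): Boolean =
    sendsUpdatesAsRetraction && producesUpdates
}
```

The join rule takes no input about the downstream retraction mode, which matches the point of the comment: for outer joins, retractions are a property of the operator itself rather than of how updates are requested.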
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16445652#comment-16445652 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/5327 @twalthr Hi, great to see your review and valuable suggestions. I will update my PR late next week (maybe next weekend). Thanks very much.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443902#comment-16443902 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182470078 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443912#comment-16443912 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182687448 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
+ /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( --- End diff -- Explain return type.
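The review asks what the `Long` returned by `preservedJoin` means. One plausible reading, assumed here and not confirmed in this thread, is that it is the number of matched other-side rows, which the caller can then record as the input row's join count. Below is a minimal, hypothetical sketch of such a helper: `Row` is modelled as `Vector[Any]`, the other side's state as a plain `Map` from row to multiplicity (the real signature uses `MapState[Row, JTuple2[Long, Long]]`; only a row count is modelled here and any timestamp bookkeeping is omitted), and `cond`/`out` stand in for the generated join condition and the collector.

```scala
// Hypothetical sketch; not the code under review.
object PreservedJoinSketch {
  type Row = Vector[Any]

  /** Emits joined rows via `out`; if nothing matches, emits the input padded with nulls.
    * @return number of matched other-side rows, counting duplicates
    */
  def preservedJoin(
      input: Row,
      inputFromLeft: Boolean,
      otherSide: Map[Row, Long], // other-side row -> how many times it was received
      otherArity: Int,
      cond: (Row, Row) => Boolean,
      out: Row => Unit): Long = {
    var matched = 0L
    for ((other, cnt) <- otherSide) {
      val (l, r) = if (inputFromLeft) (input, other) else (other, input)
      if (cond(l, r)) {
        var i = 0L
        while (i < cnt) { out(l ++ r); i += 1 } // one output per duplicate
        matched += cnt
      }
    }
    if (matched == 0L) {
      // Preserve the input row: the other side's fields are all null.
      val nulls: Row = Vector.fill[Any](otherArity)(null)
      out(if (inputFromLeft) input ++ nulls else nulls ++ input)
    }
    matched
  }
}
```

Returning the count rather than a Boolean would let a caller distinguish "no match" (null-padded output emitted) from "n matches" in a single pass, which fits the per-row counts kept in `joinCntState`.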
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443910#comment-16443910 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182691350 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443915#comment-16443915 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182658427 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443900#comment-16443900 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182431840 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connects the left stream and the right stream. Only used for left or right outer joins without + * non-equi predicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function name without any non-equi condition + * @param genJoinFuncCode the function code without any non-equi condition + * @param isLeftJoin the type of join: true for a left join, false for a right join + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftRightJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowOuterJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +isLeftJoin, +queryConfig) { + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +val joinType = if (isLeftJoin) "Left" else "Right" +LOG.debug(s"Instantiating NonWindow${joinType}OuterJoin") + } + + /** +* Puts or retracts an element from the input stream into state and searches the other state to +* output records that meet the condition. If there is no match, the input row is preserved and +* padded with nulls. Records expire from state if a state retention time has been +* specified. +*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Long, Long]], + otherSideState: MapState[Row, JTuple2[Long, Long]], + recordFromLeft: Boolean): Unit = { + +val inputRow = value.row +val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, currentSideState) --- End diff -- Same object creation issue as above. 
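The earlier comment that "Same object creation issue as above" refers to is not part of this excerpt. It is probably about allocating a Scala tuple per record, since `val (curProcessTime, _) = updateCurrentSide(...)` requires `updateCurrentSide` to return a `Tuple2`.

The hedged sketch below shows two common ways to avoid a per-record allocation:

- return the primitive when only one value is used;
- reuse a mutable holder when both values are needed.

All names are hypothetical. The JIT's escape analysis may already eliminate such short-lived objects, so measure before optimizing.

```scala
// Hypothetical helpers illustrating per-record allocation; not Flink's updateCurrentSide.
object AllocationSketch {
  // Returning two values allocates a Tuple2 on every call
  // (the JIT may or may not remove it via escape analysis).
  def updateWithTuple(now: Long, retentionMs: Long): (Long, Long) =
    (now, now + retentionMs)

  // If callers only need the processing time, return the primitive directly.
  def updateReturningTime(now: Long, retentionMs: Long): Long = {
    // ... a real operator would register a cleanup timer at now + retentionMs here ...
    now
  }

  // If both values are needed, reuse a single mutable holder. In a real operator this
  // would be a field of the function instance, and the caller must read it before the
  // next call overwrites it.
  final class LongPair { var first: Long = 0L; var second: Long = 0L }
  private val reused = new LongPair

  def updateIntoHolder(now: Long, retentionMs: Long): LongPair = {
    reused.first = now
    reused.second = now + retentionMs
    reused
  }
}
```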
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443903#comment-16443903 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182476347 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443908#comment-16443908 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182660370 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443913#comment-16443913 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182700177 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) +data1.+=((1, 9L, "Hi6")) +data1.+=((1, 8L, "Hi8")) +data1.+=((3, 8L, "Hi9")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) +data2.+=((3, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? 
(Null(Types.INT), 'a) as 'a, 'b, 'c) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.b > t2.b +|""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithFilter(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = 
Seq("Hi,Hallo") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testInnerJoinWithNonEquiJoinPredicate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b" + +val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5Tu
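As a sanity check on the expected rows of `testNonWindowInnerJoin`, the same join can be evaluated over plain Scala collections. This sketch mirrors the test's data and its projection that replaces `a = 3` with null, modelled here as `None`. The object name is hypothetical.

Reproducing the expected `null,HeHe,Hi9` row requires treating two null keys as equal, since `None == None` is `true` in Scala. Under standard SQL semantics `NULL = NULL` is not true, and such rows would not join.

```scala
// Evaluates the join from testNonWindowInnerJoin over plain collections (hypothetical helper).
object ExpectedInnerJoin {
  val data1 = Seq((1, 1L, "Hi1"), (1, 2L, "Hi2"), (1, 2L, "Hi2"), (1, 5L, "Hi3"),
    (2, 7L, "Hi5"), (1, 9L, "Hi6"), (1, 8L, "Hi8"), (3, 8L, "Hi9"))
  val data2 = Seq((1, 1L, "HiHi"), (2, 2L, "HeHe"), (3, 2L, "HeHe"))

  // mirrors the test's projection: a = 3 becomes NULL, modelled as None
  def key(a: Int): Option[Int] = if (a == 3) None else Some(a)

  // SELECT t2.a, t2.c, t1.c FROM T1 t1 JOIN T2 t2 ON t1.a = t2.a AND t1.b > t2.b
  // Note: None == None is true here, i.e. NULL keys are treated as equal.
  val result: Seq[String] = for {
    (a1, b1, c1) <- data1
    (a2, b2, c2) <- data2
    if key(a1) == key(a2) && b1 > b2
  } yield s"${key(a2).fold("null")(_.toString)},$c2,$c1"
}
```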
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443914#comment-16443914 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182695351 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -176,14 +179,34 @@ class DataStreamJoin( body, returnType) -val coMapFun = - new NonWindowInnerJoin( -leftSchema.typeInfo, -rightSchema.typeInfo, -CRowTypeInfo(returnType), -genFunction.name, -genFunction.code, -queryConfig) +val coMapFun = joinType match { + case JoinRelType.INNER => +new NonWindowInnerJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + queryConfig) + case JoinRelType.LEFT if joinInfo.isEqui => +new NonWindowLeftRightJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + joinType == JoinRelType.LEFT, + queryConfig) + case JoinRelType.LEFT => --- End diff -- Is there a reason why we don't support right outer joins here?
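The reviewer's question can be made concrete. `NonWindowLeftRightJoin` already receives `joinType == JoinRelType.LEFT` as its `isLeftJoin` flag, so the same match arm could plausibly serve RIGHT joins as well.

Below is a minimal sketch of such a dispatch, under these assumptions:

- `JoinKind` is a hypothetical stand-in for Calcite's `JoinRelType`;
- the operator case classes are hypothetical stand-ins for the runtime classes, with the non-equi variant named after `NonWindowLeftRightJoinWithNonEquiPredicates` from the review;
- FULL joins are deliberately rejected.

```scala
// Hypothetical stand-ins for Calcite's JoinRelType and the runtime join operators.
object JoinOperatorChoice {
  sealed trait JoinKind
  object JoinKind {
    case object Inner extends JoinKind
    case object Left extends JoinKind
    case object Right extends JoinKind
    case object Full extends JoinKind
  }

  sealed trait RuntimeJoin
  case object InnerJoin extends RuntimeJoin
  final case class LeftRightJoin(isLeftJoin: Boolean) extends RuntimeJoin
  final case class LeftRightJoinWithNonEquiPredicates(isLeftJoin: Boolean) extends RuntimeJoin

  def choose(kind: JoinKind, isEqui: Boolean): RuntimeJoin = kind match {
    case JoinKind.Inner => InnerJoin
    // LEFT and RIGHT share one operator; the flag records which side is preserved
    case JoinKind.Left | JoinKind.Right if isEqui => LeftRightJoin(kind == JoinKind.Left)
    case JoinKind.Left | JoinKind.Right => LeftRightJoinWithNonEquiPredicates(kind == JoinKind.Left)
    case JoinKind.Full =>
      throw new UnsupportedOperationException("FULL OUTER JOIN is not covered by this sketch")
  }
}
```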
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443907#comment-16443907 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182683951 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443904#comment-16443904 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182660019 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443899#comment-16443899 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182445400 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala --- @@ -16,35 +16,61 @@ * limitations under the License. */ -package org.apache.flink.table.runtime +package org.apache.flink.table.runtime.join import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.apache.flink.util.Collector /** - * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. + * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. This collector + * can also be used to count the number of emitted records and to suppress output (lazy output). */ class CRowWrappingMultiOutputCollector() extends Collector[Row] { private var out: Collector[CRow] = _ - private val outCRow: CRow = new CRow() + private val outCRow: CRow = new CRow(null, true) + // number of times each record is collected private var times: Long = 0L + // count how many records have been emitted + private var emitCnt: Long = 0L + // do not forward records downstream if lazyOutput is set to true + private var lazyOutput: Boolean = false def setCollector(collector: Collector[CRow]): Unit = this.out = collector def setChange(change: Boolean): Unit = this.outCRow.change = change + def setRow(row: Row): Unit = this.outCRow.row = row + + def getRow(): Row = this.outCRow.row + def setTimes(times: Long): Unit = this.times = times + def setEmitCnt(emitted: Long): Unit = this.emitCnt = emitted + + def getEmitCnt(): Long = emitCnt + + def setLazyOutput(lazyOutput: Boolean): Unit = this.lazyOutput = lazyOutput + override def collect(record: Row): Unit = { outCRow.row = record -var i: Long = 0L -while (i < times) { - 
out.collect(outCRow) - i += 1 +if (!lazyOutput) { + emitCnt += times + var i: Long = 0L + while (i < times) { +out.collect(outCRow) +i += 1 + } } } + def reset(): Unit = { +this.outCRow.change = true --- End diff -- Remove this line. The change must be set after every reset call anyway.
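The collector pattern in this diff has four parts:

- one output object is reused for every emission;
- each record is forwarded `times` times;
- the emitted count is tracked;
- `lazyOutput` suppresses forwarding.

A simplified sketch follows. `SimpleCollector` and `ChangeRow` are hypothetical stand-ins for Flink's `Collector` and `CRow`. What `reset()` clears here is an assumption, since the quoted diff ends at its first line. Following the review comment, `reset()` leaves the change flag alone, so callers set it explicitly after each reset.

```scala
// Simplified sketch of a multi-output collector with an emit counter and lazy mode.
// SimpleCollector and ChangeRow are hypothetical stand-ins for Flink's Collector and CRow.
object MultiOutputCollectorSketch {
  trait SimpleCollector[T] { def collect(record: T): Unit }

  // a row plus its change flag: true = accumulate, false = retract
  final class ChangeRow(var row: String, var change: Boolean)

  final class CountingMultiOutputCollector extends SimpleCollector[String] {
    private var out: SimpleCollector[ChangeRow] = _
    private val outRow = new ChangeRow(null, true) // reused for every emission
    private var times: Long = 0L                   // emissions per collected record
    private var emitCnt: Long = 0L                 // records forwarded so far
    private var lazyOutput: Boolean = false        // if true, nothing is forwarded

    def setCollector(collector: SimpleCollector[ChangeRow]): Unit = out = collector
    def setChange(change: Boolean): Unit = outRow.change = change
    def setTimes(t: Long): Unit = times = t
    def setLazyOutput(l: Boolean): Unit = lazyOutput = l
    def getEmitCnt: Long = emitCnt

    override def collect(record: String): Unit = {
      outRow.row = record
      if (!lazyOutput) {
        emitCnt += times
        var i = 0L
        while (i < times) {
          out.collect(outRow)
          i += 1
        }
      }
    }

    // Assumed reset semantics; the change flag is deliberately left untouched,
    // so callers must call setChange after every reset.
    def reset(): Unit = {
      emitCnt = 0L
      lazyOutput = false
    }
  }
}
```

Because the same output object is reused, a downstream consumer that buffers records must copy their fields rather than keep the reference.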
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443901#comment-16443901 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182441219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. + protected var joinCntState: Array[MapState[Row, Long]] = _ --- End diff -- Could you convert this into separate left/right variables, as we did in other places? In any case, this state belongs to `NonWindowLeftRightJoinWithNonEquiPredicates` and does not need to be initialized for `NonWindowLeftRightJoin`.
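One way to read the suggestion is to keep the join-count state out of the shared base class and hold it in two named fields, only in the variant that evaluates non-equi predicates. The sketch below models this in plain Scala. A `mutable.Map` stands in for `MapState[Row, Long]`, and all class names are hypothetical:

```scala
import scala.collection.mutable

// Flink-free sketch: a Map keyed by the row stands in for MapState[Row, Long].
// Class names are hypothetical and only mirror the roles discussed in the review.
abstract class OuterJoinSketch {
  def open(): Unit = ()
}

// Equi-only outer join: no join-count state is created at all.
class LeftRightJoinSketch extends OuterJoinSketch

// Only the variant with non-equi predicates owns join-count state, kept in two
// named fields rather than an Array indexed by 0 (left) and 1 (right).
class LeftRightJoinWithNonEquiPredicatesSketch extends OuterJoinSketch {
  var leftJoinCnt: mutable.Map[Seq[Any], Long] = _
  var rightJoinCnt: mutable.Map[Seq[Any], Long] = _

  override def open(): Unit = {
    super.open()
    leftJoinCnt = mutable.Map.empty[Seq[Any], Long]
    rightJoinCnt = mutable.Map.empty[Seq[Any], Long]
  }

  // Selects the count state of the side the current record comes from.
  def joinCntState(recordFromLeft: Boolean): mutable.Map[Seq[Any], Long] =
    if (recordFromLeft) leftJoinCnt else rightJoinCnt
}
```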
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443898#comment-16443898 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182407785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -60,6 +60,9 @@ class DataStreamJoin( override def needsUpdatesAsRetraction: Boolean = true + // outer join will generate retractions + override def producesRetractions: Boolean = joinType != JoinRelType.INNER --- End diff -- Could you clarify this? An inner join also produces retractions.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443909#comment-16443909 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182664353 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -230,8 +230,12 @@ abstract class StreamTableEnvironment( tableKeys match { case Some(keys) => upsertSink.setKeyFields(keys) case None if isAppendOnlyTable => upsertSink.setKeyFields(null) - case None if !isAppendOnlyTable => throw new TableException( -"UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") + case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null => --- End diff -- Can we move this change into a separate issue and PR? It is not related to outer joins and breaks existing table sinks for Java developers.
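For reference, the three original branches of the key check can be modeled as a small pure function. This sketch rests on a few assumptions: `TableExceptionSketch` stands in for Flink's `TableException`, and `None` stands for the `setKeyFields(null)` call. The proposed `enforceKeyFields()` branch is left out because the review asks to move it to a separate issue:

```scala
// Hypothetical stand-in for Flink's TableException.
final class TableExceptionSketch(msg: String) extends RuntimeException(msg)

object UpsertKeyCheck {
  // Returns the key fields to configure on the sink.
  // None corresponds to the diff's upsertSink.setKeyFields(null).
  def resolveKeyFields(
      tableKeys: Option[Array[String]],
      isAppendOnlyTable: Boolean): Option[Array[String]] =
    tableKeys match {
      case Some(keys) => Some(keys)                 // table has unique keys
      case None if isAppendOnlyTable => None        // append-only: no keys needed
      case None => throw new TableExceptionSketch(  // updating table without keys
        "UpsertStreamTableSink requires that Table has full primary keys if it is updated.")
    }
}
```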
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443906#comment-16443906 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182492444 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() +
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443911#comment-16443911 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182692511 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala --- @@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase { ) util.verifyTableTrait(resultTable, expected) } -} + @Test + def testInnerJoinWithoutAgg(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, Int)]('bb, 'c) + +val resultTable = lTable + .join(rTable) + .where('b === 'bb) + .select('a, 'b, 'c) + +val expected = + unaryNode( +"DataStreamCalc", +binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, Acc" +), +"false, Acc" + ) +util.verifyTableTrait(resultTable, expected) + } + + @Test + def testLeftJoin(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, String)]('bb, 'c) + +val resultTable = lTable + .leftOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + +val expected = + unaryNode( +"DataStreamCalc", +binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, AccRetract" +), +"false, AccRetract" + ) +util.verifyTableTrait(resultTable, expected) + } + + @Test + def testAggFollowedWithLeftJoin(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, String)]('bb, 'c) + +val countDistinct = new CountDistinct +val resultTable = lTable + .leftOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + .groupBy('a) + .select('a, countDistinct('c)) + +val expected = + unaryNode( 
+"DataStreamGroupAggregate", +unaryNode( + "DataStreamCalc", + binaryNode( +"DataStreamJoin", +"DataStreamScan(true, Acc)", --- End diff -- Can you also add a test for a join that consumes from an aggregation?
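The two fully shown expectations differ only in the join's mode. The inner join stays `Acc`, while the left join becomes `AccRetract`, presumably because it may first emit a null-padded row and later retract it. The toy rule below agrees with those two expectations. It is an illustration only, not Flink's `DataStreamRetractionRules`, and all names are hypothetical. Its last test case shows what this rule would predict for a join consuming from an aggregation, which is the kind of case the requested test would actually check:

```scala
// Hypothetical, simplified model of the AccMode printed by the test expectations.
sealed trait AccMode
case object Acc extends AccMode
case object AccRetract extends AccMode

sealed trait JoinKind
case object InnerJoin extends JoinKind
case object LeftOuterJoin extends JoinKind

object AccModeModel {
  // Toy rule: an outer join retracts its own null-padded rows, so it is AccRetract
  // even on append-only inputs; an inner join is AccRetract only if an input is.
  def joinMode(kind: JoinKind, left: AccMode, right: AccMode): AccMode =
    if (kind != InnerJoin || left == AccRetract || right == AccRetract) AccRetract
    else Acc

  // A Calc (projection/filter) keeps the mode of its input.
  def calcMode(input: AccMode): AccMode = input
}
```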
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443905#comment-16443905 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182488812 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) +data1.+=((1, 9L, "Hi6")) +data1.+=((1, 8L, "Hi8")) +data1.+=((3, 8L, "Hi9")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) +data2.+=((3, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? 
(Null(Types.INT), 'a) as 'a, 'b, 'c) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.b > t2.b +|""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithFilter(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = 
Seq("Hi,Hallo") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testInnerJoinWithNonEquiJoinPredicate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b" + +val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5Tu
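The expected rows of `testNonWindowInnerJoin` can be re-derived by hand with a nested-loop join in plain Scala. This is a sketch for checking the expectation, not how the operator works. The value 3 in column `a` is mapped to `None` to mirror the `Null(Types.INT)` projection. The expected row `null,HeHe,Hi9` means null keys are matched to each other in this test, whereas standard SQL would treat `NULL = NULL` as unknown. Comparing `Option` values with `==` reproduces the tested behavior:

```scala
// Plain-Scala recomputation: join on a (equality) plus t1.b > t2.b,
// projecting t2.a, t2.c, t1.c as in the SQL query of the test.
object InnerJoinCheck {
  // In the test, a == 3 is projected to NULL; None models that here.
  def nullIf3(a: Int): Option[Int] = if (a == 3) None else Some(a)

  val t1: Seq[(Option[Int], Long, String)] = Seq(
    (1, 1L, "Hi1"), (1, 2L, "Hi2"), (1, 2L, "Hi2"), (1, 5L, "Hi3"),
    (2, 7L, "Hi5"), (1, 9L, "Hi6"), (1, 8L, "Hi8"), (3, 8L, "Hi9")
  ).map { case (a, b, c) => (nullIf3(a), b, c) }

  val t2: Seq[(Option[Int], Long, String)] = Seq(
    (1, 1L, "HiHi"), (2, 2L, "HeHe"), (3, 2L, "HeHe")
  ).map { case (a, b, c) => (nullIf3(a), b, c) }

  // None == None is true in Scala, so null keys match each other here.
  val result: Seq[String] = for {
    (a1, b1, c1) <- t1
    (a2, b2, c2) <- t2
    if a1 == a2 && b1 > b2
  } yield s"${a2.getOrElse("null")},$c2,$c1"
}
```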
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443895#comment-16443895 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182407417 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala --- @@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode { */ def consumesRetractions: Boolean = false + /** +* Whether the [[DataStreamRel]] produces retraction messages. +*/ + def producesRetractions: Boolean = false --- End diff -- Why isn't `producesUpdates` enough?
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443896#comment-16443896 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182431730 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for left or right join without + * non-equal predicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code without any non-equi condition + * @param genJoinFuncCode the function name without any non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftRightJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowOuterJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +isLeftJoin, +queryConfig) { + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +val joinType = if (isLeftJoin) "Left" else "Right" +LOG.debug(s"Instantiating NonWindow${joinType}OuterJoin") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The input row will be preserved and appended with null, if +* there is no match. Records will be expired in state if state retention time has been +* specified. +*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Long, Long]], + otherSideState: MapState[Row, JTuple2[Long, Long]], + recordFromLeft: Boolean): Unit = { + +val inputRow = value.row +val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, currentSideState) + +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) +cRowWrapper.setEmitCnt(0) --- End diff -- Remove this line.
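The Scaladoc above describes the equi-only outer join. A left row with no match is emitted padded with nulls, and that padded row is retracted once a matching right row arrives. Below is a single-key, Flink-free sketch of that message flow. `LeftJoinSingleKey` and its methods are hypothetical, rows are `Seq[Any]`, and `None` stands for SQL NULL. Because no non-equi predicate is involved, whether a left row is matched depends only on whether the right state for the key is empty:

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of a left outer join without non-equi predicates, for one join key.
final class LeftJoinSingleKey(rightArity: Int) {
  // Emitted messages as (change flag, row); false marks a retraction.
  val out: ArrayBuffer[(Boolean, Seq[Any])] = ArrayBuffer.empty
  // row -> number of identical copies currently held in state
  private val leftRows = mutable.Map.empty[Seq[Any], Int]
  private val rightRows = mutable.Map.empty[Seq[Any], Int]

  private def nullPadded(l: Seq[Any]): Seq[Any] = l ++ Seq.fill(rightArity)(None)

  private def update(state: mutable.Map[Seq[Any], Int], row: Seq[Any], accumulate: Boolean): Unit = {
    val n = state.getOrElse(row, 0) + (if (accumulate) 1 else -1)
    if (n <= 0) state.remove(row) else state(row) = n
  }

  def processLeft(row: Seq[Any], accumulate: Boolean): Unit = {
    update(leftRows, row, accumulate)
    // No right rows for this key: the left row is preserved with nulls.
    if (rightRows.isEmpty) out += ((accumulate, nullPadded(row)))
    else for ((r, cnt) <- rightRows; _ <- 0 until cnt) out += ((accumulate, row ++ r))
  }

  def processRight(row: Seq[Any], accumulate: Boolean): Unit = {
    val hadMatches = rightRows.nonEmpty
    update(rightRows, row, accumulate)
    val lostAllMatches = !accumulate && rightRows.isEmpty
    for ((l, cnt) <- leftRows; _ <- 0 until cnt) {
      // First match for this key: retract the null-padded row emitted earlier.
      if (accumulate && !hadMatches) out += ((false, nullPadded(l)))
      out += ((accumulate, l ++ row))
      // Last match removed: the left row is preserved with nulls again.
      if (lostAllMatches) out += ((true, nullPadded(l)))
    }
  }
}
```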
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443897#comment-16443897 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182427888 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala --- @@ -54,238 +44,51 @@ class NonWindowInnerJoin( genJoinFuncName: String, genJoinFuncCode: String, queryConfig: StreamQueryConfig) - extends CoProcessFunction[CRow, CRow, CRow] - with Compiler[FlatJoinFunction[Row, Row, Row]] - with Logging { - - // check if input types implement proper equals/hashCode - validateEqualsHashCode("join", leftType) - validateEqualsHashCode("join", rightType) - - // state to hold left stream element - private var leftState: MapState[Row, JTuple2[Int, Long]] = _ - // state to hold right stream element - private var rightState: MapState[Row, JTuple2[Int, Long]] = _ - private var cRowWrapper: CRowWrappingMultiOutputCollector = _ - - private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime - private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime - private val stateCleaningEnabled: Boolean = minRetentionTime > 1 - - // state to record last timer of left stream, 0 means no timer - private var leftTimer: ValueState[Long] = _ - // state to record last timer of right stream, 0 means no timer - private var rightTimer: ValueState[Long] = _ - - // other condition function - private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { override def open(parameters: Configuration): Unit = { -LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + -s"Code:\n$genJoinFuncCode") -val clazz = compile( - getRuntimeContext.getUserCodeClassLoader, - 
genJoinFuncName, - genJoinFuncCode) -LOG.debug("Instantiating JoinFunction.") -joinFunction = clazz.newInstance() - -// initialize left and right state, the first element of tuple2 indicates how many rows of -// this row, while the second element represents the expired time of this row. -val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) -val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( - "left", leftType, tupleTypeInfo) -val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( - "right", rightType, tupleTypeInfo) -leftState = getRuntimeContext.getMapState(leftStateDescriptor) -rightState = getRuntimeContext.getMapState(rightStateDescriptor) - -// initialize timer state -val valueStateDescriptor1 = new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) -leftTimer = getRuntimeContext.getState(valueStateDescriptor1) -val valueStateDescriptor2 = new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long]) -rightTimer = getRuntimeContext.getState(valueStateDescriptor2) - -cRowWrapper = new CRowWrappingMultiOutputCollector() - } - - /** -* Process left stream records -* -* @param valueC The input value. -* @param ctxThe ctx to register timer or get current time -* @param outThe collector for returning result values. -* -*/ - override def processElement1( - valueC: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - -processElement(valueC, ctx, out, leftTimer, leftState, rightState, isLeft = true) - } - - /** -* Process right stream records -* -* @param valueC The input value. -* @param ctxThe ctx to register timer or get current time -* @param outThe collector for returning result values. 
-* -*/ - override def processElement2( - valueC: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - -processElement(valueC, ctx, out, rightTimer, rightState, leftState, isLeft = false) - } - - - /** -* Called when a processing timer trigger. -* Expire left/right records which are expired in left and right state. -* -* @param timestamp The timestamp of the firing timer. -* @param ctx The ctx to register time
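The removed comment in `open()` describes the per-side state. For each row, the first tuple field counts identical copies and the second holds the row's expiration time. The removed `onTimer` Scaladoc uses that expiration time to clean up expired records. Below is a minimal, Flink-free model of that bookkeeping. It assumes a single `retentionMs` parameter and refreshes the expiry on every update; both are assumptions of this sketch, not details taken from the diff:

```scala
import scala.collection.mutable

// Hypothetical sketch of per-side join state: row -> (identical copies, expiry timestamp).
final class CountedRowState(retentionMs: Long) {
  private val rows = mutable.Map.empty[Seq[Any], (Int, Long)]

  // Accumulate (+1) or retract (-1) one copy; the expiry is refreshed on every update here.
  def update(row: Seq[Any], accumulate: Boolean, now: Long): Unit = {
    val cnt = rows.get(row).map(_._1).getOrElse(0)
    val newCnt = if (accumulate) cnt + 1 else cnt - 1
    if (newCnt <= 0) rows.remove(row)
    else rows.update(row, (newCnt, now + retentionMs))
  }

  // What a processing-time timer callback would do: drop rows whose expiry has passed.
  def expire(now: Long): Unit = {
    val expired = rows.iterator.collect { case (r, (_, expiry)) if expiry <= now => r }.toList
    expired.foreach(r => rows.remove(r))
  }

  def count(row: Seq[Any]): Int = rows.get(row).map(_._1).getOrElse(0)
}
```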
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400162#comment-16400162 ] ASF GitHub Bot commented on FLINK-8428: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5327 Thanks @hequn8128! We're pretty busy with the Flink 1.5 release right now. This will be one of the first features to add once 1.5 is out! Best, Fabian
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391333#comment-16391333 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/5327 Hi @twalthr @walterddr, the latest update refactors interfaces and functions to make the code more friendly to right/full joins. The code for right/full joins is also ready and can be found at https://github.com/hequn8128/flink/tree/outerjoin (branch: outerjoin). @fhueske It would be great if you could also take a look. Thanks all. Hequn
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387274#comment-16387274 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r172408402 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. 
+ * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( --- End diff -- I think either is fine as long as they are consistent.
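With non-equi predicates, a matching join key no longer guarantees a match. That is why the Scaladoc above adds a per-left-row count of matching right rows. The single-key sketch below shows how such a count decides when to emit or retract the null-padded row. `LeftJoinWithPredicate` is hypothetical, and `pred` stands in for the generated non-equi condition. For brevity, it assumes distinct left rows and only retracts right rows that are present:

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-key model of a left join with a non-equi predicate.
// leftJoinCnt plays the role of the MapState[Row, Long] described above.
final class LeftJoinWithPredicate(rightArity: Int, pred: (Seq[Any], Seq[Any]) => Boolean) {
  val out: ArrayBuffer[(Boolean, Seq[Any])] = ArrayBuffer.empty // (change flag, row)
  private val leftJoinCnt = mutable.Map.empty[Seq[Any], Long]    // left row -> #matching right rows
  private val rightRows = ArrayBuffer.empty[Seq[Any]]

  private def nullPadded(l: Seq[Any]): Seq[Any] = l ++ Seq.fill(rightArity)(None)

  def addLeft(l: Seq[Any]): Unit = {
    val matches = rightRows.filter(r => pred(l, r))
    leftJoinCnt(l) = matches.size.toLong
    if (matches.isEmpty) out += ((true, nullPadded(l)))
    else matches.foreach(r => out += ((true, l ++ r)))
  }

  def addRight(r: Seq[Any]): Unit = {
    rightRows += r
    for ((l, cnt) <- leftJoinCnt.toList if pred(l, r)) {
      if (cnt == 0) out += ((false, nullPadded(l))) // first real match: retract padding
      out += ((true, l ++ r))
      leftJoinCnt(l) = cnt + 1
    }
  }

  def retractRight(r: Seq[Any]): Unit = {
    rightRows -= r
    for ((l, cnt) <- leftJoinCnt.toList if pred(l, r)) {
      out += ((false, l ++ r))
      leftJoinCnt(l) = cnt - 1
      if (cnt - 1 == 0) out += ((true, nullPadded(l))) // no match left: re-emit padding
    }
  }
}
```

A right row with the same key that fails the predicate (for example, 7 against a left value of 5 under `l > r`) leaves the count at zero, so the null-padded row stays in place.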
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387275#comment-16387275 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r172408177 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream-stream non-window Join. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + // check if input types implement proper equals/hashCode + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + // state to hold left stream element + protected var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + protected var rightState: 
MapState[Row, JTuple2[Int, Long]] = _ + protected var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + protected var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + protected var rightTimer: ValueState[Long] = _ + + // other condition function + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) +LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired t
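The base class above derives `stateCleaningEnabled` from `minRetentionTime`, and the `processElement` excerpts in this thread call `getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)`, whose body is not shown here. The following is a hypothetical sketch of one common rule, assuming the expiry is pushed to `now + maxRetentionTime` only when `now + minRetentionTime` would pass the current expiry. It also models the "0 means no timer" convention from the field comments. `RetentionSketch`, `newExpiredTime` and `registeredTimers` are illustrative names, not Flink API, and timer firing/reset is left out.

```scala
import scala.collection.mutable

// Hypothetical sketch; mirrors field names from the NonWindowJoin excerpt, not Flink API.
final class RetentionSketch(minRetentionTime: Long, maxRetentionTime: Long) {
  // Same rule as `stateCleaningEnabled: Boolean = minRetentionTime > 1` in the diff.
  val stateCleaningEnabled: Boolean = minRetentionTime > 1

  // 0 means "no timer registered yet", as in the leftTimer/rightTimer comments.
  private var timer: Long = 0L
  // Records every timestamp the sketch would hand to registerProcessingTimeTimer.
  val registeredTimers: mutable.ListBuffer[Long] = mutable.ListBuffer.empty[Long]

  // Assumed rule: only push the expiry forward when now + min would overtake it.
  def newExpiredTime(now: Long, oldExpiredTime: Long): Long =
    if (stateCleaningEnabled && now + minRetentionTime > oldExpiredTime) now + maxRetentionTime
    else oldExpiredTime

  // Per-element step, shaped like the processElement excerpt: register at most one timer.
  def onElement(now: Long, oldExpiredTime: Long): Long = {
    val expiredTime = newExpiredTime(now, oldExpiredTime)
    if (stateCleaningEnabled && timer == 0L) {
      timer = expiredTime
      registeredTimers += expiredTime
    }
    expiredTime
  }
}
```

The min/max gap lets most elements leave the expiry untouched, so state updates stay cheap while idle rows still expire within roughly `maxRetentionTime`.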
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380481#comment-16380481 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/5327 Updated the PR according to @walterddr's suggestions.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378653#comment-16378653 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936813 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. 
+ * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( --- End diff -- Long is safer. I will change all count types to Long. What do you think?
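On the Int-versus-Long question for counts: a JVM `Int` counter wraps around silently at `Int.MaxValue`, while a `Long` has far more headroom. A small illustration follows; the `CounterOverflow` object is hypothetical, and whether a per-row count can realistically grow that large depends on the workload.

```scala
// Hypothetical helper showing how Int and Long counters behave at the Int boundary.
object CounterOverflow {
  // Plain Int addition wraps around without any error.
  def nextInt(count: Int): Int = count + 1
  // Math.addExact throws ArithmeticException instead of wrapping.
  def nextIntChecked(count: Int): Int = Math.addExact(count, 1)
  // A Long counter simply keeps going past Int.MaxValue.
  def nextLong(count: Long): Long = count + 1L
}
```

Here `CounterOverflow.nextInt(Int.MaxValue)` yields `Int.MinValue`, a negative count that a `count <= 0` check would misread as "no rows left".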
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378654#comment-16378654 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936767 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +current
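The `processElement` excerpt above keeps a `(count, expiredTime)` pair per row and decrements the count on a retraction, removing the entry once it reaches zero. Below is a Flink-free sketch of that multiset bookkeeping. It assumes a `mutable.Map` in place of `MapState[Row, JTuple2[Int, Long]]`, and it assumes that an accumulate message increments the count, since that branch is cut off in the excerpt. `SideState` is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapState[Row, JTuple2[Int, Long]]: row -> (count, expiredTime).
final class SideState[R] {
  private val state = mutable.Map.empty[R, (Int, Long)]

  // isAccumulate plays the role of CRow.change (true = insert, false = retraction).
  def update(row: R, isAccumulate: Boolean, expiredTime: Long): Unit = {
    // Same default as JTuple2.of(0, -1L) in the excerpt.
    val (cnt, _) = state.getOrElse(row, (0, -1L))
    if (isAccumulate) {
      state(row) = (cnt + 1, expiredTime)
    } else if (cnt - 1 <= 0) {
      // Last copy retracted (or unknown row): drop the entry entirely.
      state.remove(row)
      ()
    } else {
      state(row) = (cnt - 1, expiredTime)
    }
  }

  def count(row: R): Int = state.get(row).map(_._1).getOrElse(0)
  def size: Int = state.size
}
```

Storing a count instead of one entry per duplicate keeps the map keyed by the row itself, which is why the base class checks that input types implement proper `equals`/`hashCode`.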
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378652#comment-16378652 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936660 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +current
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378649#comment-16378649 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936524 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -149,45 +148,39 @@ object UpdatingPlanChecker { } case j: DataStreamJoin => - val joinType = j.getJoinType - joinType match { -case JoinRelType.INNER => - // get key(s) for inner join - val lInKeys = visit(j.getLeft) - val rInKeys = visit(j.getRight) - if (lInKeys.isEmpty || rInKeys.isEmpty) { -None - } else { -// Output of inner join must have keys if left and right both contain key(s). -// Key groups from both side will be merged by join equi-predicates -val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames -val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames -val joinNames = j.getRowType.getFieldNames - -// if right field names equal to left field names, calcite will rename right -// field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b) -// to T2(pk0, b). -val rInNamesToJoinNamesMap = rInNames - .zip(joinNames.subList(lInNames.size, joinNames.length)) - .toMap + // get key(s) for inner join + val lInKeys = visit(j.getLeft) + val rInKeys = visit(j.getRight) + if (lInKeys.isEmpty || rInKeys.isEmpty) { +None + } else { +// Output of inner join must have keys if left and right both contain key(s). --- End diff -- Yes, thank you
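The removed lines above map right-side input field names to join output field names by zipping them with the tail of the join's field list, which accounts for Calcite renaming a clashing right field (for example `pk` to `pk0`). The same step is shown below with plain Scala `Seq`s instead of the `java.util.List` returned by `getFieldNames`; `JoinNameMapping` is a hypothetical wrapper.

```scala
// Hypothetical wrapper around the zip step quoted from UpdatingPlanChecker.
object JoinNameMapping {
  // The join row type lists all left fields first, then the (possibly renamed) right fields.
  def rightToJoinNames(
      lInNames: Seq[String],
      rInNames: Seq[String],
      joinNames: Seq[String]): Map[String, String] =
    rInNames.zip(joinNames.drop(lInNames.size)).toMap
}
```

For `T1(pk, a) JOIN T2(pk, b)` with join fields `pk, a, pk0, b`, a right-side key `pk` is looked up as `pk0` in the join output.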
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378651#comment-16378651 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936833 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. 
+ * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +leftJoinCnt = getRuntimeContext.getMapState(leftJoinCntDescriptor) +resultRow = new Row(resultType.getArity) + +LOG.debug("Instantiating NonWindowLeftJoinWithNonEquiPredicates.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cn
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378650#comment-16378650 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936623 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream-stream non-window Join. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + // check if input types implement proper equals/hashCode + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + // state to hold left stream element + protected var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + protected var rightState: 
MapState[Row, JTuple2[Int, Long]] = _ + protected var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + protected var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + protected var rightTimer: ValueState[Long] = _ + + // other condition function + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) +LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired t
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377007#comment-16377007 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170610792 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) +data1.+=((1, 9L, "Hi6")) +data1.+=((1, 8L, "Hi8")) +data1.+=((3, 8L, "Hi9")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) +data2.+=((3, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? 
(Null(Types.INT), 'a) as 'a, 'b, 'c) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.b > t2.b +|""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { --- End diff -- Can this name be more specific, e.g. inner equality join?
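One way to double-check the expected list in `testNonWindowInnerJoin` is to recompute the join with plain Scala collections. The sketch below assumes the test's `('a === 3) ? (Null(Types.INT), 'a)` projection can be modeled by mapping `a == 3` to `None`. `NonWindowInnerJoinCheck` is a hypothetical name, and nothing here uses Flink.

```scala
// Flink-free recomputation of the testNonWindowInnerJoin expectation (hypothetical helper).
object NonWindowInnerJoinCheck {
  // Models the test's select: a == 3 becomes null (None), other values are kept.
  private def nullify(rows: Seq[(Int, Long, String)]): Seq[(Option[Int], Long, String)] =
    rows.map { case (a, b, c) => (if (a == 3) None else Some(a), b, c) }

  val data1: Seq[(Int, Long, String)] = Seq(
    (1, 1L, "Hi1"), (1, 2L, "Hi2"), (1, 2L, "Hi2"), (1, 5L, "Hi3"),
    (2, 7L, "Hi5"), (1, 9L, "Hi6"), (1, 8L, "Hi8"), (3, 8L, "Hi9"))
  val data2: Seq[(Int, Long, String)] = Seq((1, 1L, "HiHi"), (2, 2L, "HeHe"), (3, 2L, "HeHe"))

  // SELECT t2.a, t2.c, t1.c FROM T1 t1 JOIN T2 t2 ON t1.a = t2.a AND t1.b > t2.b
  // Keys are compared with Option equality, so None matches None.
  def result: Seq[String] =
    for {
      (a1, b1, c1) <- nullify(data1)
      (a2, b2, c2) <- nullify(data2)
      if a1 == a2 && b1 > b2
    } yield s"${a2.map(_.toString).getOrElse("null")},$c2,$c1"
}
```

The expected row `null,HeHe,Hi9` only appears if null keys on both sides are treated as equal. The sketch reproduces it because `None == None` in Scala, whereas standard SQL `NULL = NULL` does not evaluate to true.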
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377010#comment-16377010 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170618769 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. 
+ * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( --- End diff -- Probably use `[Row, Int]` to match the count type in `LeftSideState` and `RightSideState`?
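The javadoc above says the extra MapState counts, per left row, how many right rows currently satisfy the non-equi condition, so the operator knows when to retract or re-emit the NULL-padded row. A minimal, Flink-free sketch of that bookkeeping follows. It is a simplified model under stated assumptions, not the PR's code: rows are plain `Int`s that all share one join key, the non-equi predicate is passed in as a function, counts are `Int` as the review comment suggests, and `LeftJoin`/`Out` are hypothetical names.

```scala
import scala.collection.mutable

// Flink-free model of the leftJoinCnt idea: per left row, count the right rows
// that currently satisfy the non-equi predicate. All rows share one join key.
object LeftJoinCountSketch {
  /** acc = true is an insertion, false a retraction; right = None is NULL-padded. */
  final case class Out(acc: Boolean, left: Int, right: Option[Int])

  final class LeftJoin(pred: (Int, Int) => Boolean) {
    private val leftState = mutable.Map.empty[Int, Int]   // row -> multiplicity
    private val rightState = mutable.Map.empty[Int, Int]  // row -> multiplicity
    private val leftJoinCnt = mutable.Map.empty[Int, Int] // left row -> #matching right rows
    val out = mutable.ListBuffer.empty[Out]

    private def bump(state: mutable.Map[Int, Int], row: Int, acc: Boolean): Unit = {
      val n = state.getOrElse(row, 0) + (if (acc) 1 else -1)
      if (n <= 0) state -= row else state(row) = n
    }

    def processLeft(row: Int, acc: Boolean): Unit = {
      bump(leftState, row, acc)
      val matches = rightState.toList.filter { case (r, _) => pred(row, r) }
      val cnt = matches.map(_._2).sum
      if (cnt == 0) out += Out(acc, row, None) // no match: NULL-padded row
      else for ((r, n) <- matches; _ <- 1 to n) out += Out(acc, row, Some(r))
      if (leftState.contains(row)) leftJoinCnt(row) = cnt else leftJoinCnt -= row
    }

    def processRight(row: Int, acc: Boolean): Unit = {
      bump(rightState, row, acc)
      for ((l, n) <- leftState.toList if pred(l, row)) {
        val before = leftJoinCnt.getOrElse(l, 0)
        val after = before + (if (acc) 1 else -1)
        leftJoinCnt(l) = after
        for (_ <- 1 to n) {
          // first match: retract the NULL-padded row before emitting the join
          if (acc && before == 0) out += Out(false, l, None)
          out += Out(acc, l, Some(row))
          // last match retracted: the left row is NULL-padded again
          if (!acc && after == 0) out += Out(true, l, None)
        }
      }
    }
  }
}
```

For example, with `pred = (l, r) => l > r`, a left row `5` is first emitted as `(5, null)`; a right row `3` retracts `(5, null)` and emits `(5, 3)`; retracting `3` again restores `(5, null)`. Without the per-left-row count, the operator could not tell whether a right-side change crosses the 0/1 boundary.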
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377003#comment-16377003 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170608444 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified.
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +current
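The excerpt stores, per distinct row, a tuple of (count, expiredTime): a row seen for the first time starts at `(0, -1L)`, a retraction decrements the count, and the row is removed from state once the count drops to zero. The accumulate branch is cut off in this message, so the sketch below assumes it increments the count and writes the tuple back. `SideState` and `CntAndExpiry` are hypothetical names, and the new expiry value is simply passed in rather than computed.

```scala
import scala.collection.mutable

// Hypothetical model of the per-row (count, expiredTime) state in the excerpt.
object SideStateSketch {
  final case class CntAndExpiry(count: Int, expiredTime: Long)

  final class SideState {
    private val state = mutable.Map.empty[String, CntAndExpiry]

    def get(row: String): Option[CntAndExpiry] = state.get(row)

    /** change = true accumulates, false retracts (like CRow.change). */
    def update(row: String, change: Boolean, newExpiredTime: Long): Unit = {
      // unseen rows start from (0, -1L), as with JTuple2.of(0, -1L)
      val old = state.getOrElse(row, CntAndExpiry(0, -1L))
      // the increment on accumulate is an assumption (that branch is truncated)
      val count = if (change) old.count + 1 else old.count - 1
      if (count <= 0) state -= row // fully retracted rows leave the state
      else state(row) = CntAndExpiry(count, newExpiredTime)
    }
  }
}
```

Keeping a count per distinct row, instead of storing duplicates, lets identical rows share one state entry while retractions still cancel exactly one occurrence.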
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377008#comment-16377008 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170619950 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -985,4 +1017,232 @@ class JoinHarnessTest extends HarnessTestBase { testHarness.close() } + + @Test + def testNonWindowLeftJoinWithOutNonEqualPred() { --- End diff -- `WithOut` ==> `Without`
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377005#comment-16377005 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170610689 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala --- @@ -201,18 +202,294 @@ class JoinITCase extends StreamingWithStateTestBase { // Proctime window output uncertain results, so assert has been ignored here. } + @Test + def testJoin(): Unit = { --- End diff -- Can this be more specific, e.g. inner equality join?
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377004#comment-16377004 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170613883 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified.
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +current
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377009#comment-16377009 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170613748 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified.
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +current
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377006#comment-16377006 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170617462 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. 
+ * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +leftJoinCnt = getRuntimeContext.getMapState(leftJoinCntDescriptor) +resultRow = new Row(resultType.getArity) + +LOG.debug("Instantiating NonWindowLeftJoinWithNonEquiPredicates.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified.
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cn
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377011#comment-16377011 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170296184 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -149,45 +148,39 @@ object UpdatingPlanChecker { } case j: DataStreamJoin => - val joinType = j.getJoinType - joinType match { -case JoinRelType.INNER => - // get key(s) for inner join - val lInKeys = visit(j.getLeft) - val rInKeys = visit(j.getRight) - if (lInKeys.isEmpty || rInKeys.isEmpty) { -None - } else { -// Output of inner join must have keys if left and right both contain key(s). -// Key groups from both side will be merged by join equi-predicates -val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames -val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames -val joinNames = j.getRowType.getFieldNames - -// if right field names equal to left field names, calcite will rename right -// field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b) -// to T2(pk0, b). -val rInNamesToJoinNamesMap = rInNames - .zip(joinNames.subList(lInNames.size, joinNames.length)) - .toMap + // get key(s) for inner join + val lInKeys = visit(j.getLeft) + val rInKeys = visit(j.getRight) + if (lInKeys.isEmpty || rInKeys.isEmpty) { +None + } else { +// Output of inner join must have keys if left and right both contain key(s). --- End diff -- remove "inner"?
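The quoted comment explains why the checker maps right-side field names positionally: when both inputs have a field `pk`, Calcite renames the right one (e.g. to `pk0`) in the join's row type. A small sketch of that mapping on plain `Seq[String]`s follows; `rightNamesToJoinNames` and `rightKeysInJoin` are hypothetical names, and `drop` stands in for the `subList` call on the Java list.

```scala
// Hypothetical helpers illustrating the positional name mapping quoted from
// UpdatingPlanChecker; plain Seq[String] stands in for Calcite row types.
object JoinKeyNameSketch {
  /** Right input field name -> field name in the join output row type. */
  def rightNamesToJoinNames(
      lInNames: Seq[String],
      rInNames: Seq[String],
      joinNames: Seq[String]): Map[String, String] =
    // the right fields occupy the join row type after all left fields
    rInNames.zip(joinNames.drop(lInNames.size)).toMap

  /** Translates right-side key names into the names used by the join output. */
  def rightKeysInJoin(
      rInKeys: Seq[String],
      lInNames: Seq[String],
      rInNames: Seq[String],
      joinNames: Seq[String]): Seq[String] = {
    val mapping = rightNamesToJoinNames(lInNames, rInNames, joinNames)
    rInKeys.map(mapping)
  }
}
```

For `T1(pk, a) JOIN T2(pk, b)` with output names `(pk, a, pk0, b)`, the right key `pk` is translated to `pk0`, which is the name downstream key checks would see.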
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377002#comment-16377002 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170298988 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream-stream non-window Join. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + // check if input types implement proper equals/hashCode + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + // state to hold left stream element + protected var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + protected var rightState:
MapState[Row, JTuple2[Int, Long]] = _ + protected var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + protected var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + protected var rightTimer: ValueState[Long] = _ + + // other condition function + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) +LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired t
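NonWindowJoin reads the min and max idle-state retention from StreamQueryConfig, enables cleaning only when `minRetentionTime > 1`, and uses 0 in the timer state to mean "no timer registered". The body of `getNewExpiredTime` is not shown in this excerpt, so the sketch below assumes the usual min/max retention rule: the expiry is pushed to `now + maxRetentionTime` only when `now + minRetentionTime` would pass the current expiry. `Retention`, `newExpiredTime`, and `timerToRegister` are hypothetical names.

```scala
// Hypothetical sketch of min/max idle-state retention; the real
// getNewExpiredTime body is not shown in the quoted diff.
object RetentionSketch {
  final class Retention(minRetentionTime: Long, maxRetentionTime: Long) {
    // as in the excerpt: cleaning is enabled only when minRetentionTime > 1
    val stateCleaningEnabled: Boolean = minRetentionTime > 1

    /** Extends the expiry to now + max only when now + min would pass it. */
    def newExpiredTime(now: Long, oldExpiredTime: Long): Long =
      if (stateCleaningEnabled && now + minRetentionTime > oldExpiredTime) now + maxRetentionTime
      else oldExpiredTime

    /** A timer value of 0 means "no timer registered", as in leftTimer/rightTimer. */
    def timerToRegister(currentTimer: Long, expiredTime: Long): Option[Long] =
      if (stateCleaningEnabled && currentTimer == 0L) Some(expiredTime) else None
  }
}
```

The gap between min and max retention is what keeps this cheap under this assumption: a frequently updated row refreshes its expiry at most once per `maxRetentionTime - minRetentionTime` interval instead of on every record.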
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377012#comment-16377012 ] ASF GitHub Bot commented on FLINK-8428: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170601596 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ *
+ * @param leftType        the input type of left stream
+ * @param rightType       the input type of right stream
+ * @param resultType      the output type of join
+ * @param genJoinFuncName the function name of other non-equi condition
+ * @param genJoinFuncCode the function code of other non-equi condition
+ * @param queryConfig     the configuration for the query to generate
+ */
+class NonWindowLeftJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
+
+  // result row, all fields from right will be null
+  private var resultRow: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    resultRow = new Row(resultType.getArity)
+    LOG.debug("Instantiating NonWindowLeftJoin.")
+  }
+
+  /**
+    * Puts or retracts an element from the input stream into state and searches the other state
+    * to output records that meet the condition. If there is no match, the fields from the right
+    * side are NULL. Records will be expired in state if state retention time has been specified.
+    */
+  override def processElement(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      timerState: ValueState[Long],
+      currentSideState: MapState[Row, JTuple2[Int, Long]],
+      otherSideState: MapState[Row, JTuple2[Int, Long]],
+      isLeft: Boolean): Unit = {
+
+    val inputRow = value.row
+    cRowWrapper.reset()
+    cRowWrapper.setCollector(out)
+    cRowWrapper.setChange(value.change)
+
+    val curProcessTime = ctx.timerService.currentProcessingTime
+    val oldCntAndExpiredTime = currentSideState.get(inputRow)
+    val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+      JTuple2.of(0, -1L)
+    } else {
+      oldCntAndExpiredTime
+    }
+
+    cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
+    if (stateCleaningEnabled && timerState.value() == 0) {
+      timerState.update(cntAndExpiredTime.f1)
+      ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
+    }
+
+    // update current side stream state
+    if (!value.change) {
+      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
+      if (cntAndExpiredTime.f0 <= 0) {
+        currentSideState.remove(inputRow)
+      } else {
+        current
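In `processElement`, the `currentSideState` bookkeeping keeps two values for each distinct row: how many identical copies are live, and an expiration time. A retraction decrements the count, and the entry is removed once the count reaches zero. Below is a minimal Flink-free sketch of that step. It uses a `mutable.Map` in place of `MapState`. It also assumes that the accumulate branch, which is cut off in the quoted diff, increments the count. `SideState` and `update` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical, Flink-free model of the "update current side stream state" step.
// A mutable.Map stands in for MapState[Row, JTuple2[Int, Long]]: the key is the row,
// the value is (number of identical live rows, expiration time).
final class SideState[R] {
  private val rows = mutable.Map.empty[R, (Int, Long)]

  // isAccumulate plays the role of value.change: true for an insert, false for a retraction.
  // Assumption: the accumulate branch (not shown in the diff) increments the count.
  def update(row: R, isAccumulate: Boolean, expiredTime: Long): Unit = {
    val (oldCnt, _) = rows.getOrElse(row, (0, -1L))
    val newCnt = if (isAccumulate) oldCnt + 1 else oldCnt - 1
    if (newCnt <= 0) rows -= row              // last copy retracted: drop the entry
    else rows(row) = (newCnt, expiredTime)    // otherwise store the new count and deadline
  }

  def count(row: R): Int = rows.get(row).map(_._1).getOrElse(0)
  def size: Int = rows.size
}
```

Because identical rows are stored as a count instead of as separate entries, a retraction only needs a single map lookup.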
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375497#comment-16375497 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/5327 Hi @twalthr, I look forward to your review. Thanks :-) > Implement stream-stream non-window left outer join > -- > > Key: FLINK-8428 > URL: https://issues.apache.org/jira/browse/FLINK-8428 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Implement stream-stream non-window left outer join for sql/table-api. A > simple design doc can be found > [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346805#comment-16346805 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5327 Thanks for the reminder @hequn8128. I will review it in the next 2 weeks. If not, feel free to ping me again.
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344587#comment-16344587 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/5327 Hi @twalthr, it would be great if you could take a look at the PR. I'm looking forward to finishing outer join (left/right/full) before the end of March. In addition, a few PRs are planned to optimize inner/outer joins. Thanks :)
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333405#comment-16333405 ] ASF GitHub Bot commented on FLINK-8428: --- GitHub user hequn8128 opened a pull request: https://github.com/apache/flink/pull/5327 [FLINK-8428] [table] Implement stream-stream non-window left outer join

## What is the purpose of the change

Implement stream-stream non-window left outer join for sql/table-api. A simple design doc can be found [here](https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing)

## Brief change log

- Add left join
  - with non-equal predicates
  - without non-equal predicates
- Adapt retraction rules to left join. Outer join will generate retractions.
- Adapt `UpsertTableSink`. The table mode of the dynamic table produced by left join is Update Mode, even if the table does not include a key definition.
- Add inner join test cases that are consistent with the test cases in batch.
- Add left join test cases that are consistent with the test cases in batch.

## Verifying this change

This change added tests and can be verified as follows:

- Added integration tests for left join with or without non-equal predicates.
- Added harness tests for left join with or without non-equal predicates.
- Added tests for the AccMode generated by left join.
- Added tests for an UpsertSink following a left join.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (already docs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink leftjoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5327.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5327

commit 2766de56d47e1a82f6605eb1dd80d8ea5e697a29
Author: hequn8128
Date: 2018-01-21T04:54:08Z

[FLINK-8428] [table] Implement stream-stream non-window left outer join
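The change log notes that outer join generates retractions. A simplified model shows why. It assumes insert-only inputs, a single integer join key, and string rows. `LeftJoinModel` and `Out` are hypothetical names. The PR's actual operator also tracks per-row counts and state retention, and this sketch models neither. A left row with no match is emitted padded with NULL (`None` here). When the first matching right row arrives, the padded result is retracted and replaced by the joined row.

```scala
import scala.collection.mutable

// Hypothetical model of non-window LEFT OUTER JOIN output for insert-only inputs.
// Each output is (change, leftRow, rightRow); change = false marks a retraction,
// and rightRow = None stands for the NULL-padded right side.
final case class Out(change: Boolean, left: String, right: Option[String])

final class LeftJoinModel {
  private val lefts  = mutable.Map.empty[Int, mutable.ListBuffer[String]]
  private val rights = mutable.Map.empty[Int, mutable.ListBuffer[String]]

  def onLeft(key: Int, row: String): Seq[Out] = {
    lefts.getOrElseUpdate(key, mutable.ListBuffer.empty) += row
    rights.get(key) match {
      // join with every right row already stored under this key
      case Some(rs) if rs.nonEmpty => rs.toList.map(r => Out(true, row, Some(r)))
      // no match yet: emit the left row padded with NULL
      case _ => Seq(Out(true, row, None))
    }
  }

  def onRight(key: Int, row: String): Seq[Out] = {
    val rs = rights.getOrElseUpdate(key, mutable.ListBuffer.empty)
    val firstMatch = rs.isEmpty
    rs += row
    lefts.get(key).fold(List.empty[String])(_.toList).flatMap { l =>
      // The first right row for this key invalidates the earlier NULL-padded result.
      val retract = if (firstMatch) Seq(Out(false, l, None)) else Seq.empty[Out]
      retract :+ Out(true, l, Some(row))
    }
  }
}
```

A sink may receive a NULL-padded row and later see it withdrawn, so it has to handle updates. This matches the change log's statement that the dynamic table produced by left join is in Update Mode.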