[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5555 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184442107 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- By default, the LocalExecEnv picks the parallelism based on the number of CPU cores. Are you running the tests in some kind of container? ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r184441869 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLon override def equals(that: Any): Boolean = that match { case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && -this.mapView == that.mapView +this.distinctValueMap == that.distinctValueMap case _ => false } def add(element: E): Boolean = { -if (element != null) { - val currentVal = mapView.get(element) - if (currentVal != null) { -mapView.put(element, currentVal + 1L) -false - } else { -mapView.put(element, 1L) -true - } -} else { +val wrappedElement = Row.of(element) --- End diff -- Thanks @fhueske for the insight. Yeah, I thought about that before the last commit but didn't go through with it, since we still need to construct a single-element row before passing it to the distinct accumulator. But you are right, it will make future optimizations easier. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184419816 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLon override def equals(that: Any): Boolean = that match { case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && -this.mapView == that.mapView +this.distinctValueMap == that.distinctValueMap case _ => false } def add(element: E): Boolean = { -if (element != null) { - val currentVal = mapView.get(element) - if (currentVal != null) { -mapView.put(element, currentVal + 1L) -false - } else { -mapView.put(element, 1L) -true - } -} else { +val wrappedElement = Row.of(element) --- End diff -- I think we should remove the `E` type parameter and directly pass the `Row` as an argument. That will also make the extension to multiple arguments very easy. Actually, I think I'll do that before merging ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184429742 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- hmm. I still don't understand why this ITCase failed, it runs fine with or without `parallelism(1)` for me. Is this Travis CI failure the indeterministic
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184428065 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- I think all other DISTINCT aggregates are tested against proctime so I was just trying to use this one to test out-of-order delivery, as well as test against
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184405910 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- Also, the event-time test is not required. We also test the retract case with BOUNDED OVER windows (rows that fall out of the window are retracted). ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184405329 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- Actually, I was wrong on this one. Late elements are not handled deterministically if p > 1. Will change it back. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r184115873 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,36 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + +val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map { + case (isDistinctAgg, idx) => if (isDistinctAgg) { +val fieldIndex: Int = aggFields(idx)(0) +val mapViewTypeInfo = new MapViewTypeInfo( + physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO) --- End diff -- At the moment, it disregards any null value. But as you mentioned, this is not correct. Will address. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184125030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Long => JLong} +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} + +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLong]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, JLong]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, JLong]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { --- End diff -- Great catch @fhueske. Sorry, I misunderstood what you meant previously regarding `null` values. This line was meant to disregard all `null` values. But as you mentioned, some operators might treat `null` values differently, e.g. `COUNT` aggregators. I added the handling and the tests as well. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184115947 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,36 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + +val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map { + case (isDistinctAgg, idx) => if (isDistinctAgg) { +val fieldIndex: Int = aggFields(idx)(0) +val mapViewTypeInfo = new MapViewTypeInfo( + physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO) +Seq( + MapViewSpec( +"distinctAgg" + idx + "_field" + fieldIndex, --- End diff -- +1 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184041497 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -207,112 +241,164 @@ class AggregationCodeGenerator( } } + /** - * Create DataView Term, for example, acc1_map_dataview. - * - * @param aggIndex index of aggregate function - * @param fieldName field name of DataView - * @return term to access [[MapView]] or [[ListView]] + * Add all data view for all distinct filters defined by aggregation functions. */ -def createDataViewTerm(aggIndex: Int, fieldName: String): String = { - s"acc${aggIndex}_${fieldName}_dataview" +def addDistinctFilterDataViews(): Unit = { + val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs +.flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor))) +.toMap[String, StateDescriptor[_ <: State, _]] + if (isStateBackedDataViews) { +for (i <- aggs.indices) yield { + for (spec <- distinctAggs(i)) { +// Check if stat descriptor exists. +val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId, + throw new CodeGenException( +s"Can not find DataView for distinct filter in accumulator by id: ${spec.stateId}")) + +addReusableDataView(spec, desc, i) + } +} + } } /** - * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, - * close and member area of the generated function. - * + * Add all data views for all field accumulators defined by aggregation functions. 
*/ -def addReusableDataViews(): Unit = { +def addAccumulatorDataViews(): Unit = { if (accConfig.isDefined) { val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get .flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor))) .toMap[String, StateDescriptor[_ <: State, _]] for (i <- aggs.indices) yield { for (spec <- accConfig.get(i)) yield { -val dataViewField = spec.field -val dataViewTypeTerm = dataViewField.getType.getCanonicalName -val desc = descMapping.getOrElse(spec.stateId, +// Check if stat descriptor exists. +val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId, throw new CodeGenException( s"Can not find DataView in accumulator by id: ${spec.stateId}")) -// define the DataView variables -val serializedData = serializeStateDescriptor(desc) -val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName) -val field = - s""" - |final $dataViewTypeTerm $dataViewFieldTerm; - |""".stripMargin -reusableMemberStatements.add(field) - -// create DataViews -val descFieldTerm = s"${dataViewFieldTerm}_desc" -val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName -val descDeserializeCode = - s""" - |$descClassQualifier $descFieldTerm = ($descClassQualifier) - | org.apache.flink.util.InstantiationUtil.deserializeObject( - | org.apache.commons.codec.binary.Base64.decodeBase64("$serializedData"), - | $contextTerm.getUserCodeClassLoader()); - |""".stripMargin -val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) { - s""" - |$descDeserializeCode - |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView( - | $contextTerm.getMapState(( - | org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm)); - |""".stripMargin -} else if (dataViewField.getType == classOf[ListView[_]]) { - s""" - |$descDeserializeCode - |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateListView( - | $contextTerm.getListState(( - | 
org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm)); - |""".stripMargin -} else { - throw new CodeGenException(s"Unsupported dataview type:
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184041109 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -207,112 +241,164 @@ class AggregationCodeGenerator( } } + /** - * Create DataView Term, for example, acc1_map_dataview. - * - * @param aggIndex index of aggregate function - * @param fieldName field name of DataView - * @return term to access [[MapView]] or [[ListView]] + * Add all data view for all distinct filters defined by aggregation functions. */ -def createDataViewTerm(aggIndex: Int, fieldName: String): String = { - s"acc${aggIndex}_${fieldName}_dataview" +def addDistinctFilterDataViews(): Unit = { + val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs +.flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor))) +.toMap[String, StateDescriptor[_ <: State, _]] + if (isStateBackedDataViews) { +for (i <- aggs.indices) yield { + for (spec <- distinctAggs(i)) { +// Check if stat descriptor exists. +val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId, + throw new CodeGenException( +s"Can not find DataView for distinct filter in accumulator by id: ${spec.stateId}")) + +addReusableDataView(spec, desc, i) + } +} + } } /** - * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, - * close and member area of the generated function. - * + * Add all data views for all field accumulators defined by aggregation functions. 
*/ -def addReusableDataViews(): Unit = { +def addAccumulatorDataViews(): Unit = { if (accConfig.isDefined) { val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get .flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor))) .toMap[String, StateDescriptor[_ <: State, _]] for (i <- aggs.indices) yield { for (spec <- accConfig.get(i)) yield { -val dataViewField = spec.field -val dataViewTypeTerm = dataViewField.getType.getCanonicalName -val desc = descMapping.getOrElse(spec.stateId, +// Check if stat descriptor exists. +val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId, throw new CodeGenException( s"Can not find DataView in accumulator by id: ${spec.stateId}")) -// define the DataView variables -val serializedData = serializeStateDescriptor(desc) -val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName) -val field = - s""" - |final $dataViewTypeTerm $dataViewFieldTerm; - |""".stripMargin -reusableMemberStatements.add(field) - -// create DataViews -val descFieldTerm = s"${dataViewFieldTerm}_desc" -val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName -val descDeserializeCode = - s""" - |$descClassQualifier $descFieldTerm = ($descClassQualifier) - | org.apache.flink.util.InstantiationUtil.deserializeObject( - | org.apache.commons.codec.binary.Base64.decodeBase64("$serializedData"), - | $contextTerm.getUserCodeClassLoader()); - |""".stripMargin -val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) { - s""" - |$descDeserializeCode - |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView( - | $contextTerm.getMapState(( - | org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm)); - |""".stripMargin -} else if (dataViewField.getType == classOf[ListView[_]]) { - s""" - |$descDeserializeCode - |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateListView( - | $contextTerm.getListState(( - | 
org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm)); - |""".stripMargin -} else { - throw new CodeGenException(s"Unsupported dataview type:
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184040988 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -207,112 +241,164 @@ class AggregationCodeGenerator( } } + /** - * Create DataView Term, for example, acc1_map_dataview. - * - * @param aggIndex index of aggregate function - * @param fieldName field name of DataView - * @return term to access [[MapView]] or [[ListView]] + * Add all data view for all distinct filters defined by aggregation functions. */ -def createDataViewTerm(aggIndex: Int, fieldName: String): String = { - s"acc${aggIndex}_${fieldName}_dataview" +def addDistinctFilterDataViews(): Unit = { + val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs +.flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor))) +.toMap[String, StateDescriptor[_ <: State, _]] + if (isStateBackedDataViews) { +for (i <- aggs.indices) yield { + for (spec <- distinctAggs(i)) { +// Check if stat descriptor exists. +val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId, + throw new CodeGenException( +s"Can not find DataView for distinct filter in accumulator by id: ${spec.stateId}")) + +addReusableDataView(spec, desc, i) + } +} + } } /** - * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, - * close and member area of the generated function. - * + * Add all data views for all field accumulators defined by aggregation functions. 
*/ -def addReusableDataViews(): Unit = { +def addAccumulatorDataViews(): Unit = { if (accConfig.isDefined) { val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get .flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor))) .toMap[String, StateDescriptor[_ <: State, _]] for (i <- aggs.indices) yield { for (spec <- accConfig.get(i)) yield { -val dataViewField = spec.field -val dataViewTypeTerm = dataViewField.getType.getCanonicalName -val desc = descMapping.getOrElse(spec.stateId, +// Check if stat descriptor exists. +val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId, throw new CodeGenException( s"Can not find DataView in accumulator by id: ${spec.stateId}")) -// define the DataView variables -val serializedData = serializeStateDescriptor(desc) -val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName) -val field = - s""" - |final $dataViewTypeTerm $dataViewFieldTerm; - |""".stripMargin -reusableMemberStatements.add(field) - -// create DataViews -val descFieldTerm = s"${dataViewFieldTerm}_desc" -val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName -val descDeserializeCode = - s""" - |$descClassQualifier $descFieldTerm = ($descClassQualifier) - | org.apache.flink.util.InstantiationUtil.deserializeObject( - | org.apache.commons.codec.binary.Base64.decodeBase64("$serializedData"), - | $contextTerm.getUserCodeClassLoader()); - |""".stripMargin -val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) { - s""" - |$descDeserializeCode - |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView( - | $contextTerm.getMapState(( - | org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm)); - |""".stripMargin -} else if (dataViewField.getType == classOf[ListView[_]]) { - s""" - |$descDeserializeCode - |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateListView( - | $contextTerm.getListState(( - | 
org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm)); - |""".stripMargin -} else { - throw new CodeGenException(s"Unsupported dataview type:
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184041605 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -207,112 +241,164 @@ class AggregationCodeGenerator( } } + /** - * Create DataView Term, for example, acc1_map_dataview. - * - * @param aggIndex index of aggregate function - * @param fieldName field name of DataView - * @return term to access [[MapView]] or [[ListView]] + * Add all data view for all distinct filters defined by aggregation functions. */ -def createDataViewTerm(aggIndex: Int, fieldName: String): String = { - s"acc${aggIndex}_${fieldName}_dataview" +def addDistinctFilterDataViews(): Unit = { + val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs +.flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor))) +.toMap[String, StateDescriptor[_ <: State, _]] + if (isStateBackedDataViews) { +for (i <- aggs.indices) yield { + for (spec <- distinctAggs(i)) { +// Check if stat descriptor exists. +val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId, + throw new CodeGenException( +s"Can not find DataView for distinct filter in accumulator by id: ${spec.stateId}")) + +addReusableDataView(spec, desc, i) + } +} + } } /** - * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, - * close and member area of the generated function. - * + * Add all data views for all field accumulators defined by aggregation functions. 
*/ -def addReusableDataViews(): Unit = { +def addAccumulatorDataViews(): Unit = { if (accConfig.isDefined) { val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get .flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor))) .toMap[String, StateDescriptor[_ <: State, _]] for (i <- aggs.indices) yield { for (spec <- accConfig.get(i)) yield { -val dataViewField = spec.field -val dataViewTypeTerm = dataViewField.getType.getCanonicalName -val desc = descMapping.getOrElse(spec.stateId, +// Check if stat descriptor exists. +val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId, throw new CodeGenException( s"Can not find DataView in accumulator by id: ${spec.stateId}")) -// define the DataView variables -val serializedData = serializeStateDescriptor(desc) -val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName) -val field = - s""" - |final $dataViewTypeTerm $dataViewFieldTerm; - |""".stripMargin -reusableMemberStatements.add(field) - -// create DataViews -val descFieldTerm = s"${dataViewFieldTerm}_desc" -val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName -val descDeserializeCode = - s""" - |$descClassQualifier $descFieldTerm = ($descClassQualifier) - | org.apache.flink.util.InstantiationUtil.deserializeObject( - | org.apache.commons.codec.binary.Base64.decodeBase64("$serializedData"), - | $contextTerm.getUserCodeClassLoader()); - |""".stripMargin -val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) { - s""" - |$descDeserializeCode - |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView( - | $contextTerm.getMapState(( - | org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm)); - |""".stripMargin -} else if (dataViewField.getType == classOf[ListView[_]]) { - s""" - |$descDeserializeCode - |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateListView( - | $contextTerm.getListState(( - | 
org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm)); - |""".stripMargin -} else { - throw new CodeGenException(s"Unsupported dataview type:
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184040192 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Long => JLong} +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} + +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. + */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLong]) { --- End diff -- rename `mapView` to `distinctValueMap`? ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184036382 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,36 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + +val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map { + case (isDistinctAgg, idx) => if (isDistinctAgg) { +val fieldIndex: Int = aggFields(idx)(0) +val mapViewTypeInfo = new MapViewTypeInfo( + physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO) --- End diff -- Did you check if this works for `null` keys as well, or whether we should wrap it in a `Row(1)`? ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184075230 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -481,14 +622,34 @@ class AggregationCodeGenerator( |org.apache.flink.types.Row b) """.stripMargin val merge: String = { -for (i <- aggs.indices) yield - j""" - |${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i); - |${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)}); - |accIt$i.setElement(bAcc$i); - |${aggs(i)}.merge(aAcc$i, accIt$i); - |a.setField($i, aAcc$i); - """.stripMargin +for (i <- aggs.indices) yield { + if (isDistinctAggs(i)) { +j""" + |$distinctAccType aDistinctAcc$i = ($distinctAccType) a.getField($i); + |$distinctAccType bDistinctAcc$i = ($distinctAccType) b.getField(${mapping(i)}); + |java.util.Iterator mergeIt$i = + |bDistinctAcc$i.elements().iterator(); + |while (mergeIt$i.hasNext()) { + | java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next(); + | Object k = entry.getKey(); + | Long v = (Long) entry.getValue(); + | if (aDistinctAcc$i.add(k, v)) { + |${accTypes(i)} aAcc$i = (${accTypes(i)}) aDistinctAcc$i.getRealAcc(); --- End diff -- Move this out of the while loop? ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184048806 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -417,13 +548,23 @@ class AggregationCodeGenerator( .stripMargin val create: String = { for (i <- aggs.indices) yield { - j""" - |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator(); - |${genDataViewFieldSetter(s"acc$i", i)} - |accs.setField( - | $i, - | acc$i);""" -.stripMargin + if (isDistinctAggs(i)) { +j""" + |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator(); + |$distinctAccType distinctAcc$i = ($distinctAccType) new org.apache.flink.table. --- End diff -- `classOf[DistinctAccumulator].getCanonicalName` ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184036079 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,36 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + +val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map { + case (isDistinctAgg, idx) => if (isDistinctAgg) { +val fieldIndex: Int = aggFields(idx)(0) --- End diff -- Add a safety check that `aggFields(idx).length == 1` ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r184037303 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,36 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + +val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map { + case (isDistinctAgg, idx) => if (isDistinctAgg) { +val fieldIndex: Int = aggFields(idx)(0) +val mapViewTypeInfo = new MapViewTypeInfo( + physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO) +Seq( + MapViewSpec( +"distinctAgg" + idx + "_field" + fieldIndex, --- End diff -- I think we can remove the `"_field" + fieldIndex` suffix. Once we add support for distinct multi-argument functions, we will have to use a single `MapViewSpec` with a `Row` that holds all arguments. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184039407 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -207,112 +241,164 @@ class AggregationCodeGenerator( } } + /** - * Create DataView Term, for example, acc1_map_dataview. - * - * @param aggIndex index of aggregate function - * @param fieldName field name of DataView - * @return term to access [[MapView]] or [[ListView]] + * Add all data view for all distinct filters defined by aggregation functions. */ -def createDataViewTerm(aggIndex: Int, fieldName: String): String = { - s"acc${aggIndex}_${fieldName}_dataview" +def addDistinctFilterDataViews(): Unit = { + val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs --- End diff -- can be moved into the `if` branch (like in `addAccumulatorDataViews`) ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184092223 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- should also work for the default parallelism (p=1 only important for proc time processing) ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184085613 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Long => JLong} +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} + +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLong]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, JLong]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, JLong]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { --- End diff -- Shouldn't it be up to the aggregation function how to handle `null`-valued parameters? I'd pass the null value on to the function instead of filtering it out. We'd need to wrap it in a `Row()` with a single field to distinguish an empty entry from a null-valued entry. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183880939 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- Yes. You are right. Sorry I missed that. Just updated :-) ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183867019 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { --- End diff -- Great, thanks! ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183866776 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- I think we don't need the type info of the accumulator to create the `MapViewTypeInfo`, but the type info of the input type of the aggregation function. The input types can be resolved from `physicalInputTypes` and `aggFields`. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222089 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) + + distinctAggs(index) = Seq( --- End diff -- `mapViewTypeInfo` doesn't seem to be available in codegen. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r183222353 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin +} else { + j""" + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin +} } else { -j""" - |org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); - |${genDataViewFieldSetter(s"acc$i", i)} - |output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue(acc$i));""".stripMargin +if (isDistinctAggs(i)) { + j""" + |org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} + |${accTypes(i)} acc$i = (${accTypes(i)}) distinctAcc$i.getRealAcc(); + |${genAccDataViewFieldSetter(s"acc$i", i)} + |output.setField( + | ${aggMapping(i)}, + | baseClass$i.getValue(acc$i));""".stripMargin +} else { --- End diff -- Yeah, that's true. It will also reduce the chance of typos. +1 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222457 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala --- @@ -70,7 +70,12 @@ trait OverAggregate { val aggStrings = namedAggregates.map(_.getKey).map( a => s"${a.getAggregation}(${ -if (a.getArgList.size() > 0) { +val prefix = if (a.isDistinct) { --- End diff -- will address in FLINK-8690 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222451 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, Integer]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, Integer]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, Integer]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { + if (mapView.contains(element)) { +mapView.put(element, mapView.get(element) + 1) +false + } else { +mapView.put(element, 1) +true + } +} else { + false +} + } + + def add(element: E, count: Int): Boolean = { +if (element != null) { + if (mapView.contains(element)) { --- End diff -- +1 here as well, just in case I forgot ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222077 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- That's a good point. I will add a test for this use case. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r18367 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- Hmm, I tried to generate the MapViewSpec in codegen, but the `AggregationCodeGenerator.generateAggregations` call signature seems to be missing the type info for the accumulator (`accTypes` only shows the accumulator type, not the `PojoField` info). I could certainly change it to something simpler, such as `DistinctAccumulator[Object, Long]`, but there are pros and cons; also, making a `MapTypeInfo` for a plain `Object` seems odd to me. How about we leave it for a future optimization? ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183598006 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { --- End diff -- This test case was added to cover all over aggregate usage on distinct accumulator. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222573 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +51,96 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { --- End diff -- Both cases went through `AggregateExpandDistinctAggregatesRule`, which produces plans that break the distinct aggregate into a `GROUP BY` over the distinct key followed by an aggregate without distinct. In the 2nd case, `LogicalWindowAggregateRule` was not applied because of the `distinct` keyword. They should both be fixed in FLINK-8690. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182496636 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) + + distinctAggs(index) = Seq( --- End diff -- I would generate the `MapViewSpec` in the aggregation code generator ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182226785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") --- End diff -- Reword error message to `"DISTINCT aggregations with multiple parameters not fully supported yet."`. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182753514 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- Instead we can create the `DataViewSpecs` here. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182478446 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -93,6 +97,8 @@ class AggregationCodeGenerator( aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], aggFields: Array[Array[Int]], aggMapping: Array[Int], + distinctAggs: Array[Seq[DataViewSpec[_]]], --- End diff -- I would make this an `Array[Boolean]` and rename to `isDistinctAgg`. We can build the `MapViewSpec`s in the method. We have all the information for that in the other parameters. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182481625 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, Integer]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, Integer]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, Integer]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { + if (mapView.contains(element)) { --- End diff -- +1 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182479163 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -93,6 +97,8 @@ class AggregationCodeGenerator( aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], aggFields: Array[Array[Int]], aggMapping: Array[Int], + distinctAggs: Array[Seq[DataViewSpec[_]]], + isStateBackedDataViews: Boolean, --- End diff -- We should add a constructor check for `if (partialResults && isStateBackedDataViews)` and throw an exception if `true`. `partialResults` means that the `Row` with the accumulators has to be emitted which won't work well for state-backed distinct maps that are probably too big. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182768521 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin +} else { + j""" + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin +} } else { -j""" - |org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); - |${genDataViewFieldSetter(s"acc$i", i)} - |output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue(acc$i));""".stripMargin +if (isDistinctAggs(i)) { + j""" + |org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} --- End diff -- we don't need this statement ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182218521 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) --- End diff -- I would not create the `MapViewSpec`s here but do that in the code gen function. Here we should create an `Array[Boolean]` to flag distinct aggregations. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182496465 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- Does the approach also work for `null` values in both MapViews? If not, we can use a `Row(1)` that serializes a bitmask for `null` values. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182773359 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -417,13 +530,26 @@ class AggregationCodeGenerator( .stripMargin val create: String = { for (i <- aggs.indices) yield { - j""" - |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator(); - |${genDataViewFieldSetter(s"acc$i", i)} - |accs.setField( - | $i, - | acc$i);""" -.stripMargin + if (isDistinctAggs(i)) { +j""" + |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator(); + |$distinctAccType distinctAcc$i = ($distinctAccType) new org.apache.flink.table. + |functions.aggfunctions.DistinctAccumulator(acc$i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} --- End diff -- I think this and the `genAccDataViewFieldSetter` call (both from here and the non-distinct case) can be removed. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182219690 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +51,96 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { --- End diff -- I tested a few other queries with distinct aggregates. * Queries with non-windowed `DISTINCT` aggregations work, but they are translated without distinct aggregations, so the changes in this PR are not used. * Queries with `DISTINCT` aggregates and `TUMBLE` or `HOP` windows fail with strange exceptions. They did not look related to these changes, but it would be good to check. We don't have to fix these things in this PR (unless they are broken by these changes). In general, I think it would be good to add unit tests for the `AggregationCodeGenerator`. We could generate `GeneratedAggregations` for different configurations, compile them, and check whether the result is as expected. Actually, we should have done that already. This should also work for state-backend backed views if we embed the test in a HarnessTest. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182489141 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- +1 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182233134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList --- End diff -- add more comments for the `aggCall.isDistinct` branch ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182753219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- If we change the input parameter, we have the `Array[Boolean]` already ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182254735 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin --- End diff -- We need to forward the distinct maps as well. `partialResults` is used when an operator needs to emit partial aggregation results such as a combine function in batch execution. So we don't need to distinguish the `isDistinctAggs(i)` case here. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182768726 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin +} else { + j""" + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin +} } else { -j""" - |org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); - |${genDataViewFieldSetter(s"acc$i", i)} - |output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue(acc$i));""".stripMargin +if (isDistinctAggs(i)) { + j""" + |org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} + |${accTypes(i)} acc$i = (${accTypes(i)}) distinctAcc$i.getRealAcc(); + |${genAccDataViewFieldSetter(s"acc$i", i)} + |output.setField( + | ${aggMapping(i)}, + | baseClass$i.getValue(acc$i));""".stripMargin +} else { --- End diff -- both cases share a lot of code. We could only retrieve `acc$i` differently. Does that make sense or fragment the code too much? Same would apply for `accumulate()` and `retract()`. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182469110 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala --- @@ -70,7 +70,12 @@ trait OverAggregate { val aggStrings = namedAggregates.map(_.getKey).map( a => s"${a.getAggregation}(${ -if (a.getArgList.size() > 0) { +val prefix = if (a.isDistinct) { --- End diff -- In case we also want to support group-windowed DISTINCT aggregates, we would need to change `CommonAggregate.aggregationToString()` as well. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r173644059 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, Integer]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, Integer]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, Integer]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { + if (mapView.contains(element)) { --- End diff -- Use mapView.get() directly and reuse the result. This avoids accessing the state twice. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r173644062 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- Would LONG_TYPE_INFO be safer? An Int counter could overflow fairly quickly at a rate of 10,000 records per second. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170828036 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1393,6 +1393,21 @@ object AggregateUtil { throw new TableException(s"unsupported Function: '${unSupported.getName}'") } } + + // create distinct accumulator delegate + if (aggregateCall.isDistinct) { --- End diff -- The SQL is validated by Calcite during the parsing phase, which already rules out a standalone DISTINCT, so maybe we don't have to consider a standalone DISTINCT here. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170828027 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) { + def this() { +this(null, null.asInstanceOf[ACC]) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + +} + +class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_], + var realAgg: AggregateFunction[_, ACC]) + extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] { + + def getRealAgg: AggregateFunction[_, ACC] = realAgg + + override def createAccumulator(): DistinctAccumulator[E, ACC] = { +new DistinctAccumulator[E, ACC]( + new MapView[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], +BasicTypeInfo.INT_TYPE_INFO), + realAgg.createAccumulator()) + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + 1) +false + } else { +acc.mapView.put(element, 1) +true + } +} else { + false +} + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + count) +false + } else { +acc.mapView.put(element, count) +true + } +} else { + false +} + } + + def retract(acc: DistinctAccumulator[E, ACC], element: E): 
Boolean = { +if (element != null) { + val count = acc.mapView.get(element) + if (count == 1) { +acc.mapView.remove(element) +true + } else { +acc.mapView.put(element, count - 1) +false + } +} else { + false +} + } + + def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = { +acc.mapView.clear() + } + + override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = { +acc.mapView.map + } + + override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = { +val clazz = classOf[DistinctAccumulator[E, ACC]] +val pojoFields = new util.ArrayList[PojoField] +pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"), + new MapViewTypeInfo[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))) +pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"), + realAgg.getAccumulatorType)) +new PojoTypeInfo[DistinctAccumulator[E, ACC]](clazz, pojoFields) --- End diff -- Make sense. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170602449 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) { + def this() { +this(null, null.asInstanceOf[ACC]) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + +} + +class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_], + var realAgg: AggregateFunction[_, ACC]) + extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] { + + def getRealAgg: AggregateFunction[_, ACC] = realAgg + + override def createAccumulator(): DistinctAccumulator[E, ACC] = { +new DistinctAccumulator[E, ACC]( + new MapView[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], +BasicTypeInfo.INT_TYPE_INFO), + realAgg.createAccumulator()) + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + 1) +false + } else { +acc.mapView.put(element, 1) +true + } +} else { + false +} + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + count) +false + } else { +acc.mapView.put(element, count) +true + } +} else { + false +} + } + + def retract(acc: DistinctAccumulator[E, ACC], element: E): 
Boolean = { +if (element != null) { + val count = acc.mapView.get(element) + if (count == 1) { +acc.mapView.remove(element) +true + } else { +acc.mapView.put(element, count - 1) +false + } +} else { + false +} + } + + def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = { +acc.mapView.clear() + } + + override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = { +acc.mapView.map + } + + override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = { +val clazz = classOf[DistinctAccumulator[E, ACC]] +val pojoFields = new util.ArrayList[PojoField] +pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"), + new MapViewTypeInfo[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))) +pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"), + realAgg.getAccumulatorType)) --- End diff -- Good catch! ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170603205 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) { + def this() { +this(null, null.asInstanceOf[ACC]) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + +} + +class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_], + var realAgg: AggregateFunction[_, ACC]) + extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] { + + def getRealAgg: AggregateFunction[_, ACC] = realAgg + + override def createAccumulator(): DistinctAccumulator[E, ACC] = { +new DistinctAccumulator[E, ACC]( + new MapView[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], +BasicTypeInfo.INT_TYPE_INFO), + realAgg.createAccumulator()) + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + 1) +false + } else { +acc.mapView.put(element, 1) +true + } +} else { + false +} + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + count) +false + } else { +acc.mapView.put(element, count) +true + } +} else { + false +} + } + + def retract(acc: DistinctAccumulator[E, ACC], element: E): 
Boolean = { +if (element != null) { + val count = acc.mapView.get(element) + if (count == 1) { +acc.mapView.remove(element) +true + } else { +acc.mapView.put(element, count - 1) +false + } +} else { + false +} + } + + def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = { +acc.mapView.clear() + } + + override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = { +acc.mapView.map + } + + override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = { +val clazz = classOf[DistinctAccumulator[E, ACC]] +val pojoFields = new util.ArrayList[PojoField] +pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"), + new MapViewTypeInfo[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))) +pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"), + realAgg.getAccumulatorType)) +new PojoTypeInfo[DistinctAccumulator[E, ACC]](clazz, pojoFields) --- End diff -- Yes, I was actually aware of that as one of the limitation of
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170605004 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,80 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testUnboundedDistinctGroupWindow(): Unit = { --- End diff -- Oops... this was supposed to be added later, in FLINK-8690 :-P ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170604304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1393,6 +1393,21 @@ object AggregateUtil { throw new TableException(s"unsupported Function: '${unSupported.getName}'") } } + + // create distinct accumulator delegate + if (aggregateCall.isDistinct) { --- End diff -- Good point. Actually, there's a very interesting question I've been thinking about: this AggFunction is literally the distinct version of `CollectAggFunction`, but it cannot exist by itself unless it is chained with another `realAgg` function. That means we would need to chain `DistinctAggFunction` with `CollectAggFunction`, which just doesn't make sense. I will try some other approaches and see if they work. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170577231 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -105,11 +106,32 @@ class AggregationCodeGenerator( // get unique function name val funcName = newName(name) + +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val distinctAggType = s"${classOf[DistinctAggDelegateFunction[_, _]].getName}" +val isDistinctAcc = aggregates.map(_.isInstanceOf[DistinctAggDelegateFunction[_, _]]) + // register UDAGGs val aggs = aggregates.map(a => addReusableFunction(a, contextTerm)) +// register real aggregate functions without distinct delegate +val realAggregates: Array[AggregateFunction[_ <: Any, _ <: Any]] = aggregates.map { + case distinctAggDelegate: DistinctAggDelegateFunction[_, _] => +distinctAggDelegate.realAgg + case agg: AggregateFunction[_, _] => +agg +} + +val realAggTypes = aggregates.map { --- End diff -- Can be replaced by `realAggregates.map(_.getClass.getName)` ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170577575 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) { + def this() { +this(null, null.asInstanceOf[ACC]) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + +} + +class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_], + var realAgg: AggregateFunction[_, ACC]) + extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] { + + def getRealAgg: AggregateFunction[_, ACC] = realAgg + + override def createAccumulator(): DistinctAccumulator[E, ACC] = { +new DistinctAccumulator[E, ACC]( + new MapView[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], +BasicTypeInfo.INT_TYPE_INFO), + realAgg.createAccumulator()) + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + 1) +false + } else { +acc.mapView.put(element, 1) +true + } +} else { + false +} + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + count) +false + } else { +acc.mapView.put(element, count) +true + } +} else { + false +} + } + + def retract(acc: DistinctAccumulator[E, ACC], element: E): 
Boolean = { +if (element != null) { + val count = acc.mapView.get(element) + if (count == 1) { +acc.mapView.remove(element) +true + } else { +acc.mapView.put(element, count - 1) +false + } +} else { + false +} + } + + def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = { +acc.mapView.clear() + } + + override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = { +acc.mapView.map + } + + override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = { +val clazz = classOf[DistinctAccumulator[E, ACC]] +val pojoFields = new util.ArrayList[PojoField] +pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"), + new MapViewTypeInfo[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))) +pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"), + realAgg.getAccumulatorType)) +new PojoTypeInfo[DistinctAccumulator[E, ACC]](clazz, pojoFields) --- End diff -- Currently, `MapView` and `ListView` only supported at first
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170577434 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) { + def this() { +this(null, null.asInstanceOf[ACC]) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + +} + +class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_], + var realAgg: AggregateFunction[_, ACC]) + extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] { + + def getRealAgg: AggregateFunction[_, ACC] = realAgg + + override def createAccumulator(): DistinctAccumulator[E, ACC] = { +new DistinctAccumulator[E, ACC]( + new MapView[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], +BasicTypeInfo.INT_TYPE_INFO), + realAgg.createAccumulator()) + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + 1) +false + } else { +acc.mapView.put(element, 1) +true + } +} else { + false +} + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = { --- End diff -- Add a corresponding retract function. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170577325 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + --- End diff -- Add comments for this class and functions. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170577531 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) { + def this() { +this(null, null.asInstanceOf[ACC]) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + +} + +class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_], + var realAgg: AggregateFunction[_, ACC]) + extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] { + + def getRealAgg: AggregateFunction[_, ACC] = realAgg + + override def createAccumulator(): DistinctAccumulator[E, ACC] = { +new DistinctAccumulator[E, ACC]( + new MapView[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], +BasicTypeInfo.INT_TYPE_INFO), + realAgg.createAccumulator()) + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + 1) +false + } else { +acc.mapView.put(element, 1) +true + } +} else { + false +} + } + + def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = { +if (element != null) { + if (acc.mapView.contains(element)) { +acc.mapView.put(element, acc.mapView.get(element) + count) +false + } else { +acc.mapView.put(element, count) +true + } +} else { + false +} + } + + def retract(acc: DistinctAccumulator[E, ACC], element: E): 
Boolean = { +if (element != null) { + val count = acc.mapView.get(element) + if (count == 1) { +acc.mapView.remove(element) +true + } else { +acc.mapView.put(element, count - 1) +false + } +} else { + false +} + } + + def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = { +acc.mapView.clear() + } + + override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = { +acc.mapView.map + } + + override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = { +val clazz = classOf[DistinctAccumulator[E, ACC]] +val pojoFields = new util.ArrayList[PojoField] +pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"), + new MapViewTypeInfo[E, Integer]( +elementTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))) +pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"), + realAgg.getAccumulatorType)) --- End diff -- `getAccumulatorType` may return null if it has not been overridden. In that case, the accumulator type should be inferred; see
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170577418 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) { --- End diff -- Remove blanks between `[E, ACC]` and `(var mapView` ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170577818 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,80 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testUnboundedDistinctGroupWindow(): Unit = { --- End diff -- Remove this test case. The DISTINCT in this case is already handled by `AggregateExpandDistinctAggregatesRule`. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r170577731 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1393,6 +1393,21 @@ object AggregateUtil { throw new TableException(s"unsupported Function: '${unSupported.getName}'") } } + + // create distinct accumulator delegate + if (aggregateCall.isDistinct) { --- End diff -- It would be better to move this logic into the enclosing match above. DISTINCT is also a kind of aggregate whose fields should not be empty, and moving it there would let us reuse some of the variables already defined in that match. What do you think? ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/5555 [FLINK-8689][table]Add runtime support of distinct filter using MapView for codegen ## What is the purpose of the change - Adds runtime support for distinct filtering using MapView, via a distinct aggregation delegate. This PR only fixes the currently broken over-window aggregate with a distinct filter; however, the change is meant to be reused by other distinct operations on DataStream, such as group window aggregates, group aggregates, etc. ## Brief change log - Adds a new DistinctAggDelegateFunction and DistinctAccumulator that encapsulate any real aggregate function. - Changes codegen to generate a distinct filter before invoking the actual aggregate function. - Adds more codegen for using the delegates on merge and reset. ## Verifying this change This change added tests and can be verified as follows: - Added over-window unit tests to verify the generated plan before codegen. - Added integration tests for distinct over-window aggregates end-to-end. ## Does this pull request potentially affect one of the following parts: no ## Documentation This does not expose any additional externally facing functionality; that will come in a separate PR.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-8689 Alternatively, you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5555.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5555 commit cb853e6d1adfbb04819a234f8fbced32a8ffb21f Author: Rong Rong Date: 2018-02-21T21:40:29Z support distinct agg function delegate using mapview accumulator in runtime environment commit fff7fa48f07b350f6fd4565f58d6a35af88faa50 Author: Rong Rong Date: 2018-02-21T21:43:02Z add over window aggregate and unbounded aggregate ITCase commit 03fc641ac01af6867d2516263bcea0ac96f1802c Author: Rong Rong Date: 2018-02-22T05:15:26Z adding in overwindow logical test cases to verify distinct modifier ---