[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184442107
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
+// use out-of-order data to test distinct accumulator remove
+val data = Seq(
+  Left((2L, (2L, 2, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((20L, (20L, 20, "Hello World"))), // early row
+  Right(3L),
+  Left((2L, (2L, 2, "Hello"))), // late row
+  Left((3L, (3L, 3, "Hello"))),
+  Left((4L, (4L, 4, "Hello"))),
+  Left((5L, (5L, 5, "Hello"))),
+  Left((6L, (6L, 6, "Hello"))),
+  Left((7L, (7L, 7, "Hello World"))),
+  Right(7L),
+  Left((9L, (9L, 9, "Hello World"))),
+  Left((8L, (8L, 8, "Hello World"))),
+  Left((8L, (8L, 8, "Hello World"))),
+  Right(20L))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

By default, the LocalExecEnv picks the parallelism based on the number of CPU
cores. Are you running the tests in some kind of container?
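
For reference, a minimal sketch of the behavior in question (standard Flink APIs; the core-count default applies to the local environment):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// When run locally, the default parallelism is the number of available CPU
// cores, so proctime results can interleave across subtasks.
println(env.getParallelism)
// Pinning to 1 makes the order of emitted rows deterministic for the test.
env.setParallelism(1)
```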


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184441869
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala ---
@@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLong]
   override def equals(that: Any): Boolean =
 that match {
   case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
-this.mapView == that.mapView
+this.distinctValueMap == that.distinctValueMap
   case _ => false
 }
 
   def add(element: E): Boolean = {
-if (element != null) {
-  val currentVal = mapView.get(element)
-  if (currentVal != null) {
-mapView.put(element, currentVal + 1L)
-false
-  } else {
-mapView.put(element, 1L)
-true
-  }
-} else {
+val wrappedElement = Row.of(element)
--- End diff --

Thanks @fhueske for the insight. Yeah, I thought about that before the last
commit but didn't go through with it, since we still need to construct the
single-element row before passing it to the distinct accumulator. But you are
right, it will make future optimizations easier.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184419816
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala ---
@@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLong]
+val wrappedElement = Row.of(element)
--- End diff --

I think we should remove the `E` type parameter and directly pass the `Row` 
as an argument. That will also make the extension to multiple arguments very 
easy. 

Actually, I think I'll do that before merging
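
A rough sketch of that change, assuming the accumulator shape quoted above (hedged; the name `distinctValueMap` follows the rename suggested elsewhere in this review):

```scala
import java.lang.{Long => JLong}
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.types.Row

// Sketch: key the distinct map directly on Row instead of a generic E, so a
// multi-argument DISTINCT aggregate only needs a wider Row, not a new type
// parameter.
class DistinctAccumulator[ACC](var realAcc: ACC, var distinctValueMap: MapView[Row, JLong]) {

  def this() = this(null.asInstanceOf[ACC], new MapView[Row, JLong]())

  def getRealAcc: ACC = realAcc

  // Returns true iff the arguments are seen for the first time, i.e. the
  // wrapped aggregate should be invoked with them.
  def add(params: Row): Boolean = {
    val currentCnt = distinctValueMap.get(params)
    if (currentCnt != null) {
      distinctValueMap.put(params, currentCnt + 1L)
      false
    } else {
      distinctValueMap.put(params, 1L)
      true
    }
  }
}
```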


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184429742
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

Hmm, I still don't understand why this ITCase failed; it runs fine with or
without `parallelism(1)` for me. Is this Travis CI failure the indeterministic


---

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184428065
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

I think all the other DISTINCT aggregates are tested against proctime, so I
was just trying to use this one to test out-of-order delivery, as well as to
test against


---

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184405910
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

Also, the event-time test is not required. We test the retract case also
with BOUNDED OVER windows (rows that fall out of the window are retracted).


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184405329
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

Actually, I was wrong on this one. Late elements are not deterministically
handled if p > 1.
Will change it back.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184115873
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -151,8 +157,36 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+
+val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map {
+  case (isDistinctAgg, idx) => if (isDistinctAgg) {
+val fieldIndex: Int = aggFields(idx)(0)
+val mapViewTypeInfo = new MapViewTypeInfo(
+  physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO)
--- End diff --

At the moment it will disregard any null value. But as you mentioned, this is
not correct. Will address.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184125030
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Long => JLong}
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for accumulator wrapper when applying distinct aggregation.
+  * @param realAcc the actual accumulator which gets invoke after distinct filter.
+  * @param mapView the [[MapView]] element used to store the distinct filter hash map.
+  * @tparam E the element type for the distinct filter hash map.
+  * @tparam ACC the accumulator type for the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLong]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, JLong]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, JLong]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
--- End diff --

Great catch @fhueske. Sorry, I misunderstood what you meant previously
regarding `null` values. This line was meant to disregard all `null` values.
But as you mentioned, some operators might treat `null` values differently,
e.g. `COUNT` aggregators. I added the handling and the tests as well.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184115947
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -151,8 +157,36 @@ class AggregationCodeGenerator(
+Seq(
+  MapViewSpec(
+"distinctAgg" + idx + "_field" + fieldIndex,
--- End diff --

+1


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184041497
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -207,112 +241,164 @@ class AggregationCodeGenerator(
 }
 }
 
+
 /**
-  * Create DataView Term, for example, acc1_map_dataview.
-  *
-  * @param aggIndex index of aggregate function
-  * @param fieldName field name of DataView
-  * @return term to access [[MapView]] or [[ListView]]
+  * Add all data view for all distinct filters defined by aggregation functions.
   */
-def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
-  s"acc${aggIndex}_${fieldName}_dataview"
+def addDistinctFilterDataViews(): Unit = {
+  val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs
+.flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_ <: State, _]]
+  if (isStateBackedDataViews) {
+for (i <- aggs.indices) yield {
+  for (spec <- distinctAggs(i)) {
+// Check if stat descriptor exists.
+val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId,
+  throw new CodeGenException(
+s"Can not find DataView for distinct filter in accumulator by id: ${spec.stateId}"))
+
+addReusableDataView(spec, desc, i)
+  }
+}
+  }
 }
 
 /**
-  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
-  * close and member area of the generated function.
-  *
+  * Add all data views for all field accumulators defined by aggregation functions.
   */
-def addReusableDataViews(): Unit = {
+def addAccumulatorDataViews(): Unit = {
   if (accConfig.isDefined) {
 val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
   .flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor)))
   .toMap[String, StateDescriptor[_ <: State, _]]
 
 for (i <- aggs.indices) yield {
   for (spec <- accConfig.get(i)) yield {
-val dataViewField = spec.field
-val dataViewTypeTerm = dataViewField.getType.getCanonicalName
-val desc = descMapping.getOrElse(spec.stateId,
+// Check if stat descriptor exists.
+val desc: StateDescriptor[_, _] = descMapping.getOrElse(spec.stateId,
   throw new CodeGenException(
 s"Can not find DataView in accumulator by id: ${spec.stateId}"))
 
-// define the DataView variables
-val serializedData = serializeStateDescriptor(desc)
-val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName)
-val field =
-  s"""
- |final $dataViewTypeTerm $dataViewFieldTerm;
- |""".stripMargin
-reusableMemberStatements.add(field)
-
-// create DataViews
-val descFieldTerm = s"${dataViewFieldTerm}_desc"
-val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
-val descDeserializeCode =
-  s"""
- |$descClassQualifier $descFieldTerm = ($descClassQualifier)
- |  org.apache.flink.util.InstantiationUtil.deserializeObject(
- |  org.apache.commons.codec.binary.Base64.decodeBase64("$serializedData"),
- |  $contextTerm.getUserCodeClassLoader());
- |""".stripMargin
-val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) {
-  s"""
- |$descDeserializeCode
- |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView(
- |  $contextTerm.getMapState((
- |  org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
- |""".stripMargin
-} else if (dataViewField.getType == classOf[ListView[_]]) {
-  s"""
- |$descDeserializeCode
- |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateListView(
- |  $contextTerm.getListState((
- |  org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm));
- |""".stripMargin
-} else {
-  throw new CodeGenException(s"Unsupported dataview type: 

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184041109
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -207,112 +241,164 @@ class AggregationCodeGenerator(


---

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184040988
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -207,112 +241,164 @@ class AggregationCodeGenerator(


---

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184041605
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -207,112 +241,164 @@ class AggregationCodeGenerator(


---

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184040192
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala ---
@@ -0,0 +1,106 @@
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLong]) {
--- End diff --

rename `mapView` to `distinctValueMap`?


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184036382
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -151,8 +157,36 @@ class AggregationCodeGenerator(
+val mapViewTypeInfo = new MapViewTypeInfo(
+  physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO)
--- End diff --

Did you check if this works for `null` keys as well, or whether we should 
wrap it in a `Row(1)`?


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184075230
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -481,14 +622,34 @@ class AggregationCodeGenerator(
|org.apache.flink.types.Row b)
""".stripMargin
  val merge: String = {
-for (i <- aggs.indices) yield
-  j"""
- |${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
- |${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)});
- |accIt$i.setElement(bAcc$i);
- |${aggs(i)}.merge(aAcc$i, accIt$i);
- |a.setField($i, aAcc$i);
-  """.stripMargin
+for (i <- aggs.indices) yield {
+  if (isDistinctAggs(i)) {
+j"""
+   |$distinctAccType aDistinctAcc$i = ($distinctAccType) a.getField($i);
+   |$distinctAccType bDistinctAcc$i = ($distinctAccType) b.getField(${mapping(i)});
+   |java.util.Iterator mergeIt$i =
+   |bDistinctAcc$i.elements().iterator();
+   |while (mergeIt$i.hasNext()) {
+   |  java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next();
+   |  Object k = entry.getKey();
+   |  Long v = (Long) entry.getValue();
+   |  if (aDistinctAcc$i.add(k, v)) {
+   |${accTypes(i)} aAcc$i = (${accTypes(i)}) aDistinctAcc$i.getRealAcc();
--- End diff --

Move this out of the while loop?
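
i.e., roughly (a simplified sketch of the merge logic with the lookup hoisted; the `elements()`/`add(k, v)` signatures are taken from the quoted diff):

```scala
import java.lang.{Iterable => JIterable, Long => JLong}
import java.util.{Map => JMap}

// Sketch: getRealAcc() is loop-invariant, so fetch it once per merge call
// instead of once per distinct key.
def mergeDistinct[K, ACC](
    aDistinctAcc: DistinctAccumulator[K, ACC],
    bEntries: JIterable[JMap.Entry[K, JLong]],
    accumulate: (ACC, K) => Unit): Unit = {
  val aAcc = aDistinctAcc.getRealAcc // hoisted out of the while loop
  val it = bEntries.iterator()
  while (it.hasNext) {
    val entry = it.next()
    // add() returns true only for keys that are new to a's distinct map
    if (aDistinctAcc.add(entry.getKey, entry.getValue)) {
      accumulate(aAcc, entry.getKey)
    }
  }
}
```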


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184048806
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -417,13 +548,23 @@ class AggregationCodeGenerator(
.stripMargin
  val create: String = {
for (i <- aggs.indices) yield {
-  j"""
- |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator();
- |${genDataViewFieldSetter(s"acc$i", i)}
- |accs.setField(
- |  $i,
- |  acc$i);"""
-.stripMargin
+  if (isDistinctAggs(i)) {
+j"""
+   |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator();
+   |$distinctAccType distinctAcc$i = ($distinctAccType) new org.apache.flink.table.
--- End diff --

`classOf[DistinctAccumulator].getCanonicalName`
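
i.e., a sketch:

```scala
import org.apache.flink.table.functions.aggfunctions.DistinctAccumulator

// Derive the fully qualified name from the class itself rather than
// hard-coding the package prefix in the code-generation template.
val distinctAccTypeName: String = classOf[DistinctAccumulator[_, _]].getCanonicalName
// used in the generated string, e.g.: s"new $distinctAccTypeName(acc$i)"
```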


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184036079
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -151,8 +157,36 @@ class AggregationCodeGenerator(
+val fieldIndex: Int = aggFields(idx)(0)
--- End diff --

Add a safety check that `aggFields(idx).length == 1`
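
A sketch of such a guard, as it could sit in the quoted `zipWithIndex.map` body (`aggFields`, `idx`, and `CodeGenException` as in the surrounding code):

```scala
// DISTINCT currently assumes single-argument aggregates; fail code
// generation early instead of silently dropping extra arguments.
if (aggFields(idx).length != 1) {
  throw new CodeGenException(
    s"DISTINCT aggregates with ${aggFields(idx).length} arguments are not supported.")
}
val fieldIndex: Int = aggFields(idx)(0)
```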


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184037303
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -151,8 +157,36 @@ class AggregationCodeGenerator(
+Seq(
+  MapViewSpec(
+"distinctAgg" + idx + "_field" + fieldIndex,
--- End diff --

I think we can remove the `"_field" + fieldIndex` postfix. Once we add
support for distinct multi-argument functions, we have to use a single
`MapViewSpec` with a `Row` that holds all arguments.
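
A sketch of the trimmed naming (hedged; `MapViewTypeInfo` and friends as in the quoted diff, with the key wrapped in a `RowTypeInfo` for the future multi-argument case):

```scala
// The aggregate index alone makes the state id unique, and a Row-typed key
// (one field per argument) would cover multi-argument DISTINCT later on.
val stateId = s"distinctAgg$idx" // no "_field" + fieldIndex postfix
val mapViewTypeInfo = new MapViewTypeInfo(
  new RowTypeInfo(physicalInputTypes(fieldIndex)), BasicTypeInfo.LONG_TYPE_INFO)
```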


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184039407
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -207,112 +241,164 @@ class AggregationCodeGenerator(
 }
 }
 
+
 /**
-  * Create DataView Term, for example, acc1_map_dataview.
-  *
-  * @param aggIndex index of aggregate function
-  * @param fieldName field name of DataView
-  * @return term to access [[MapView]] or [[ListView]]
+  * Add all data view for all distinct filters defined by aggregation functions.
   */
-def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
-  s"acc${aggIndex}_${fieldName}_dataview"
+def addDistinctFilterDataViews(): Unit = {
+  val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs
--- End diff --

can be moved into the `if` branch (like in `addAccumulatorDataViews`)
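
i.e., roughly (restructured from the quoted code):

```scala
def addDistinctFilterDataViews(): Unit = {
  if (isStateBackedDataViews) {
    // Build the descriptor map only when state-backed views are generated.
    val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs
      .flatMap(specs => specs.map(s => (s.stateId, s.toStateDescriptor)))
      .toMap[String, StateDescriptor[_ <: State, _]]
    for (i <- aggs.indices; spec <- distinctAggs(i)) {
      val desc = descMapping.getOrElse(spec.stateId,
        throw new CodeGenException(
          s"Can not find DataView for distinct filter in accumulator by id: ${spec.stateId}"))
      addReusableDataView(spec, desc, i)
    }
  }
}
```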


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184092223
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

This should also work with the default parallelism (p=1 is only important
for processing-time tests).


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184085613
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala ---
@@ -0,0 +1,106 @@
+  def add(element: E): Boolean = {
+if (element != null) {
--- End diff --

Shouldn't it be up to the aggregation function how to handle `null`-valued 
parameters?
I'd pass the null value on to the function instead of filtering it out. 
We'd need to wrap it in a `Row()` with a single field to distinguish an 
empty entry from a null-valued entry.
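
A rough sketch of that idea (a hypothetical accumulator variant, not this 
PR's code): `Row` implements equals/hashCode, so a null element wrapped in 
a single-field `Row` can still serve as a map key.

    import java.lang.{Long => JLong}
    import org.apache.flink.table.api.dataview.MapView
    import org.apache.flink.types.Row

    class NullAwareDistinctAccumulator[ACC](
        var realAcc: ACC,
        var mapView: MapView[Row, JLong]) {

      def add(element: AnyRef): Boolean = {
        // Wrap the (possibly null) element so that a null-valued entry
        // remains distinguishable from an absent entry.
        val key = new Row(1)
        key.setField(0, element)
        val count = mapView.get(key)
        if (count != null) {
          mapView.put(key, count + 1)
          false
        } else {
          mapView.put(key, 1L)
          true
        }
      }
    }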



---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-24 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183880939
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Yes. You are right. Sorry I missed that. Just updated :-)


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183867019
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
--- End diff --

Great, thanks!


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183866776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

I think we don't need the type info of the accumulator to create the 
`MapViewTypeInfo`, but the type info of the input type of the aggregation 
function. The input types can be resolved from `physicalInputTypes` and 
`aggFields`. 
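
For illustration, a hedged sketch of that resolution, with the generator's 
state passed in explicitly (`physicalInputTypes` and `aggFields` named as 
in the generator):

    import java.lang.{Long => JLong}
    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.table.dataview.MapViewTypeInfo

    // Resolve the distinct-key type of the i-th aggregate call from the
    // physical input types and the call's first argument index.
    def distinctMapViewTypeInfo(
        physicalInputTypes: Seq[TypeInformation[_]],
        aggFields: Array[Array[Int]],
        i: Int): MapViewTypeInfo[Any, JLong] = {
      val keyType = physicalInputTypes(aggFields(i)(0))
        .asInstanceOf[TypeInformation[Any]]
      new MapViewTypeInfo(keyType, BasicTypeInfo.LONG_TYPE_INFO)
    }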


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222089
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
+
+  distinctAggs(index) = Seq(
--- End diff --

`mapViewTypeInfo` doesn't seem to be available in codegen.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222353
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
+} else {
+  j"""
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+}
   } else {
-j"""
-   |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
-   |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
-   |${genDataViewFieldSetter(s"acc$i", i)}
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  baseClass$i.getValue(acc$i));""".stripMargin
+if (isDistinctAggs(i)) {
+  j"""
+ |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
+ |  
(org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |${genDistinctDataViewFieldSetter(s"distinctAcc$i", 
i)}
+ |${accTypes(i)} acc$i = (${accTypes(i)}) 
distinctAcc$i.getRealAcc();
+ |${genAccDataViewFieldSetter(s"acc$i", i)}
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  baseClass$i.getValue(acc$i));""".stripMargin
+} else {
--- End diff --

Yeah, that's true. It will also avoid a lot of potential typos. +1


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222457
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
 ---
@@ -70,7 +70,12 @@ trait OverAggregate {
 
 val aggStrings = namedAggregates.map(_.getKey).map(
   a => s"${a.getAggregation}(${
-if (a.getArgList.size() > 0) {
+val prefix = if (a.isDistinct) {
--- End diff --

will address in FLINK-8690


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222451
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for accumulator wrapper when applying distinct 
aggregation.
+  * @param realAcc the actual accumulator which gets invoke after distinct 
filter.
+  * @param mapView the [[MapView]] element used to store the distinct 
filter hash map.
+  * @tparam E the element type for the distinct filter hash map.
+  * @tparam ACC the accumulator type for the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: 
MapView[E, Integer]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, Integer]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, Integer]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
+mapView.put(element, mapView.get(element) + 1)
+false
+  } else {
+mapView.put(element, 1)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def add(element: E, count: Int): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
--- End diff --

+1 here as well, just in case I forgot


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222077
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

That's a good point. I will add a test for this use case.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r18367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Hmm, I tried to generate the MapViewSpec in codegen, but the 
`AggregationCodeGenerator.generateAggregations` call signature seems to miss 
the type info for the accumulator (`accTypes` only shows the accumulator 
type, not the `PojoField` info). 

I could certainly change to the simpler `DistinctAccumulator[Object, Long]`, 
but I think there are pros and cons; also, making a `MapTypeInfo` for plain 
`Object` seems funny to me. How about we leave it to a future optimization.
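
For illustration only, a hypothetical sketch of that erased fallback; the 
key would then likely go through generic (Kryo) serialization, which is 
part of why it "seems funny":

    import java.lang.{Long => JLong}
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.GenericTypeInfo
    import org.apache.flink.table.dataview.MapViewTypeInfo

    val erasedMapViewTypeInfo = new MapViewTypeInfo[AnyRef, JLong](
      new GenericTypeInfo(classOf[AnyRef]), BasicTypeInfo.LONG_TYPE_INFO)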


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183598006
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
--- End diff --

This test case was added to cover all OVER aggregate usages of the distinct 
accumulator. 


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222573
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +51,96 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
--- End diff --

Both cases went through `AggregateExpandDistinctAggregatesRule`, which 
produces plans that break the distinct aggregate into a `GROUP BY` over the 
distinct key plus an aggregate without distinct. In the 2nd case, 
`LogicalWindowAggregateRule` was not applied due to the `distinct` keyword. 

They should both be fixed in FLINK-8690


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182496636
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
+
+  distinctAggs(index) = Seq(
--- End diff --

I would generate the `MapViewSpec` in the aggregation code generator


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182226785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
--- End diff --

Reword error message to `"DISTINCT aggregations with multiple parameters 
not fully supported yet."`.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182753514
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Instead we can create the `DataViewSpecs` here.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182478446
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -93,6 +97,8 @@ class AggregationCodeGenerator(
   aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
   aggFields: Array[Array[Int]],
   aggMapping: Array[Int],
+  distinctAggs: Array[Seq[DataViewSpec[_]]],
--- End diff --

I would make this an `Array[Boolean]` and rename to `isDistinctAgg`. We can 
build the `MapViewSpec`s in the method. We have all the information for that in 
the other parameters.



---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182481625
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for accumulator wrapper when applying distinct 
aggregation.
+  * @param realAcc the actual accumulator which gets invoke after distinct 
filter.
+  * @param mapView the [[MapView]] element used to store the distinct 
filter hash map.
+  * @tparam E the element type for the distinct filter hash map.
+  * @tparam ACC the accumulator type for the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: 
MapView[E, Integer]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, Integer]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, Integer]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
--- End diff --

+1


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182479163
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -93,6 +97,8 @@ class AggregationCodeGenerator(
   aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
   aggFields: Array[Array[Int]],
   aggMapping: Array[Int],
+  distinctAggs: Array[Seq[DataViewSpec[_]]],
+  isStateBackedDataViews: Boolean,
--- End diff --

We should add a constructor check for `if (partialResults && 
isStateBackedDataViews)` and throw an exception if `true`. `partialResults` 
means that the `Row` with the accumulators has to be emitted, which won't 
work well for state-backed distinct maps, as they are probably too big.
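
A minimal sketch of the suggested guard, assuming it sits where both flags 
are in scope (e.g. the generator's constructor):

    // Emitting partial results would serialize the whole accumulator Row,
    // including state-backed distinct maps, so reject the combination early.
    def validate(partialResults: Boolean, isStateBackedDataViews: Boolean): Unit = {
      if (partialResults && isStateBackedDataViews) {
        throw new org.apache.flink.table.api.TableException(
          "Partial results are not supported when data views are backed by state.")
      }
    }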


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182768521
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
+} else {
+  j"""
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+}
   } else {
-j"""
-   |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
-   |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
-   |${genDataViewFieldSetter(s"acc$i", i)}
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  baseClass$i.getValue(acc$i));""".stripMargin
+if (isDistinctAggs(i)) {
+  j"""
+ |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
+ |  
(org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |${genDistinctDataViewFieldSetter(s"distinctAcc$i", 
i)}
--- End diff --

we don't need this statement


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182218521
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
--- End diff --

I would not create the `MapViewSpec`s here but do that in the code gen 
function. Here we should create an `Array[Boolean]` to flag distinct 
aggregations.
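
A one-line sketch of that flag array, assuming `aggregateCalls` as in the 
surrounding diff:

    val isDistinctAggs: Array[Boolean] = aggregateCalls.map(_.isDistinct).toArray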


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182496465
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

Does the approach also work for `null` values in both MapViews? If not, we 
can use a `Row(1)` that serializes a bitmask for `null` values.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182773359
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -417,13 +530,26 @@ class AggregationCodeGenerator(
 .stripMargin
   val create: String = {
 for (i <- aggs.indices) yield {
-  j"""
- |${accTypes(i)} acc$i = (${accTypes(i)}) 
${aggs(i)}.createAccumulator();
- |${genDataViewFieldSetter(s"acc$i", i)}
- |accs.setField(
- |  $i,
- |  acc$i);"""
-.stripMargin
+  if (isDistinctAggs(i)) {
+j"""
+   |${accTypes(i)} acc$i = (${accTypes(i)}) 
${aggs(i)}.createAccumulator();
+   |$distinctAccType distinctAcc$i = ($distinctAccType) 
new org.apache.flink.table.
+   |functions.aggfunctions.DistinctAccumulator(acc$i);
+   |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)}
--- End diff --

I think this and the `genAccDataViewFieldSetter` call (both from here and 
the non-distinct case) can be removed.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182219690
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +51,96 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
--- End diff --

I tested a few other queries with distinct aggregates. 
* Queries with non-windowed `DISTINCT` aggregations work, but they are 
translated without distinct aggregations, so the changes in this PR are 
not used.
* Queries with `DISTINCT` aggregates and `TUMBLE` or `HOP` windows fail 
with strange exceptions. This did not look related to these changes, but it 
would be good to check.

We don't have to fix these things in this PR (unless it is broken by these 
changes).

In general, I think it would be good to add unit tests for the 
`AggregationCodeGenerator`. We could generate `GeneratedAggregations` for 
different configurations, compile them, and check if the result is as 
expected. Actually, we should have done that already. This should also work 
for state-backed views if we embed the test in a HarnessTest.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182489141
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

+1


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182233134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
--- End diff --

Add more comments for the `aggCall.isDistinct` branch.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182753219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

If we change the input parameter, we have the `Array[Boolean]` already


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182254735
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
--- End diff --

We need to forward the distinct maps as well. `partialResults` is used when 
an operator needs to emit partial aggregation results such as a combine 
function in batch execution. So we don't need to distinguish the 
`isDistinctAggs(i)` case here.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182768726
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
+} else {
+  j"""
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+}
   } else {
-j"""
-   |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
-   |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
-   |${genDataViewFieldSetter(s"acc$i", i)}
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  baseClass$i.getValue(acc$i));""".stripMargin
+if (isDistinctAggs(i)) {
+  j"""
+ |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
+ |  
(org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |${genDistinctDataViewFieldSetter(s"distinctAcc$i", 
i)}
+ |${accTypes(i)} acc$i = (${accTypes(i)}) 
distinctAcc$i.getRealAcc();
+ |${genAccDataViewFieldSetter(s"acc$i", i)}
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  baseClass$i.getValue(acc$i));""".stripMargin
+} else {
--- End diff --

Both cases share a lot of code. We could retrieve only `acc$i` differently.
Does that make sense, or would it fragment the code too much?
The same would apply to `accumulate()` and `retract()`.
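
A self-contained sketch of that factoring with hypothetical inputs; only 
the accumulator retrieval differs between the branches, the rest of the 
generated snippet is shared:

    // Build the getValue() snippet for aggregate i. `accType`, `distinctAccType`
    // and `aggTerm` stand in for the generator's reusable terms.
    def genGetValue(
        i: Int,
        isDistinct: Boolean,
        accType: String,
        distinctAccType: String,
        aggTerm: String,
        outIdx: Int): String = {
      val getAcc =
        if (isDistinct) {
          s"""$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i);
             |$accType acc$i = ($accType) distinctAcc$i.getRealAcc();""".stripMargin
        } else {
          s"$accType acc$i = ($accType) accs.getField($i);"
        }
      s"""org.apache.flink.table.functions.AggregateFunction baseClass$i =
         |  (org.apache.flink.table.functions.AggregateFunction) $aggTerm;
         |$getAcc
         |output.setField($outIdx, baseClass$i.getValue(acc$i));""".stripMargin
    }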


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182469110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
 ---
@@ -70,7 +70,12 @@ trait OverAggregate {
 
 val aggStrings = namedAggregates.map(_.getKey).map(
   a => s"${a.getAggregation}(${
-if (a.getArgList.size() > 0) {
+val prefix = if (a.isDistinct) {
--- End diff --

In case we also want to support group-windowed DISTINCT aggregates, we 
would need to change `CommonAggregate.aggregationToString()` as well. 


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-03-10 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r173644059
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for accumulator wrapper when applying distinct 
aggregation.
+  * @param realAcc the actual accumulator which gets invoke after distinct 
filter.
+  * @param mapView the [[MapView]] element used to store the distinct 
filter hash map.
+  * @tparam E the element type for the distinct filter hash map.
+  * @tparam ACC the accumulator type for the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: 
MapView[E, Integer]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, Integer]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, Integer]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
--- End diff --

Use `mapView.get()` directly and reuse the result. This avoids accessing the 
state twice.
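
A sketch of the single-lookup variant, reusing the fields of the 
`DistinctAccumulator` from the diff above (value type `Integer` as in this 
revision):

    def add(element: E): Boolean = {
      if (element == null) {
        false
      } else {
        val count = mapView.get(element) // one state access instead of two
        if (count != null) {
          mapView.put(element, count + 1)
          false
        } else {
          mapView.put(element, 1)
          true
        }
      }
    }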


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-03-10 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r173644062
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

Isn't LONG_TYPE_INFO safer? Int overflow is easily reached given 10,000 
records processed per second: a 32-bit counter (max 2^31 - 1, roughly 
2.1 billion) would wrap after about 2.5 days at that rate. 


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170828036
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1393,6 +1393,21 @@ object AggregateUtil {
 throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
 }
   }
+
+  // create distinct accumulator delegate
+  if (aggregateCall.isDistinct) {
--- End diff --

The SQL will be verified by Calcite, which excludes a single DISTINCT during 
the SQL parse phase, so maybe we don't have to consider single distinct here.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170828027
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var 
realAcc: ACC) {
+  def this() {
+this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: 
TypeInformation[_],
+  var realAgg: 
AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, 
ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+new DistinctAccumulator[E, ACC](
+  new MapView[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]],
+BasicTypeInfo.INT_TYPE_INFO),
+  realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + 1)
+false
+  } else {
+acc.mapView.put(element, 1)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: 
Int): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + count)
+false
+  } else {
+acc.mapView.put(element, count)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def retract(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  val count = acc.mapView.get(element)
+  if (count == 1) {
+acc.mapView.remove(element)
+true
+  } else {
+acc.mapView.put(element, count - 1)
+false
+  }
+} else {
+  false
+}
+  }
+
+  def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = {
+acc.mapView.clear()
+  }
+
+  override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, 
Integer] = {
+acc.mapView.map
+  }
+
+  override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, 
ACC]] = {
+val clazz = classOf[DistinctAccumulator[E, ACC]]
+val pojoFields = new util.ArrayList[PojoField]
+pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"),
+  new MapViewTypeInfo[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO)))
+pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"),
+  realAgg.getAccumulatorType))
+new PojoTypeInfo[DistinctAccumulator[E, ACC]](clazz, pojoFields)
--- End diff --

Makes sense.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170602449
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var 
realAcc: ACC) {
+  def this() {
+this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: 
TypeInformation[_],
+  var realAgg: 
AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, 
ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+new DistinctAccumulator[E, ACC](
+  new MapView[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]],
+BasicTypeInfo.INT_TYPE_INFO),
+  realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + 1)
+false
+  } else {
+acc.mapView.put(element, 1)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: 
Int): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + count)
+false
+  } else {
+acc.mapView.put(element, count)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def retract(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  val count = acc.mapView.get(element)
+  if (count == 1) {
+acc.mapView.remove(element)
+true
+  } else {
+acc.mapView.put(element, count - 1)
+false
+  }
+} else {
+  false
+}
+  }
+
+  def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = {
+acc.mapView.clear()
+  }
+
+  override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, 
Integer] = {
+acc.mapView.map
+  }
+
+  override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, 
ACC]] = {
+val clazz = classOf[DistinctAccumulator[E, ACC]]
+val pojoFields = new util.ArrayList[PojoField]
+pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"),
+  new MapViewTypeInfo[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO)))
+pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"),
+  realAgg.getAccumulatorType))
--- End diff --

Good catch! 


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170603205
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) {
+  def this() {
+    this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+        this.mapView == that.mapView
+      case _ => false
+    }
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_],
+                                          var realAgg: AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+    new DistinctAccumulator[E, ACC](
+      new MapView[E, Integer](
+        elementTypeInfo.asInstanceOf[TypeInformation[E]],
+        BasicTypeInfo.INT_TYPE_INFO),
+      realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+    if (element != null) {
+      if (acc.mapView.contains(element)) {
+        acc.mapView.put(element, acc.mapView.get(element) + 1)
+        false
+      } else {
+        acc.mapView.put(element, 1)
+        true
+      }
+    } else {
+      false
+    }
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
+    if (element != null) {
+      if (acc.mapView.contains(element)) {
+        acc.mapView.put(element, acc.mapView.get(element) + count)
+        false
+      } else {
+        acc.mapView.put(element, count)
+        true
+      }
+    } else {
+      false
+    }
+  }
+
+  def retract(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+    if (element != null) {
+      val count = acc.mapView.get(element)
+      if (count == 1) {
+        acc.mapView.remove(element)
+        true
+      } else {
+        acc.mapView.put(element, count - 1)
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = {
+    acc.mapView.clear()
+  }
+
+  override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = {
+    acc.mapView.map
+  }
+
+  override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = {
+    val clazz = classOf[DistinctAccumulator[E, ACC]]
+    val pojoFields = new util.ArrayList[PojoField]
+    pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"),
+      new MapViewTypeInfo[E, Integer](
+        elementTypeInfo.asInstanceOf[TypeInformation[E]],
+        BasicTypeInfo.INT_TYPE_INFO)))
+    pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"),
+      realAgg.getAccumulatorType))
+    new PojoTypeInfo[DistinctAccumulator[E, ACC]](clazz, pojoFields)
--- End diff --

Yes, I was actually aware of that as one of the limitations of 


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170605004
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,80 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     (8L, 8, "Hello World"),
     (20L, 20, "Hello World"))
 
+  @Test
+  def testUnboundedDistinctGroupWindow(): Unit = {
--- End diff --

Oops.. this was supposed to be added later in FLINK-8690 :-P


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170604304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1393,6 +1393,21 @@ object AggregateUtil {
        throw new TableException(s"unsupported Function: '${unSupported.getName}'")
 }
   }
+
+  // create distinct accumulator delegate
+  if (aggregateCall.isDistinct) {
--- End diff --

Good point. Actually, there's a very interesting question I've been thinking 
about: this AggFunction is literally the distinct version of 
`CollectAggFunction`, yet it cannot exist by itself unless it is chained with 
some other `realAgg` function --> this means we would need to chain 
`DistinctAggFunction` with `CollectAggFunction`, which just doesn't make 
sense. I will try some other approaches and see if they work.
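
To make the redundancy concrete, a hypothetical sketch (the constructor and 
accumulator class of `CollectAggFunction` are assumed; this is not code from 
the PR): the delegate's `MapView` already materializes the distinct 
elements, so a wrapped collect aggregate would rebuild essentially the same 
map.

    // COLLECT(DISTINCT c) expressed through the delegate: the delegate's
    // getValue already returns the distinct multiset, so the inner
    // CollectAggFunction duplicates the bookkeeping.
    val collectAgg = new CollectAggFunction[String](BasicTypeInfo.STRING_TYPE_INFO)
    val collectDistinct = new DistinctAggDelegateFunction[String, CollectAccumulator[String]](
      BasicTypeInfo.STRING_TYPE_INFO, collectAgg)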


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577231
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -105,11 +106,32 @@ class AggregationCodeGenerator(
 
     // get unique function name
     val funcName = newName(name)
+
+    // get distinct filter of acc fields for each aggregate functions
+    val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+    val distinctAggType = s"${classOf[DistinctAggDelegateFunction[_, _]].getName}"
+    val isDistinctAcc = aggregates.map(_.isInstanceOf[DistinctAggDelegateFunction[_, _]])
+
     // register UDAGGs
     val aggs = aggregates.map(a => addReusableFunction(a, contextTerm))
 
+    // register real aggregate functions without distinct delegate
+    val realAggregates: Array[AggregateFunction[_ <: Any, _ <: Any]] = aggregates.map {
+      case distinctAggDelegate: DistinctAggDelegateFunction[_, _] =>
+        distinctAggDelegate.realAgg
+      case agg: AggregateFunction[_, _] =>
+        agg
+    }
+
+    val realAggTypes = aggregates.map {
--- End diff --

Can be replaced by `realAggregates.map(_.getClass.getName)`
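
Since `realAggregates` already unwraps each `DistinctAggDelegateFunction`, 
mapping over it avoids a second pattern match (a sketch of the suggested 
replacement, using the variables from the quoted diff):

    // reuse the already-unwrapped functions instead of matching on `aggregates` again
    val realAggTypes = realAggregates.map(_.getClass.getName)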


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577575
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) {
+  def this() {
+    this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+        this.mapView == that.mapView
+      case _ => false
+    }
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_],
+                                          var realAgg: AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+    new DistinctAccumulator[E, ACC](
+      new MapView[E, Integer](
+        elementTypeInfo.asInstanceOf[TypeInformation[E]],
+        BasicTypeInfo.INT_TYPE_INFO),
+      realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+    if (element != null) {
+      if (acc.mapView.contains(element)) {
+        acc.mapView.put(element, acc.mapView.get(element) + 1)
+        false
+      } else {
+        acc.mapView.put(element, 1)
+        true
+      }
+    } else {
+      false
+    }
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
+    if (element != null) {
+      if (acc.mapView.contains(element)) {
+        acc.mapView.put(element, acc.mapView.get(element) + count)
+        false
+      } else {
+        acc.mapView.put(element, count)
+        true
+      }
+    } else {
+      false
+    }
+  }
+
+  def retract(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+    if (element != null) {
+      val count = acc.mapView.get(element)
+      if (count == 1) {
+        acc.mapView.remove(element)
+        true
+      } else {
+        acc.mapView.put(element, count - 1)
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = {
+    acc.mapView.clear()
+  }
+
+  override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = {
+    acc.mapView.map
+  }
+
+  override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = {
+    val clazz = classOf[DistinctAccumulator[E, ACC]]
+    val pojoFields = new util.ArrayList[PojoField]
+    pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"),
+      new MapViewTypeInfo[E, Integer](
+        elementTypeInfo.asInstanceOf[TypeInformation[E]],
+        BasicTypeInfo.INT_TYPE_INFO)))
+    pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"),
+      realAgg.getAccumulatorType))
+    new PojoTypeInfo[DistinctAccumulator[E, ACC]](clazz, pojoFields)
--- End diff --

Currently, `MapView` and `ListView` are only supported at the first 
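
The limitation the (truncated) comment points at, illustrated with a sketch: 
only `MapView`/`ListView` fields at the first level of the accumulator POJO 
are replaced by state-backed views, so a view nested inside `realAcc` would 
stay on the heap.

    // Sketch: the first-level mapView is rewritten to a state-backed view,
    // but a MapView hiding inside the realAcc field (e.g. a collect-style
    // accumulator) is not.
    class DistinctAccumulator[E, ACC](
      var mapView: MapView[E, Integer], // first level: state-backed
      var realAcc: ACC)                 // views nested in here are not rewritten


---
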

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577434
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) {
+  def this() {
+    this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+        this.mapView == that.mapView
+      case _ => false
+    }
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_],
+                                          var realAgg: AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+    new DistinctAccumulator[E, ACC](
+      new MapView[E, Integer](
+        elementTypeInfo.asInstanceOf[TypeInformation[E]],
+        BasicTypeInfo.INT_TYPE_INFO),
+      realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+    if (element != null) {
+      if (acc.mapView.contains(element)) {
+        acc.mapView.put(element, acc.mapView.get(element) + 1)
+        false
+      } else {
+        acc.mapView.put(element, 1)
+        true
+      }
+    } else {
+      false
+    }
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
--- End diff --

Add a corresponding retract function.
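
A sketch of what that could look like, mirroring the count-based 
`accumulate` above (the reviewer's suggestion, not code from the PR):

    // retract `count` occurrences at once; returns true when the element
    // disappears from the distinct set entirely
    def retract(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
      if (element != null) {
        val current = acc.mapView.get(element)
        if (current <= count) {
          acc.mapView.remove(element)
          true
        } else {
          acc.mapView.put(element, current - count)
          false
        }
      } else {
        false
      }
    }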


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577325
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) {
+  def this() {
+    this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+        this.mapView == that.mapView
+      case _ => false
+    }
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_],
+                                          var realAgg: AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+    new DistinctAccumulator[E, ACC](
+      new MapView[E, Integer](
+        elementTypeInfo.asInstanceOf[TypeInformation[E]],
+        BasicTypeInfo.INT_TYPE_INFO),
+      realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+    if (element != null) {
+      if (acc.mapView.contains(element)) {
+        acc.mapView.put(element, acc.mapView.get(element) + 1)
+        false
+      } else {
+        acc.mapView.put(element, 1)
+        true
+      }
+    } else {
+      false
+    }
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
+    if (element != null) {
+      if (acc.mapView.contains(element)) {
+        acc.mapView.put(element, acc.mapView.get(element) + count)
+        false
+      } else {
+        acc.mapView.put(element, count)
+        true
+      }
+    } else {
+      false
+    }
+  }
+
+  def retract(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+    if (element != null) {
+      val count = acc.mapView.get(element)
+      if (count == 1) {
+        acc.mapView.remove(element)
+        true
+      } else {
+        acc.mapView.put(element, count - 1)
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = {
+    acc.mapView.clear()
+  }
+
+  override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = {
+    acc.mapView.map
+  }
+
+  override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = {
+    val clazz = classOf[DistinctAccumulator[E, ACC]]
+    val pojoFields = new util.ArrayList[PojoField]
+    pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"),
+      new MapViewTypeInfo[E, Integer](
+        elementTypeInfo.asInstanceOf[TypeInformation[E]],
+        BasicTypeInfo.INT_TYPE_INFO)))
+    pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"),
+      realAgg.getAccumulatorType))
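
For instance, a class-level Scaladoc along these lines (illustrative wording 
only):

    /**
      * Accumulator for a distinct aggregate: `mapView` counts how many times
      * each element has been accumulated, while `realAcc` is the accumulator
      * of the wrapped ("real") aggregate function.
      */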
--- End diff --

`getAccumulatorType` may return null if it has not been overridden. In this 
case, the accumulator type should be inferred. See 
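
One possible null-safe fallback (a sketch; it assumes 
`TypeExtractor.createTypeInfo(Class)` from 
`org.apache.flink.api.java.typeutils` is acceptable here, since the 
reviewer's reference is cut off above):

    // fall back to reflective inference when the wrapped function does not
    // override getAccumulatorType
    val realAccType = Option(realAgg.getAccumulatorType)
      .getOrElse(TypeExtractor.createTypeInfo(realAgg.createAccumulator().getClass))


---
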

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577418
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var 
realAcc: ACC) {
--- End diff --

Remove the blank between `[E, ACC]` and `(var mapView`.
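
That is, the declaration would read:

    class DistinctAccumulator[E, ACC](var mapView: MapView[E, Integer], var realAcc: ACC) {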


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577818
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,80 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     (8L, 8, "Hello World"),
     (20L, 20, "Hello World"))
 
+  @Test
+  def testUnboundedDistinctGroupWindow(): Unit = {
--- End diff --

Remove this test case. The distinct aggregation in this case is already 
handled by `AggregateExpandDistinctAggregatesRule`.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577731
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1393,6 +1393,21 @@ object AggregateUtil {
        throw new TableException(s"unsupported Function: '${unSupported.getName}'")
 }
   }
+
+  // create distinct accumulator delegate
+  if (aggregateCall.isDistinct) {
--- End diff --

It is better to move this logic into the upper match. Distinct is also a 
kind of aggregate whose fields should not be empty, and we could reuse some 
of the variables there. What do you think? 
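
A sketch of the suggested restructuring (names assumed, not the PR's actual 
code): wrap the resolved function inside the match arm itself, so the 
distinct case shares the field checks and variables already computed there.

    // decide on the delegate where the aggregate function is resolved,
    // instead of patching the distinct case in afterwards
    def wrapIfDistinct[E, ACC](
        agg: AggregateFunction[_, ACC],
        isDistinct: Boolean,
        elementType: TypeInformation[E]): AggregateFunction[_, _] =
      if (isDistinct) new DistinctAggDelegateFunction[E, ACC](elementType, agg) else agg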


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-21 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/

[FLINK-8689][table]Add runtime support of distinct filter using MapView for 
codegen


## What is the purpose of the change

- Adds runtime support for distinct filtering using a MapView-backed distinct 
aggregation delegate. This only fixes the currently broken over-window 
aggregate with a distinct filter; however, the change is meant to be reused 
by other distinct operations on DataStream, such as group window aggregates, 
group aggregates, etc.


## Brief change log

  - Add a new DistinctAggDelegateFunction and DistinctAccumulator that 
encapsulate any real aggregate function.
  - Change codegen to generate the distinct filter before invoking the 
actual aggregate function.
  - Add more codegen specifically for using the delegates on merge and 
reset.

## Verifying this change

This change added tests and can be verified as follows:

  - Added over-window unit tests to verify the generated plan before codegen.
  - Added integration tests that exercise distinct over-window aggregates 
end-to-end.

## Does this pull request potentially affect one of the following parts:

no

## Documentation

This does not expose any additional external-facing functionality; the 
documentation will come in a separate PR.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-8689

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #


commit cb853e6d1adfbb04819a234f8fbced32a8ffb21f
Author: Rong Rong 
Date:   2018-02-21T21:40:29Z

support distinct agg function delegate using mapview accumulator in runtime 
environment

commit fff7fa48f07b350f6fd4565f58d6a35af88faa50
Author: Rong Rong 
Date:   2018-02-21T21:43:02Z

add over window aggregate and unbounded aggregate ITCase

commit 03fc641ac01af6867d2516263bcea0ac96f1802c
Author: Rong Rong 
Date:   2018-02-22T05:15:26Z

adding in overwindow logical test cases to verify distinct modifier




---