[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-09-03 Thread GitBox
URL: https://github.com/apache/flink/pull/6521#discussion_r214744230
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ##
 @@ -214,10 +214,25 @@ trait ImplicitExpressionOperations {
   def varSamp = VarSamp(expr)
 
   /**
-*  Returns multiset aggregate of a given expression.
+* Returns multiset aggregate of a given expression.
 */
   def collect = Collect(expr)
 
+  /**
+* Returns a distinct field reference to a given expression
+*/
+  def distinct: Expression = {
 
 Review comment:
   Ah, that makes perfect sense. Sorry for misunderstanding the comment; I will make the change. This is in fact much more elegant!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-31 Thread GitBox
URL: https://github.com/apache/flink/pull/6521#discussion_r214387582
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -381,6 +381,36 @@ Table result = orders
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
+{% endhighlight %}
+Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details.
+  
+
+
+  
+Distinct Aggregation
+Streaming
 
 Review comment:
   I agree; I added the labels. Regarding adding the sections to each individual `Aggregation`, I wasn't able to find a clean construct, since some of the discussion (UDAGG, built-in) is general and it would be messy to replicate it three times.
   
   I regenerated the page, and the placement looks clear, since it is within the `aggregation` tab and all the necessary information (such as Over aggregations applying only to streaming) is in the same place.
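To illustrate the note in the diff above about state growing with the number of distinct values, here is a toy, Flink-free Scala model of idle-state retention. It assumes a single retention time, whereas Flink's `StreamQueryConfig.withIdleStateRetentionTime` takes a minimum and a maximum; `DistinctState`, `add`, and `cleanup` are hypothetical names, not Flink API.

```scala
import scala.collection.mutable

// Toy model only: a single `retentionMs` is an assumption
// (Flink's StreamQueryConfig takes a min and a max retention time).
object IdleStateRetentionSketch {
  // Per-group state: the distinct values seen so far plus the last access time.
  final case class GroupState(seen: mutable.Set[Long], var lastAccess: Long)

  final class DistinctState(retentionMs: Long) {
    private val state = mutable.Map.empty[String, GroupState]

    // Records a value for a group; state grows with every new distinct value.
    def add(key: String, value: Long, now: Long): Unit = {
      val gs = state.getOrElseUpdate(key, GroupState(mutable.Set.empty[Long], now))
      gs.seen += value
      gs.lastAccess = now
    }

    // Drops every group that has been idle for longer than the retention time.
    def cleanup(now: Long): Unit = {
      val expired = state.keys.filter(k => now - state(k).lastAccess > retentionMs).toList
      expired.foreach(k => state.remove(k))
    }

    // Total number of distinct values currently held in state.
    def size: Int = state.values.map(_.seen.size).sum
  }

  def demo(): (Int, Int) = {
    val s = new DistinctState(retentionMs = 1000L)
    s.add("a", 1L, now = 0L)
    s.add("a", 2L, now = 0L)
    s.add("b", 7L, now = 900L)
    val before = s.size     // 3 distinct values held
    s.cleanup(now = 1500L)  // "a" idle 1500 ms (evicted), "b" idle 600 ms (kept)
    (before, s.size)        // (3, 1)
  }
}
```

Without the `cleanup` call, the state for group `a` would be kept forever, which is exactly the unbounded growth the note warns about.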




[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-31 Thread GitBox
URL: https://github.com/apache/flink/pull/6521#discussion_r214386870
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
 ##
 @@ -40,6 +40,84 @@ class AggregateITCase extends StreamingWithStateTestBase {
   private val queryConfig = new StreamQueryConfig()
   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
 
+  @Test
+  def testDistinctUDAGG(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val testAgg = new DataViewTestAgg
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .groupBy('e)
+      .select('e, testAgg.distinct('d, 'e))
+
+    val results = t.toRetractStream[Row](queryConfig)
+    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList("1,10", "2,21", "3,12")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testDistinctUDAGGMixedWithNonDistinctUsage(): Unit = {
 
 Review comment:
   I originally discovered the distinct modifier bug using this test. Can we still keep it? I have found that mixed test cases can sometimes expose hard-to-find bugs.
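A Flink-free sketch of why a mixed test is worth keeping: when the same aggregate appears both with and without `distinct` in one grouped query, the two columns must differ whenever a group contains duplicates, so a bug that leaks the distinct modifier into the plain call (such as a shared mutable flag) shows up immediately. `Rec` and `query` are hypothetical stand-ins for the test data and the `groupBy('e).select(...)` query, using plain Scala collections and a sum in place of a UDAGG.

```scala
// Hypothetical model of: .groupBy('e).select('e, agg.distinct('d), agg('d))
// No Flink involved; a plain sum stands in for the UDAGG.
object MixedDistinctSketch {
  final case class Rec(d: Long, e: Int)

  // For each group key: (sum over distinct d values, sum over all d values).
  def query(input: Seq[Rec]): Map[Int, (Long, Long)] =
    input.groupBy(_.e).map { case (key, group) =>
      val ds = group.map(_.d)
      (key, (ds.distinct.sum, ds.sum))
    }

  val input: Seq[Rec] =
    Seq(Rec(1L, 1), Rec(1L, 1), Rec(2L, 1), Rec(5L, 2), Rec(5L, 2))

  val result: Map[Int, (Long, Long)] = query(input)
}
```

Here group `1` yields `(3, 4)` and group `2` yields `(5, 10)`; if the distinct modifier leaked into the plain call, group `1` would report `(3, 3)`.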




[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-31 Thread GitBox
URL: https://github.com/apache/flink/pull/6521#discussion_r214386474
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ##
 @@ -214,10 +214,15 @@ trait ImplicitExpressionOperations {
   def varSamp = VarSamp(expr)
 
   /**
-*  Returns multiset aggregate of a given expression.
+* Returns multiset aggregate of a given expression.
 */
   def collect = Collect(expr)
 
+  /**
+* Returns a distinct field reference to a given expression
+*/
+  def distinct = DistinctAgg(expr)
 
 Review comment:
   I was not sure if this is what you had in mind. Please take another look. Thanks.




[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-31 Thread GitBox
URL: https://github.com/apache/flink/pull/6521#discussion_r214386228
 
 

 ##
 File path: docs/dev/table/udfs.md
 ##
 @@ -650,6 +650,36 @@ tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GRO
 
 
 
+User-defined aggregation function can be used with `distinct` modifiers. To calculate the aggregate results only for distinct values, simply add the distinct modifier towards the aggregation function.
 
 Review comment:
   Done. Yes, I agree.
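To make "aggregate only distinct values" concrete for an aggregate that must also handle retractions (as `toRetractStream` in the tests implies), here is a sketch of one common approach: keep a per-value occurrence count, forward a value to the wrapped accumulator only on its first occurrence, and retract it only when its last occurrence is retracted. `SumAcc` and `DistinctSumAcc` are hypothetical classes, not Flink's runtime implementation.

```scala
import scala.collection.mutable

// Hypothetical sketch; not Flink's actual distinct-aggregation runtime code.
object DistinctAccumulatorSketch {

  // A plain (non-distinct) sum accumulator that supports retraction.
  final class SumAcc {
    var sum: Long = 0L
    def accumulate(v: Long): Unit = sum += v
    def retract(v: Long): Unit = sum -= v
  }

  // Reference-counts each value so duplicates are aggregated only once.
  final class DistinctSumAcc {
    private val counts = mutable.Map.empty[Long, Int]
    val inner = new SumAcc

    def accumulate(v: Long): Unit = {
      val c = counts.getOrElse(v, 0)
      counts(v) = c + 1
      if (c == 0) inner.accumulate(v) // first occurrence reaches the inner aggregate
    }

    def retract(v: Long): Unit = counts.get(v) match {
      case Some(1) =>                 // last occurrence: remove it from the aggregate
        counts.remove(v)
        inner.retract(v)
      case Some(c) => counts(v) = c - 1
      case None    => ()              // unseen value: ignored in this sketch
    }

    def result: Long = inner.sum
  }

  def demo(): (Long, Long) = {
    val acc = new DistinctSumAcc
    Seq(3L, 3L, 4L).foreach(acc.accumulate) // distinct values {3, 4}
    val afterInsert = acc.result           // 7
    acc.retract(3L)                         // one 3 still remains
    acc.retract(4L)                         // last 4 removed
    (afterInsert, acc.result)              // (7, 3)
  }
}
```

The count map is also the state that grows with the number of distinct values, which is why a retention interval matters for streaming queries.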




[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-10 Thread GitBox
URL: https://github.com/apache/flink/pull/6521#discussion_r209313630
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ##
 @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
   def getAccumulatorType: TypeInformation[ACC] = null
+
+  private[flink] var isDistinctAgg: Boolean = false
+
+  private[flink] def distinct: AggregateFunction[T, ACC] = {
 
 Review comment:
   This is not going to work, as it modifies a field of this particular AggregateFunction instance. For example:
   ```
   table.select(udagg.distinct('a), udagg('a))
   ```
   will return the same result in both columns, because the distinct modifier has been set on this particular `udagg` instance, which both calls share. This is a blunder on my end, and I should fix it before further reviews are conducted.
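A minimal, Flink-free sketch of the problem and of the usual fix, assuming a toy sum aggregate in place of a real `AggregateFunction`: setting a flag on the shared instance makes both calls distinct, whereas returning a new value leaves the original call untouched. `MutableSumAgg` and `SumAgg` are hypothetical names.

```scala
// Hypothetical toy aggregates; NOT Flink's AggregateFunction API.
object SharedFlagDemo {

  // Buggy variant: `distinct` flips a flag on the shared instance itself.
  class MutableSumAgg {
    var isDistinctAgg: Boolean = false
    def distinct: MutableSumAgg = { isDistinctAgg = true; this }
    def apply(values: Seq[Int]): Int =
      if (isDistinctAgg) values.distinct.sum else values.sum
  }

  // Fixed variant: `distinct` returns a new value; the original is untouched.
  final case class SumAgg(isDistinct: Boolean = false) {
    def distinct: SumAgg = copy(isDistinct = true)
    def apply(values: Seq[Int]): Int =
      if (isDistinct) values.distinct.sum else values.sum
  }

  val column: Seq[Int] = Seq(1, 1, 2, 3, 3)

  // Mimics select(udagg.distinct('a), udagg('a)): both calls are built
  // before either is evaluated, as with a lazily translated query plan.
  def buggyResult: (Int, Int) = {
    val udagg = new MutableSumAgg
    val distinctCall = udagg.distinct
    val plainCall = udagg // same object, now also flagged as distinct
    (distinctCall(column), plainCall(column))
  }

  def fixedResult: (Int, Int) = {
    val udagg = SumAgg()
    val distinctCall = udagg.distinct
    val plainCall = udagg // unaffected by the call above
    (distinctCall(column), plainCall(column))
  }
}
```

`buggyResult` returns `(6, 6)` while `fixedResult` returns `(6, 10)`, which is the same symptom as the `select` example above.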




[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-08 Thread GitBox
URL: https://github.com/apache/flink/pull/6521#discussion_r208624881
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ##
 @@ -214,10 +214,16 @@ trait ImplicitExpressionOperations {
   def varSamp = VarSamp(expr)
 
   /**
-*  Returns multiset aggregate of a given expression.
+* Returns multiset aggregate of a given expression.
 */
   def collect = Collect(expr)
 
+  /**
+* Return a distinct field reference to a given expression
+* @return
+*/
+  def distinct = DistinctAgg(expr)
 
 Review comment:
   This is the particular place I would like a second opinion on. I thought of two ways of doing this:
   1. The way I did it: adding `distinct` in `expressionDsl.scala` and treating `distinct` as a special aggregation, plus a separate `distinct` method in the `AggregateFunction` class for UDAGGs.
   2. Adding `distinct` to `ExpressionParser.scala` and creating special prefix/suffix rules to handle the distinct modifier with different expression converters.
   
   I prefer the first way, since the second requires unifying the parsing of `distinct` in aggregations and `distinct` on tables. But I would like to hear more opinions.
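Option 1 can be sketched with a tiny, hypothetical expression tree: `distinct` is added through an implicit class (in the spirit of `ImplicitExpressionOperations`) and simply wraps an aggregate in a `DistinctAgg` marker node, which the evaluator interprets as "deduplicate the input first". None of these types are Flink's actual expression classes; the sketch only shows that the Scala DSL needs no parser changes for this approach.

```scala
// Hypothetical miniature expression DSL; names only loosely mirror the diff.
object DistinctDslSketch {
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class Sum(child: Expr) extends Expr
  final case class Count(child: Expr) extends Expr
  // Marker node: apply the wrapped aggregate to distinct input values only.
  final case class DistinctAgg(agg: Expr) extends Expr

  // Adds `.distinct` to any expression, similar in spirit to the Scala DSL.
  implicit class ExprOps(expr: Expr) {
    def distinct: Expr = DistinctAgg(expr)
  }

  type Row = Map[String, Int]

  private def column(e: Expr, rows: Seq[Row]): Seq[Int] = e match {
    case Field(n) => rows.map(r => r(n))
    case other    => throw new IllegalArgumentException(s"not a field: $other")
  }

  // Evaluates an aggregate expression over one group of rows.
  def eval(e: Expr, rows: Seq[Row], onlyDistinct: Boolean = false): Int = e match {
    case DistinctAgg(inner) => eval(inner, rows, onlyDistinct = true)
    case Sum(child) =>
      val vs = column(child, rows)
      (if (onlyDistinct) vs.distinct else vs).sum
    case Count(child) =>
      val vs = column(child, rows)
      (if (onlyDistinct) vs.distinct else vs).size
    case other => throw new IllegalArgumentException(s"not an aggregate: $other")
  }

  val rows: Seq[Row] = Seq(Map("a" -> 1), Map("a" -> 1), Map("a" -> 2))
  val plainSum: Int = eval(Sum(Field("a")), rows)                // 1 + 1 + 2
  val distinctSum: Int = eval(Sum(Field("a")).distinct, rows)    // 1 + 2
  val distinctCount: Int = eval(Count(Field("a")).distinct, rows) // {1, 2}
}
```

The trade-off from option 2 shows up here: a string-based API would still need its own parsing rule to produce the same `DistinctAgg` node.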

