[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214744230

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ##
@@ -214,10 +214,25 @@ trait ImplicitExpressionOperations {
 def varSamp = VarSamp(expr)
 /**
-* Returns multiset aggregate of a given expression.
+* Returns multiset aggregate of a given expression.
 */
 def collect = Collect(expr)
+ /**
+* Returns a distinct field reference to a given expression
+*/
+ def distinct: Expression = {

Review comment:
Ahh, that makes perfect sense. Sorry for misunderstanding the comment. I will make the change. This is in fact much more elegant!

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214387582

## File path: docs/dev/table/tableApi.md ##
@@ -381,6 +381,36 @@ Table result = orders
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
+{% endhighlight %}
+Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details.
+
+
+
+
+Distinct Aggregation
+Streaming

Review comment:
I agree, and I added the labels. As for adding the sections under each individual `Aggregation`, I wasn't able to find a clean construct: some of the discussions (UDAGG, built-in) are general, and it's pretty messy to replicate those in 3 different ways. I regenerated the page and it looks pretty obvious, since everything is within the `aggregation` tab and all the necessary information (such as Over aggregate only applying to streams) is pretty much in the same place.
[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214386870

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala ##
@@ -40,6 +40,84 @@ class AggregateITCase extends StreamingWithStateTestBase {
 private val queryConfig = new StreamQueryConfig()
 queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+ @Test
+ def testDistinctUDAGG(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val testAgg = new DataViewTestAgg
+val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .groupBy('e)
+ .select('e, testAgg.distinct('d, 'e))
+
+val results = t.toRetractStream[Row](queryConfig)
+results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+env.execute()
+
+val expected = mutable.MutableList("1,10", "2,21", "3,12")
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testDistinctUDAGGMixedWithNonDistinctUsage(): Unit = {

Review comment:
I originally discovered the distinct modifier bug using this test. Can we still keep it? I have found that mixed test cases can sometimes expose bugs that are otherwise hard to find.
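To illustrate what a mixed distinct / non-distinct test pins down, here is a minimal sketch in plain Scala collections. It does not use Flink or the `StreamTestData` fixture; the object name `MixedDistinctCheck` and the sample rows are made up for illustration, and a simple sum stands in for the UDAGG. The idea is to compute both columns independently, so that a modifier leaking from one call to the other would show up as two equal columns.

```scala
// Hypothetical oracle for a mixed distinct / non-distinct aggregation test.
// Nothing here is Flink API; the data and names are illustrative assumptions.
object MixedDistinctCheck {
  // (key, value) rows with deliberate duplicates inside each key.
  val rows: Seq[(String, Int)] =
    Seq(("a", 1), ("a", 1), ("a", 2), ("b", 5), ("b", 5))

  // For each key: (sum over distinct values, sum over all values).
  val expected: Map[String, (Int, Int)] =
    rows.groupBy(_._1).map { case (key, group) =>
      val values = group.map(_._2)
      (key, (values.distinct.sum, values.sum))
    }

  def main(args: Array[String]): Unit =
    expected.toSeq.sortBy(_._1).foreach(println)
}
```

Because every key contains duplicates, the two columns differ for each key (3 vs. 4 for `a`, 5 vs. 10 for `b`). A distinct-only test could not detect a flag that wrongly applies to the non-distinct column as well.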
[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214386474

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ##
@@ -214,10 +214,15 @@ trait ImplicitExpressionOperations {
 def varSamp = VarSamp(expr)
 /**
-* Returns multiset aggregate of a given expression.
+* Returns multiset aggregate of a given expression.
 */
 def collect = Collect(expr)
+ /**
+* Returns a distinct field reference to a given expression
+*/
+ def distinct = DistinctAgg(expr)

Review comment:
I was not sure if this is what you had in mind. Please take another look. Thanks.
[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214386228

## File path: docs/dev/table/udfs.md ##
@@ -650,6 +650,36 @@ tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GRO
+User-defined aggregation function can be used with `distinct` modifiers. To calculate the aggregate results only for distinct values, simply add the distinct modifier towards the aggregation function.

Review comment:
Done. Yes, I agree.
[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r209313630

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ##
@@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
 def getAccumulatorType: TypeInformation[ACC] = null
+
+ private[flink] var isDistinctAgg: Boolean = false
+
+ private[flink] def distinct: AggregateFunction[T, ACC] = {

Review comment:
This is not going to work, because it modifies an underlying field of this particular AggregateFunction object. For example:

```
table.select(udagg.distinct('a), udagg('a))
```

will return the same result in both columns, because the distinct modifier has been added to this particular `udagg` element. This is a blunder on my end, and I should fix it before further reviews are conducted.
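The problem described in this comment can be reproduced without Flink. The following is a simplified model, not the PR's code: `MutableSumAgg`, `SumAgg`, `AggCall`, and `evaluate` are hypothetical names, and a plain sum stands in for the user-defined aggregate. Variant A keeps the flag on the function object, as the `isDistinctAgg` field in the diff does; variant B is one possible alternative, assumed here for contrast, that records the flag on an immutable per-call node.

```scala
// Self-contained model of the shared-flag pitfall; no Flink types are used.
// All class and method names below are illustrative assumptions.
object DistinctFlagDemo {

  // Variant A: the distinct flag lives on the function object itself.
  class MutableSumAgg {
    var isDistinctAgg: Boolean = false
    // Mutates shared state and returns the same instance.
    def distinct: MutableSumAgg = { isDistinctAgg = true; this }
    def evaluate(values: Seq[Int]): Int =
      if (isDistinctAgg) values.distinct.sum else values.sum
  }

  // Variant B: the flag lives on an immutable node created per call.
  final case class AggCall(isDistinct: Boolean) {
    def evaluate(values: Seq[Int]): Int =
      if (isDistinct) values.distinct.sum else values.sum
  }
  class SumAgg {
    def apply(): AggCall = AggCall(isDistinct = false)
    def distinct: AggCall = AggCall(isDistinct = true)
  }

  val data: Seq[Int] = Seq(1, 1, 2, 3, 3)

  // Both columns are built first and evaluated afterwards,
  // mirroring select(udagg.distinct(..), udagg(..)).
  val shared = new MutableSumAgg
  val mutableResults: Seq[Int] =
    Seq(shared.distinct, shared).map(_.evaluate(data))

  val udagg = new SumAgg
  val immutableResults: Seq[Int] =
    Seq(udagg.distinct, udagg()).map(_.evaluate(data))

  def main(args: Array[String]): Unit = {
    println(s"flag on function object: $mutableResults")
    println(s"flag on call node:       $immutableResults")
  }
}
```

In variant A both columns evaluate to 6, the distinct sum, because the second column refers to the same mutated object. In variant B the columns evaluate to 6 and 10, since each call carries its own flag.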
[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208624881

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ##
@@ -214,10 +214,16 @@ trait ImplicitExpressionOperations {
 def varSamp = VarSamp(expr)
 /**
-* Returns multiset aggregate of a given expression.
+* Returns multiset aggregate of a given expression.
 */
 def collect = Collect(expr)
+ /**
+* Return a distinct field reference to a given expression
+* @return
+*/
+ def distinct = DistinctAgg(expr)

Review comment:
This is the particular place where I would like a second opinion. I thought of 2 ways of doing this:

1. The way I did it: adding distinct in `expressionDsl.scala` and treating `distinct` as a special Aggregation, while separately having a `distinct` method in the `AggregateFunction` class for UDAGG.
2. Adding distinct to `ExpressionParser.scala` and creating special prefix / suffix rules to handle the distinct modifier with a different expression converter.

I prefer the 1st way, since the 2nd way requires unifying the parsing for both `distinct` in agg and `distinct` in table. But I would like more opinions.
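As a rough illustration of the first option, the sketch below models a DSL in which `distinct` simply wraps an expression in a `DistinctAgg` node that a later step interprets. It is a toy expression tree, not Flink's: `FieldRef`, `Sum`, `ExpressionOps`, and `render` are assumed names, and only `DistinctAgg` and `Expression` echo identifiers from the diff.

```scala
// Toy expression tree; node names echo the diff but are NOT Flink's classes.
object DistinctDslSketch {
  sealed trait Expression
  final case class FieldRef(name: String) extends Expression
  final case class Sum(child: Expression) extends Expression
  final case class DistinctAgg(child: Expression) extends Expression

  // DSL layer: every expression gains `.sum` and `.distinct` via an implicit wrapper.
  implicit class ExpressionOps(val expr: Expression) {
    def sum: Expression = Sum(expr)
    def distinct: Expression = DistinctAgg(expr)
  }

  // A later translation step interprets the wrapper node.
  // Here it only renders SQL-like text for inspection.
  def render(e: Expression): String = e match {
    case FieldRef(name)          => name
    case Sum(child)              => s"SUM(${render(child)})"
    case DistinctAgg(Sum(child)) => s"SUM(DISTINCT ${render(child)})"
    case DistinctAgg(other)      => s"DISTINCT ${render(other)}"
  }

  val plain: String = render(FieldRef("a").sum)
  val distinctSum: String = render(FieldRef("a").sum.distinct)

  def main(args: Array[String]): Unit = {
    println(plain)
    println(distinctSum)
  }
}
```

With this shape no parser rules are needed: the modifier is just another node in the tree, and the decision about what `distinct` means for a given child is deferred to the step that consumes the tree.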