[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4736 ---
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4736#discussion_r142254410 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala --- @@ -85,6 +85,46 @@ class OverWindowITCase extends StreamingWithStateTestBase { } @Test + def testOverWindowWithConstant(): Unit = { + +val data = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (3L, 3, "Hello"), + (4L, 4, "Hello"), + (5L, 5, "Hello"), + (6L, 6, "Hello"), + (7L, 7, "Hello World"), + (8L, 8, "Hello World"), + (8L, 8, "Hello World"), + (20L, 20, "Hello World")) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +StreamITCase.clear +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) +val weightAvgFun = new WeightedAvg + +val windowedTable = table + .window( +Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w) + .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg) + .select('c, 'wAvg) --- End diff -- can be removed ---
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4736#discussion_r142247401 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1670,4 +1670,34 @@ abstract class CodeGenerator( fieldTerm } + + /** +* Adds a reusable constant to the member area of the generated [[Function]]. +* +* @param constant constant expression +* @return member variable term +*/ + def addReusableBoxedConstant(constant: GeneratedExpression): String = { +require(constant.literal, "Literal expected") + +val fieldTerm = newName("constant") + +val boxed = generateOutputFieldBoxing(constant) +val boxedType = boxedTypeTermForTypeInfo(boxed.resultType) + +val field = + s""" +|transient $boxedType $fieldTerm; --- End diff -- why `transient`? Couldn't this be `final`? ---
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4736#discussion_r142256742 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -86,6 +86,13 @@ public Long getValue(WeightedAvgAccum accumulator) { } // overloaded accumulate method + // dummy to test constants + public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) { + accumulator.sum += iWeight + Integer.parseInt(string); --- End diff -- change the method to ``` accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight; accumulator.count += iWeight; ``` to have some influence of the value of `string` in the result? ---
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/4736 [FLINK-7371] [table] Add support for constant parameters in OVER aggregate ## What is the purpose of the change This PR allows to pass constants to OVER window aggregates. E.g. `.select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)`. ## Brief change log Until now the constants where simply ignored. I added code generation for the literals in `AggregationCodeGenerator`. ## Verifying this change I add a ITCase for it. I might add more tests if I have time. In general, we need to rework the logic there a little bit, because I think we also do not support DATE, TIME etc. right now. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-7371 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4736.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4736 commit 19e056e038009e22e2b607b38931f575d5c948df Author: twalthrDate: 2017-09-27T15:11:28Z [FLINK-7371] [table] Add support for constant parameters in OVER aggregate ---