[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

2017-10-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4736


---


[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142254410
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
 ---
@@ -85,6 +85,46 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
   }
 
   @Test
+  def testOverWindowWithConstant(): Unit = {
+
+val data = List(
+  (1L, 1, "Hello"),
+  (2L, 2, "Hello"),
+  (3L, 3, "Hello"),
+  (4L, 4, "Hello"),
+  (5L, 5, "Hello"),
+  (6L, 6, "Hello"),
+  (7L, 7, "Hello World"),
+  (8L, 8, "Hello World"),
+  (8L, 8, "Hello World"),
+  (20L, 20, "Hello World"))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+StreamITCase.clear
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+val weightAvgFun = new WeightedAvg
+
+val windowedTable = table
+  .window(
+Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 
'w)
+  .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
+  .select('c, 'wAvg)
--- End diff --

can be removed


---


[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142247401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1670,4 +1670,34 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable constant to the member area of the generated 
[[Function]].
+*
+* @param constant constant expression
+* @return member variable term
+*/
+  def addReusableBoxedConstant(constant: GeneratedExpression): String = {
+require(constant.literal, "Literal expected")
+
+val fieldTerm = newName("constant")
+
+val boxed = generateOutputFieldBoxing(constant)
+val boxedType = boxedTypeTermForTypeInfo(boxed.resultType)
+
+val field =
+  s"""
+|transient $boxedType $fieldTerm;
--- End diff --

why `transient`? Couldn't this be `final`?


---


[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142256742
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -86,6 +86,13 @@ public Long getValue(WeightedAvgAccum accumulator) {
}
 
// overloaded accumulate method
+   // dummy to test constants
+   public void accumulate(WeightedAvgAccum accumulator, long 
iValue, int iWeight, int x, String string) {
+   accumulator.sum += iWeight + Integer.parseInt(string);
--- End diff --

change the method to 
```
accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
accumulator.count += iWeight;
``` 

to have some influence of the value of `string` in the result?


---


[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

2017-09-27 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/4736

[FLINK-7371] [table] Add support for constant parameters in OVER aggregate

## What is the purpose of the change

This PR allows to pass constants to OVER window aggregates. E.g. 
`.select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)`.


## Brief change log

Until now the constants where simply ignored. I added code generation for 
the literals in `AggregationCodeGenerator`.


## Verifying this change

I add a ITCase for it. I might add more tests if I have time. In general, 
we need to rework the logic there a little bit, because I think we also do not 
support DATE, TIME etc. right now.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-7371

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4736.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4736


commit 19e056e038009e22e2b607b38931f575d5c948df
Author: twalthr 
Date:   2017-09-27T15:11:28Z

[FLINK-7371] [table] Add support for constant parameters in OVER aggregate




---