[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974124#comment-15974124
 ] 

mingleizhang commented on FLINK-6311:
-

[~tzulitai] Thanks for the useful information, I really appreciate it. Yep, I 
would like to work on this and will get started on it soon.

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.
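As a rough illustration, the null guard the description asks for could look like the sketch below (field names are taken from the description; the real {{KinesisDataFetcher}} contains additional shutdown logic, so this is only a sketch of the missing checks):

{code}
public void shutdownFetcher() {
    // the source can be closed before run() was ever called, in which case
    // neither the main thread nor the shard consumer executor exists yet
    if (mainThread != null) {
        mainThread.interrupt();
    }

    if (shardConsumersExecutor != null) {
        shardConsumersExecutor.shutdownNow();
    }
}
{code}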



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface

2017-04-18 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974103#comment-15974103
 ] 

Bowen Li commented on FLINK-5095:
-

I'll spend some time over the next few days to see how other projects (Hadoop, 
Kafka, etc.) handle this situation, and then kick off a discussion

> Add explicit notifyOfAddedX methods to MetricReporter interface
> ---
>
> Key: FLINK-5095
> URL: https://issues.apache.org/jira/browse/FLINK-5095
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Chesnay Schepler
>Priority: Minor
>
> I would like to start a discussion on the MetricReporter interface, 
> specifically the methods that notify a reporter of added or removed metrics.
> Currently, the methods are defined as follows:
> {code}
> void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
> void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
> {code}
> All metrics, regardless of their actual type, are passed to the reporter with 
> these methods.
> Since the different metric types have to be handled differently, every reporter 
> is forced to do something like this:
> {code}
> if (metric instanceof Counter) {
>   Counter c = (Counter) metric;
>   // deal with counter
> } else if (metric instanceof Gauge) {
>   // deal with gauge
> } else if (metric instanceof Histogram) {
>   // deal with histogram
> } else if (metric instanceof Meter) {
>   // deal with meter
> } else {
>   // log something or throw an exception
> }
> {code}
> This has a few issues
> * the instanceof checks and castings are unnecessary overhead
> * it requires the implementer to be aware of every metric type
> * it encourages throwing an exception in the final else block
> We could remedy all of these by reworking the interface to contain explicit 
> add/remove methods for every metric type. This would however be a breaking 
> change and blow up the interface to 12 methods from the current 4. We could 
> also add a RichMetricReporter interface with these methods, which would 
> require relatively few changes but add additional complexity.
> I was wondering what other people think about this.
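For illustration only, an explicit per-type variant could look like the sketch below (the interface and method names are hypothetical, not an agreed design). A reporter implementing it receives the already-typed metric and never needs the instanceof chain above:

{code}
public interface RichMetricReporter extends MetricReporter {

    void notifyOfAddedCounter(Counter counter, String name, MetricGroup group);
    void notifyOfRemovedCounter(Counter counter, String name, MetricGroup group);

    void notifyOfAddedGauge(Gauge<?> gauge, String name, MetricGroup group);
    void notifyOfRemovedGauge(Gauge<?> gauge, String name, MetricGroup group);

    void notifyOfAddedHistogram(Histogram histogram, String name, MetricGroup group);
    void notifyOfRemovedHistogram(Histogram histogram, String name, MetricGroup group);

    void notifyOfAddedMeter(Meter meter, String name, MetricGroup group);
    void notifyOfRemovedMeter(Meter meter, String name, MetricGroup group);
}
{code}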



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
Addressed @zentol's comments. Ready for another round. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-18 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112115930
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,85 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-metrics-datadog</artifactId>
+	<version>1.3-SNAPSHOT</version>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
--- End diff --

shaded


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-18 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112115925
  
--- Diff: docs/monitoring/metrics.md ---
@@ -436,6 +436,38 @@ metrics.reporter.stsd.port: 8125
 
 {% endhighlight %}
 
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+// <host>, <job_name>, <tm_id>, <task_name>, <operator_name>, <subtask_index> will be sent to Datadog as tags
+metrics.scope.jm: <host>.jobmanager
+metrics.scope.jm.job: <host>.<job_name>.jobmanager.job
+metrics.scope.tm: <host>.<tm_id>.taskmanager
+metrics.scope.tm.job: <host>.<tm_id>.<job_name>.taskmanager.job
+metrics.scope.task: <host>.<tm_id>.<job_name>.<task_name>.<subtask_index>.task
+metrics.scope.operator: <host>.<tm_id>.<job_name>.<operator_name>.<subtask_index>.operator
+
+{% endhighlight %}
+
+Such metric reporting implementation is a best practice based on our experience working with Datadog. It helps 
--- End diff --

removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-18 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974069#comment-15974069
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6311:


The {{flink-connector-kinesis}} module is only included via a separate build 
profile ("include-kinesis"). We did this because of AWS license issues, so the 
Kinesis connector is not built along with the rest of the code. But this 
shouldn't be relevant for this JIRA; you can still proceed to fix it if you want to :)

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-18 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6311:
---

Assignee: mingleizhang

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974012#comment-15974012
 ] 

mingleizhang commented on FLINK-6311:
-

[~tzulitai] I just looked at FlinkKinesisConsumer, which lives in the package 
{code}org.apache.flink.streaming.connectors.kinesis{code} in the 
flink-connector-kinesis module. flink-connector-kinesis sits under the 
flink-connectors module, but the flink-connectors pom.xml does not list 
{code}flink-connector-kinesis{code} as a module. I think we should add 
{code}flink-connector-kinesis{code} to the flink-connectors pom.xml and then 
return to this issue. What do you think?

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-6149.
-
   Resolution: Implemented
Fix Version/s: 1.3.0

> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3594


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974000#comment-15974000
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3594


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4821) Implement rescalable non-partitioned state for Kinesis Connector

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973988#comment-15973988
 ] 

ASF GitHub Bot commented on FLINK-4821:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3001
  
Hi @tzulitai ,
I have rebased it and updated it to use the public API.


> Implement rescalable non-partitioned state for Kinesis Connector
> 
>
> Key: FLINK-4821
> URL: https://issues.apache.org/jira/browse/FLINK-4821
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the 
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement 
> it too. This ticket tracks progress for this.
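For context, the Kafka connector does this via {{CheckpointedFunction}} and operator list state. Below is a hypothetical, heavily simplified sketch of the same pattern for a shard-based source (class, field, and descriptor names are invented here; the real consumer tracks StreamShardMetadata / SequenceNumber rather than plain strings, and its shard assignment logic differs):

{code}
public class RescalableShardSource extends RichParallelSourceFunction<String>
        implements CheckpointedFunction {

    // (shardId, sequenceNumber) pairs snapshotted into operator state
    private transient ListState<Tuple2<String, String>> offsetsState;
    private final Map<String, String> localOffsets = new HashMap<>();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        offsetsState = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<>(
                "shard-offsets",
                TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));

        if (context.isRestored()) {
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
            // with union redistribution every subtask sees all offsets on restore
            // and keeps only the shards it is now responsible for
            for (Tuple2<String, String> offset : offsetsState.get()) {
                if (((offset.f0.hashCode() & Integer.MAX_VALUE) % parallelism) == subtask) {
                    localOffsets.put(offset.f0, offset.f1);
                }
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsetsState.clear();
        for (Map.Entry<String, String> e : localOffsets.entrySet()) {
            offsetsState.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // consume the assigned shards, updating localOffsets as records are emitted
    }

    @Override
    public void cancel() {
    }
}
{code}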



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3001: [FLINK-4821] [kinesis] Implement rescalable non-partition...

2017-04-18 Thread tony810430
Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3001
  
Hi @tzulitai ,
I have rebased it and updated it to use the public API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-04-18 Thread hongyuhong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973982#comment-15973982
 ] 

hongyuhong commented on FLINK-6232:
---

Hi [~fhueske], I have implemented it using a normal timestamp attribute rather 
than a.proctime/a.rowtime, since those are not supported yet. I would appreciate 
it if you could give some advice.

Thanks very much.

> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded conditions like {{o.proctime > s.proctime}}, and it should reference 
> both streams' proctime attributes; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3001: [FLINK-4821] [kinesis] Implement rescalable non-partition...

2017-04-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3001
  
Hi @tony810430,
The exposure for union list state was just merged to master.
Could you rebase this? Once rebased I'll do a full review so that we can 
finally work towards merging this :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973954#comment-15973954
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105763
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -1140,6 +1140,202 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testNonPartitionedProcTimeOverDistinctWindow(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.testResults = mutable.MutableList()
+
+val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a,  " +
+  "  SUM(DIST(e)) OVER (" +
+  " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS 
sumE " +
--- End diff --

I think we can add some tests for multiple aggregations with distinct.


> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973955#comment-15973955
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112103318
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-18 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112103318
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 
getRuntimeContext.getState(smallestTimestampDescriptor)
+
+val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+  new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", 
classOf[Any], classOf[Row])
--- End diff --


[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-18 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105763
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -1140,6 +1140,202 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testNonPartitionedProcTimeOverDistinctWindow(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.testResults = mutable.MutableList()
+
+val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a,  " +
+  "  SUM(DIST(e)) OVER (" +
+  " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS 
sumE " +
--- End diff --

I think we can add some tests for multiple aggregations with distinct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973956#comment-15973956
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105507
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-18 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105029
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 
getRuntimeContext.getState(smallestTimestampDescriptor)
+
+val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+  new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", 
classOf[Any], classOf[Row])
+distinctValueState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973953#comment-15973953
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105029
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-18 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105507
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 
getRuntimeContext.getState(smallestTimestampDescriptor)
+
+val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+  new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", 
classOf[Any], classOf[Row])
+distinctValueState = 

[jira] [Resolved] (FLINK-6324) Refine state access methods in OperatorStateStore

2017-04-18 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-6324.

   Resolution: Fixed
Fix Version/s: 1.3.0

Resolved for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/a1aab64

> Refine state access methods in OperatorStateStore
> -
>
> Key: FLINK-6324
> URL: https://issues.apache.org/jira/browse/FLINK-6324
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> This proposes to refine the OperatorStateStore interface by:
> - deprecating the Java serialization shortcuts
> - renaming getOperatorState to getListState
> The Java serialization shortcuts can be deprecated because they were
> previously introduced to provide a smoother migration path from older
> savepoints. However, their usage should definitely be discouraged.
> Renaming to {{getListState}} is a preparation for making the names of state
> access methods contain information about both the redistribution pattern
> on restore and the shape of its data structure, since the combination of
> these two is orthogonal. This convention will also provide a better
> naming pattern for more state access methods in the future, for example
> {{getUnionListState}}. If the method name does not contain its
> redistribution pattern (e.g., {{getListState}}), then it simply implies the
> default repartitioning scheme (SPLIT_DISTRIBUTE).
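As a quick usage sketch of the renamed methods (state names and element types here are made up for illustration, inside a CheckpointedFunction):

{code}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // SPLIT_DISTRIBUTE (the default): entries are split among subtasks on restore
    ListState<Long> splitState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("split-offsets", Long.class));

    // UNION: every subtask receives the complete list on restore
    ListState<Long> unionState = context.getOperatorStateStore()
        .getUnionListState(new ListStateDescriptor<>("union-offsets", Long.class));
}
{code}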



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-04-18 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-5991.

Resolution: Fixed

Resolved for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/2ef4900

> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
>  <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor);
>  <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state 
> type as either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973949#comment-15973949
 ] 

ASF GitHub Bot commented on FLINK-5991:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3508


> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
>  <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor);
>  <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state 
> type as either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3508: [FLINK-5991] [state-backend, streaming] Expose Bro...

2017-04-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3508


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973937#comment-15973937
 ] 

mingleizhang edited comment on FLINK-6130 at 4/19/17 2:36 AM:
--

[~Zentol] [~till.rohrmann] That makes sense to me now. So, I have decided 
that the previous practice {code}Object result = future.value().get();{code} is 
meaningless, as I cannot get any useful information from it. Thanks, I appreciate 
it.


was (Author: mingleizhang):
[~Zentol] [~till.rohrmann] That makes sense to me now. So, I just have decided 
that the previous practice {code}Object result = future.value().get();{code} is 
meaningless as I can not any useful message from it. Thanks and appreciate it.

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.
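A sketch of the suggested change (surrounding code elided as in the snippet above; the future's concrete type is simplified here for illustration):

{code}
final Future<?> terminationFuture;
synchronized (lock) {
  LOG.info("Starting High Availability Services");
  ...
  // obtain the future while still holding the lock
  terminationFuture = resourceManager.getTerminationFuture();
}

// wait for resource manager to finish, outside of the lock
terminationFuture.get();
{code}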



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973937#comment-15973937
 ] 

mingleizhang commented on FLINK-6130:
-

[~Zentol] [~till.rohrmann] That makes sense to me now. I have decided that the 
previous practice {code}Object result = future.value().get();{code} is 
meaningless, as I cannot get any useful information from it. Thanks, I 
appreciate it.

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding the lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6324) Refine state access methods in OperatorStateStore

2017-04-18 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6324:
--

 Summary: Refine state access methods in OperatorStateStore
 Key: FLINK-6324
 URL: https://issues.apache.org/jira/browse/FLINK-6324
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, State Backends, Checkpointing
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


This proposes to refine the OperatorStateStore interface by:
- deprecating the Java serialization shortcuts
- renaming getOperatorState to getListState

The Java serialization shortcuts can be deprecated because they were
previously introduced to provide a smoother migration path from older
savepoints. However, their usage should definitely be discouraged.

Renaming to {{getListState}} is in preparation for making the names of state
access methods carry information about both the redistribution pattern on
restore and the shape of the data structure, since these two aspects are
orthogonal. This convention will also provide a better naming pattern for more
state access methods in the future, for example {{getUnionListState}}. If a
method name does not specify a redistribution pattern (e.g., {{getListState}}),
it simply implies the default repartitioning scheme (SPLIT_DISTRIBUTE).
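A sketch of how the refined names could line up on the interface; the generic
signatures follow the existing {{getOperatorState}}, but the exact set of
methods shown here is an assumption:
{code}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;

public interface OperatorStateStoreSketch {

  // Default redistribution (SPLIT_DISTRIBUTE): each parallel instance
  // receives a sub-list of the checkpointed elements on restore.
  <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

  // Union redistribution: each parallel instance receives the complete list on restore.
  <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

  // Kept temporarily for migration; to be deprecated in favor of getListState.
  @Deprecated
  <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
}
{code}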



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973910#comment-15973910
 ] 

ASF GitHub Bot commented on FLINK-6149:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
Rebased to master and will merge after build check


> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...

2017-04-18 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
Rebased to master and will merge after build check


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973846#comment-15973846
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3735
  
@fhueske thanks for your feedback.
Yes, we could keep the GeneratedAggregations interface very clean, as follows:
```
abstract class GeneratedAggregations extends Function {
 def setAggregationResults(accumulators: Row, output: Row)
 def setForwardedFields(input: Row, output: Row)
 def accumulate(accumulators: Row, input: Row)
 def retract(accumulators: Row, input: Row)
 def createAccumulators(): Row
 def mergeAccumulatorsPair(a: Row, b: Row): Row
 def resetAccumulator(accumulators: Row)
}
```
But I feel it might not be a good idea to add more parameters to the code 
generation function, as the calling function will usually have to construct 
unnecessary empty parameters. I think we can break the code generation function 
into 2-3 functions (these are just the entry points that process the code-gen 
parameters; the fundamental implementation will be shared), as sketched below. 
Let me prototype the changes, and we can continue the discussion from there.
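A minimal sketch of that splitting (written in Java purely for illustration;
the actual generator is Scala and all names here are made up): narrow entry
points delegate to one shared implementation, so callers do not have to pass
empty placeholder arguments.
```
// Illustrative only: not the Flink code generator, just the call pattern.
public class AggregationCodeGenSketch {

  // Full entry point for callers that need retraction support.
  public String generateWithRetraction(String className) {
    return generateInternal(className, true);
  }

  // Narrow entry point for accumulate-only callers; no empty parameters needed.
  public String generateAccumulateOnly(String className) {
    return generateInternal(className, false);
  }

  // Shared fundamental implementation used by both entry points.
  private String generateInternal(String className, boolean needRetract) {
    String retractBody = needRetract
        ? "  public void retract(Object acc, Object in) { /* generated */ }\n"
        : "";
    return "public class " + className + " {\n"
        + "  public void accumulate(Object acc, Object in) { /* generated */ }\n"
        + retractBody
        + "}\n";
  }
}
```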

Regarding your other comments: I did not look into the logic of the previous 
implementations and just focused on the code-gen. I will take a look and 
optimize them.


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...

2017-04-18 Thread shaoxuan-wang
Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3735
  
@fhueske thanks for your feedback.
Yes, we could keep the GeneratedAggregations interface very clean, as follows:
```
abstract class GeneratedAggregations extends Function {
 def setAggregationResults(accumulators: Row, output: Row)
 def setForwardedFields(input: Row, output: Row)
 def accumulate(accumulators: Row, input: Row)
 def retract(accumulators: Row, input: Row)
 def createAccumulators(): Row
 def mergeAccumulatorsPair(a: Row, b: Row): Row
 def resetAccumulator(accumulators: Row)
}
```
But I feel it might not be a good idea to add more parameters to the code 
generation function, as the calling function will usually have to construct 
unnecessary empty parameters. I think we can break the code generation function 
into 2-3 functions (these are just the entry points that process the code-gen 
parameters; the fundamental implementation will be shared). Let me prototype 
the changes, and we can continue the discussion from there.

Regarding your other comments: I did not look into the logic of the previous 
implementations and just focused on the code-gen. I will take a look and 
optimize them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973832#comment-15973832
 ] 

ASF GitHub Bot commented on FLINK-6295:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
OK, I think I've got your point.

Now, using the WeakHashMap, we add entries when the map doesn't contain the 
requested EG id, and remove invalid entries when GC happens.

By adding the `small 2-line branch` as you suggest, we add entries the same way 
as before, but check whether an entry is valid when it's accessed by a handler, 
and update/remove it if it's invalid. Is that right?


> use LoadingCache instead of WeakHashMap to lower latency
> 
>
> Key: FLINK-6295
> URL: https://issues.apache.org/jira/browse/FLINK-6295
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now in ExecutionGraphHolder, which is used in many handlers, we use a 
> WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage 
> collection.
> The latency is too high when the JVM rarely does GC, which leaves the status of 
> jobs or their tasks out of sync with the real ones.
> LoadingCache is a commonly used cache implementation from the Guava library; we 
> can use its time-based eviction to lower the latency of status updates.
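A minimal sketch of the proposed time-based eviction, assuming Guava's standard
CacheBuilder/LoadingCache API; the key/value types, the timeout, and the loader
body are illustrative and not the actual handler code:
{code}
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;

public class ExecutionGraphCacheSketch {

  // Entries expire a fixed time after being written, so handlers see a
  // reasonably fresh ExecutionGraph instead of waiting for a GC cycle.
  private final LoadingCache<JobID, ExecutionGraph> cache =
      CacheBuilder.newBuilder()
          .expireAfterWrite(30, TimeUnit.SECONDS)   // illustrative timeout
          .build(new CacheLoader<JobID, ExecutionGraph>() {
            @Override
            public ExecutionGraph load(JobID jobId) throws Exception {
              return fetchFromJobManager(jobId);
            }
          });

  public ExecutionGraph get(JobID jobId) throws Exception {
    return cache.get(jobId);
  }

  // Placeholder for the actual lookup against the JobManager.
  private ExecutionGraph fetchFromJobManager(JobID jobId) {
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}
{code}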



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
OK, I think I've got your point.

Now, using the WeakHashMap, we add entries when the map doesn't contain the 
requested EG id, and remove invalid entries when GC happens.

By adding the `small 2-line branch` as you suggest, we add entries the same way 
as before, but check whether an entry is valid when it's accessed by a handler, 
and update/remove it if it's invalid. Is that right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-18 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112083224
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/metric/DGauge.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog.metric;
+
+import java.util.List;
+
+/**
+ * Mapping of gauge between Flink and Datadog
+ * */
+public class DGauge extends DMetric {
--- End diff --

Their MetricTypes are different. One is MetricType.counter and the other is 
MetricType.gauge, which will be serialized as a JSON field.
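A rough sketch of the distinction being described; the field and method names
here are guesses for illustration, not the PR's actual DMetric/DGauge code:
```
// Illustrative only: gauge and counter differ solely by the type field,
// which ends up as a JSON field on serialization.
abstract class DMetricSketch {
  private final String type;   // "gauge" or "counter"

  protected DMetricSketch(String type) {
    this.type = type;
  }

  public String getType() {
    return type;
  }
}

class DGaugeSketch extends DMetricSketch {
  DGaugeSketch() {
    super("gauge");
  }
}

class DCounterSketch extends DMetricSketch {
  DCounterSketch() {
    super("counter");
  }
}
```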


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973662#comment-15973662
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112080186
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
+  if (previous.equals(output)) {
+// ignore same output
+return
--- End diff --

We still need to update the state even if we do not emit new values. If we 
have a max aggregation with an accumulator that holds a map of `10->1, 5->2` 
and we add `8`, the new accumulator will be `10->1, 8->1, 5->2` but the 
aggregation result will still be 10. If we later retract `10`, the new max 
would be `5` but should be `8`.


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973658#comment-15973658
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112065233
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView 
target) throws IOException {
fieldSerializers[i].serialize(o, target);
}
}
+   commandSerializer.serialize(record.command, target);
--- End diff --

Also by adding the command to `Row` we add serialization overhead to all 
jobs that use Row (including batch Table API / SQL).


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973656#comment-15973656
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112065725
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -129,11 +131,17 @@ class DataStreamOverAggregate(
 generator,
 inputDS,
 isRowTimeType = false,
-isRowsClause = overWindow.isRows)
+isRowsClause = overWindow.isRows,
+consumeRetraction)
--- End diff --

I think it would be better to enable retraction for all types of OVER 
aggregates at the same time. 
Just supporting one specific type adds more confusion than it helps, in my 
opinion.


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973659#comment-15973659
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112060500
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView 
target) throws IOException {
fieldSerializers[i].serialize(o, target);
}
}
+   commandSerializer.serialize(record.command, target);
--- End diff --

I'm afraid we cannot change the serialization of `Row`. `Row` is a public 
class in `flink-core` and not an internal `flink-table` class. Hence, it is 
used at other places and might also be part of user applications. If we change 
the serialization, users might not be able to restore a job on 1.3 from a 
savepoint taken with 1.2. This restriction rules out to simply add a field to 
`Row` which would avoid major refactorings.

I see two options to add the command field to the data streams in 
`flink-table`

1. use a regular field in `Row`. This would mean that the physical layout 
of the `Row` is no longer the same as the logical layout, i.e., the one 
expected by Calcite. However, we will probably change this anyway for the 
upcoming changes related to the time indicators. For these, the physical layout 
will have fewer fields than the logical layout (we will remove time fields 
which are in the meta data of Flink's records or taken as processing time). By 
adding the command field, we would add a field which is not in the logical 
layout. The problem with this approach is that the command field would be at 
different positions in the Row (probably the last one). We could leverage the 
changes introduced by the time indicator changes (or the other way round). 
@twalthr is working on this. You can have a look at the current status here: 
https://github.com/twalthr/flink/tree/FLINK-5884
2. The other option is to wrap the rows in a custom data type similar to a 
`Tuple2[Row, Command]`. The data type could be named `Change` or `CRow` and 
would have its own `TypeInformation`, `TypeSerializer`, and `TypeComparator` 
which forward most calls to the type info, serializer, and comparator of `Row`. 
The problem with this approach is that we need to change the return types of 
all functions. For some functions this might not be a big issue if we can take 
the `Row` object before passing it to the code gen'd functions. The command 
field could be set when the result Row is returned or in a wrapping 
`Collector`. 

My gut feeling is that the second approach is easier to implement because 
we (hopefully) do not need to touch the generated code and "just" need to wrap 
all `Row` objects in `CRow` objects.
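A minimal sketch of the second option, written in Java for illustration; the
name `CRow`, the boolean change flag, and the wrapper layout are assumptions
about a possible design, not existing Flink classes:
```
import org.apache.flink.types.Row;

// Wrapper carrying a Row plus its change semantics, similar to a Tuple2<Row, Command>.
// A matching CRowTypeInfo / CRowSerializer would forward most calls to Row's type
// info and serializer and additionally (de)serialize the change flag.
public class CRow {

  public Row row;
  public boolean change;   // true = accumulate/add, false = retract/delete

  public CRow() {
  }

  public CRow(Row row, boolean change) {
    this.row = row;
    this.change = change;
  }

  @Override
  public String toString() {
    return (change ? "+" : "-") + row;
  }
}
```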


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973655#comment-15973655
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078358
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -68,12 +70,35 @@ class GroupAggProcessFunction(
 var accumulators = state.value()
 
 if (null == accumulators) {
+  previous = null
   accumulators = new Row(aggregates.length)
   i = 0
   while (i < aggregates.length) {
 accumulators.setField(i, aggregates(i).createAccumulator())
 i += 1
   }
+} else {
+  // get previous row
+  if (generateRetraction) {
+if (null == previous) {
+  previous = new Row(groupings.length + aggregates.length)
--- End diff --

can we initialize `previous` in `open()`?


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973665#comment-15973665
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112081457
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .select('num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'count.count)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val 

[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973664#comment-15973664
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -41,14 +41,16 @@ class GroupAggProcessFunction(
 private val aggregates: Array[AggregateFunction[_]],
 private val aggFields: Array[Array[Int]],
 private val groupings: Array[Int],
-private val aggregationStateType: RowTypeInfo)
+private val aggregationStateType: RowTypeInfo,
+private val generateRetraction: Boolean)
   extends ProcessFunction[Row, Row] {
 
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)
 
   private var output: Row = _
--- End diff --

rename to `newRow`.


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973657#comment-15973657
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112079294
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
--- End diff --

Check against `generateRetraction`. The check can be optimized because 
`generateRetraction` is a `val` and hence `final`.


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973663#comment-15973663
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112081928
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .select('num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'count.count)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val 

[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973660#comment-15973660
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112064885
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
 val groupingKeys = grouping.indices.toArray
 val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+if (consumeRetraction) {
+  throw new TableException(
+"Retraction on group window is not supported yet. Note: Currently, 
group window should " +
+  "not follow an unbounded groupby.")
--- End diff --

`unbounded groupBy` -> `non-windowed GroupBy`?


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973661#comment-15973661
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078731
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -41,14 +41,16 @@ class GroupAggProcessFunction(
 private val aggregates: Array[AggregateFunction[_]],
 private val aggFields: Array[Array[Int]],
 private val groupings: Array[Int],
-private val aggregationStateType: RowTypeInfo)
+private val aggregationStateType: RowTypeInfo,
+private val generateRetraction: Boolean)
   extends ProcessFunction[Row, Row] {
 
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)
 
   private var output: Row = _
+  private var previous: Row = _
--- End diff --

rename to `previousRow`.


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112080959
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
--- End diff --

Please use more test data.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112081457
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .select('num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'count.count)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + over agg(unbounded, procTime, keyed)
+  @Test
+  def 

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078731
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -41,14 +41,16 @@ class GroupAggProcessFunction(
 private val aggregates: Array[AggregateFunction[_]],
 private val aggFields: Array[Array[Int]],
 private val groupings: Array[Int],
-private val aggregationStateType: RowTypeInfo)
+private val aggregationStateType: RowTypeInfo,
+private val generateRetraction: Boolean)
   extends ProcessFunction[Row, Row] {
 
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)
 
   private var output: Row = _
+  private var previous: Row = _
--- End diff --

rename to `previousRow`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973654#comment-15973654
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112080959
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
--- End diff --

Please use more test data.


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -41,14 +41,16 @@ class GroupAggProcessFunction(
 private val aggregates: Array[AggregateFunction[_]],
 private val aggFields: Array[Array[Int]],
 private val groupings: Array[Int],
-private val aggregationStateType: RowTypeInfo)
+private val aggregationStateType: RowTypeInfo,
+private val generateRetraction: Boolean)
   extends ProcessFunction[Row, Row] {
 
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)
 
   private var output: Row = _
--- End diff --

rename to `newRow`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112080186
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
+  if (previous.equals(output)) {
+// ignore same output
+return
--- End diff --

We still need to update the state even if we do not emit new values. Suppose a 
max aggregation has an accumulator that holds a map of `10->1, 5->2` and we add 
`8`. The new accumulator will be `10->1, 8->1, 5->2`, but the aggregation result 
is still 10. If we later retract `10` without having stored that updated 
accumulator, the new max would come out as `5`, although it should be `8`.
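
For illustration, a minimal standalone Scala sketch (a hypothetical accumulator, not the PR's code) of why the updated accumulator must be written back even when the emitted result does not change:

```
import scala.collection.mutable

object RetractableMaxSketch extends App {
  // Hypothetical max accumulator: keeps a count per value so the max can be retracted.
  class MaxAcc {
    private val counts = mutable.Map.empty[Int, Int]
    def accumulate(v: Int): Unit = counts(v) = counts.getOrElse(v, 0) + 1
    def retract(v: Int): Unit = {
      val c = counts(v) - 1
      if (c == 0) counts.remove(v) else counts(v) = c
    }
    def getValue: Int = counts.keys.max
  }

  val acc = new MaxAcc
  acc.accumulate(10); acc.accumulate(5); acc.accumulate(5) // accumulator: 10->1, 5->2, max = 10
  acc.accumulate(8)                                        // accumulator: 10->1, 8->1, 5->2, max still 10
  // Not emitting a new result here is fine, but the accumulator must still be stored in state;
  // otherwise the 8 is lost and a later retract(10) yields 5 instead of 8.
  acc.retract(10)
  println(acc.getValue) // 8
}
```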


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112065725
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -129,11 +131,17 @@ class DataStreamOverAggregate(
 generator,
 inputDS,
 isRowTimeType = false,
-isRowsClause = overWindow.isRows)
+isRowsClause = overWindow.isRows,
+consumeRetraction)
--- End diff --

I think it would be better to enable retraction for all types of OVER 
aggregates at the same time. 
Just supporting one specific type adds more confusion than it helps, in my 
opinion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112079294
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
--- End diff --

Check against `generateRetraction`. The check can be optimized because 
`generateRetraction` is a `val` and hence `final`.
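
A minimal sketch of what that guard could look like (illustrative only, reusing the names from the diff above):

```
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// `generateRetraction` is a constructor `val` (effectively final), so the branch is
// trivially predictable and cheap when retraction is disabled.
class EmitWithRetractionSketch(val generateRetraction: Boolean) {
  private var previous: Row = _

  def emit(output: Row, out: Collector[Row]): Unit = {
    if (generateRetraction && previous != null && !previous.equals(output)) {
      // ... emit a delete message for `previous` here ...
    }
    out.collect(output)
    previous = output // a real implementation would copy the fields, not keep the reference
  }
}
```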


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078358
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -68,12 +70,35 @@ class GroupAggProcessFunction(
 var accumulators = state.value()
 
 if (null == accumulators) {
+  previous = null
   accumulators = new Row(aggregates.length)
   i = 0
   while (i < aggregates.length) {
 accumulators.setField(i, aggregates(i).createAccumulator())
 i += 1
   }
+} else {
+  // get previous row
+  if (generateRetraction) {
+if (null == previous) {
+  previous = new Row(groupings.length + aggregates.length)
--- End diff --

can we initialize `previous` in `open()`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112065233
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView 
target) throws IOException {
fieldSerializers[i].serialize(o, target);
}
}
+   commandSerializer.serialize(record.command, target);
--- End diff --

Also by adding the command to `Row` we add serialization overhead to all 
jobs that use Row (including batch Table API / SQL).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112081928
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .select('num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'count.count)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + over agg(unbounded, procTime, keyed)
+  @Test
+  def 

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112064885
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
 val groupingKeys = grouping.indices.toArray
 val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+if (consumeRetraction) {
+  throw new TableException(
+"Retraction on group window is not supported yet. Note: Currently, 
group window should " +
+  "not follow an unbounded groupby.")
--- End diff --

`unbounded groupBy` -> `non-windowed GroupBy`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112060500
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView 
target) throws IOException {
fieldSerializers[i].serialize(o, target);
}
}
+   commandSerializer.serialize(record.command, target);
--- End diff --

I'm afraid we cannot change the serialization of `Row`. `Row` is a public 
class in `flink-core` and not an internal `flink-table` class. Hence, it is 
used at other places and might also be part of user applications. If we change 
the serialization, users might not be able to restore a job on 1.3 from a 
savepoint taken with 1.2. This restriction rules out to simply add a field to 
`Row` which would avoid major refactorings.

I see two options to add the command field to the data streams in 
`flink-table`

1. use a regular field in `Row`. This would mean that the physical layout 
of the `Row` is no longer the same as the logical layout, i.e., the one 
expected by Calcite. However, we will probably change this anyway for the 
upcoming changes related to the time indicators. For these, the physical layout 
will have fewer fields than the logical layout (we will remove time fields 
which are in the meta data of Flink's records or taken as processing time). By 
adding the command field, we would add a field which is not in the logical 
layout. The problem with this approach is that the command field would be at 
different positions in the Row (probably the last one). We could leverage the 
changes introduced by the time indicator changes (or the other way round). 
@twalthr is working on this. You can have a look at the current status here: 
https://github.com/twalthr/flink/tree/FLINK-5884
2. The other option is to wrap the rows in a custom data type similar to a 
`Tuple2[Row, Command]`. The data type could be named `Change` or `CRow` and 
would have its own `TypeInformation`, `TypeSerializer`, and `TypeComparator` 
which forward most calls to the type info, serializer, and comparator of `Row`. 
The problem with this approach is that we need to change the return types of 
all functions. For some functions this might not be a big issue if we can take 
the `Row` object before passing it to the code gen'd functions. The command 
field could be set when the result Row is returned or in a wrapping 
`Collector`. 

My gut feeling is that the second approach is easier to implement because 
we (hopefully) do not need to touch the generated code and "just" need to wrap 
all `Row` objects in `CRow` objects.
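
To make the second option a bit more concrete, a rough sketch of such a wrapper (the name and the boolean flag are placeholders; a real version would also need a matching `TypeInformation`, `TypeSerializer`, and `TypeComparator` that mostly delegate to `Row`'s):

```
import org.apache.flink.types.Row

// Carries the change flag next to the Row instead of inside it, so the
// serialization format of Row itself stays untouched.
class CRow(var row: Row, var change: Boolean) {
  override def toString: String = s"${if (change) "+" else "-"}$row"
}

object CRow {
  def apply(row: Row, change: Boolean): CRow = new CRow(row, change)
}
```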


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6032) CEP-Clean up the operator state when not needed.

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973569#comment-15973569
 ] 

ASF GitHub Bot commented on FLINK-6032:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3541#discussion_r112071560
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java ---
@@ -31,6 +31,18 @@ public double getVolume() {
}
 
@Override
+   public boolean equals(Object obj) {
+   return obj instanceof SubEvent &&
+   super.equals(obj) &&
+   ((SubEvent) obj).volume == volume;
+   }
+
+   @Override
+   public int hashCode() {
+   return super.hashCode() + (int) volume;
--- End diff --

Common practice is to multiply super.hashCode() by a prime (e.g. 37)
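
For example (a self-contained Scala sketch of the idiom; the real `Event`/`SubEvent` classes in flink-cep are Java, these stand-ins only show the shape):

```
class Event(val id: Int) {
  override def equals(o: Any): Boolean = o match {
    case e: Event => e.id == id
    case _ => false
  }
  override def hashCode(): Int = id
}

class SubEvent(id: Int, val volume: Double) extends Event(id) {
  override def equals(o: Any): Boolean = o match {
    case s: SubEvent => super.equals(s) && s.volume == volume
    case _ => false
  }
  // Combine the superclass hash and the new field via a prime multiplier
  // instead of plain addition.
  override def hashCode(): Int = 37 * super.hashCode() + volume.toInt
}
```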


> CEP-Clean up the operator state when not needed.
> 
>
> Key: FLINK-6032
> URL: https://issues.apache.org/jira/browse/FLINK-6032
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3541: [FLINK-6032] [cep] Clean-up operator state when no...

2017-04-18 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3541#discussion_r112071560
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java ---
@@ -31,6 +31,18 @@ public double getVolume() {
}
 
@Override
+   public boolean equals(Object obj) {
+   return obj instanceof SubEvent &&
+   super.equals(obj) &&
+   ((SubEvent) obj).volume == volume;
+   }
+
+   @Override
+   public int hashCode() {
+   return super.hashCode() + (int) volume;
--- End diff --

Common practice is to multiply super.hashCode() by a prime (e.g. 37)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6032) CEP-Clean up the operator state when not needed.

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973566#comment-15973566
 ] 

ASF GitHub Bot commented on FLINK-6032:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3541#discussion_r112071274
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -385,4 +393,25 @@ public int hashCode() {
return getClass().hashCode();
}
}
+
+   //  Testing Methods 
//
+
+   @VisibleForTesting
+   public boolean hasNonEmptyNFA(KEY key) throws IOException {
+   setCurrentKey(key);
+   return nfaOperatorState.value() != null;
+   }
+
+   @VisibleForTesting
+   public boolean hasNonEmptyPQ(KEY key) throws IOException {
--- End diff --

These 3 methods can be declared package private.


> CEP-Clean up the operator state when not needed.
> 
>
> Key: FLINK-6032
> URL: https://issues.apache.org/jira/browse/FLINK-6032
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3541: [FLINK-6032] [cep] Clean-up operator state when no...

2017-04-18 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3541#discussion_r112071274
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -385,4 +393,25 @@ public int hashCode() {
return getClass().hashCode();
}
}
+
+   //  Testing Methods 
//
+
+   @VisibleForTesting
+   public boolean hasNonEmptyNFA(KEY key) throws IOException {
+   setCurrentKey(key);
+   return nfaOperatorState.value() != null;
+   }
+
+   @VisibleForTesting
+   public boolean hasNonEmptyPQ(KEY key) throws IOException {
--- End diff --

These 3 methods can be declared package private.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112065478
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,85 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-metrics-datadog</artifactId>
+	<version>1.3-SNAPSHOT</version>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
--- End diff --

You should shade this away to make sure there aren't any dependency 
conflicts.
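
For reference, a typical maven-shade-plugin relocation for this (a sketch; the shaded package name is only an example):

```
<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-shade-plugin</artifactId>
	<executions>
		<execution>
			<phase>package</phase>
			<goals>
				<goal>shade</goal>
			</goals>
			<configuration>
				<relocations>
					<relocation>
						<pattern>com.google.common</pattern>
						<shadedPattern>org.apache.flink.metrics.datadog.shaded.com.google.common</shadedPattern>
					</relocation>
				</relocations>
			</configuration>
		</execution>
	</executions>
</plugin>
```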


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112065330
  
--- Diff: docs/monitoring/metrics.md ---
@@ -436,6 +436,38 @@ metrics.reporter.stsd.port: 8125
 
 {% endhighlight %}
 
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy 
`/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to metrics when 
sending to Datadog. Tags should be separated by comma only
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+// <host>, <job_name>, <tm_id>, <task_name>, <operator_name>, <subtask_index> will be sent to Datadog as tags
+metrics.scope.jm: <host>.jobmanager
+metrics.scope.jm.job: <host>.<job_name>.jobmanager.job
+metrics.scope.tm: <host>.<tm_id>.taskmanager
+metrics.scope.tm.job: <host>.<tm_id>.<job_name>.taskmanager.job
+metrics.scope.task: <host>.<tm_id>.<job_name>.<task_name>.<subtask_index>.task
+metrics.scope.operator: <host>.<tm_id>.<job_name>.<operator_name>.<subtask_index>.operator
+
+{% endhighlight %}
+
+Such metric reporting implementation is a best practice based on our 
experience working with Datadog. It helps 
--- End diff --

Please remove this, it doesn't belong in the documentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112065890
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/metric/DGauge.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog.metric;
+
+import java.util.List;
+
+/**
+ * Mapping of gauge between Flink and Datadog
+ * */
+public class DGauge extends DMetric {
--- End diff --

What's the benefit in differentiating between DCounters and DGauges?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3736
  
If you want to enforce a specific scope config then i would suggest to 
instead ignore the configured one (which means not using 
`MetricGroup#getMetricIdentifier()`) and instead extracting the variables you 
want from `MetricGroup#getAllVariables()`. The JMXReporter does this as well 
and is one example of a reporter that completely ignores scope formats. It 
builds a series of key-value pairs containing the variables for every metric, not 
unlike this reporter.

You should be able to support Meters, as the DCounter works with Numbers. 
Only exporting the rate is perfectly reasonable. You don't necessarily have to 
ignore Histograms; you can instead create a number of DCounters for every 
property that is extracted from the HistogramStatistics; it's a bit more work 
but doable imo.

Instead of storing the actual value in the DMetric why not store the Flink 
metric instead? When `DMetric#getPoints()` is called you could retrieve the 
value from the metric, so you wouldn't have to update anything manually.
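
A minimal sketch of that idea (written in Scala for consistency with the other snippets in this thread; the PR's classes are Java and the names below are only illustrative):

```
import org.apache.flink.metrics.{Counter, Gauge}

// The reporter-side metric keeps a reference to the Flink metric and reads the value
// lazily when the report payload is assembled, so nothing has to be updated manually.
abstract class DMetricSketch(val name: String, val tags: List[String]) {
  def getMetricValue: Number
  def getPoints: List[(Long, Number)] =
    List((System.currentTimeMillis() / 1000, getMetricValue))
}

class DGaugeSketch(gauge: Gauge[Number], name: String, tags: List[String])
  extends DMetricSketch(name, tags) {
  override def getMetricValue: Number = gauge.getValue
}

class DCounterSketch(counter: Counter, name: String, tags: List[String])
  extends DMetricSketch(name, tags) {
  override def getMetricValue: Number = java.lang.Long.valueOf(counter.getCount)
}
```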


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
Great advice. Thanks, @zentol !  

My questions to your suggestions:
1. If I make DatadogHttpReport implement MetricReporter interface, shall I 
just ignore Histogram and Meter which are not supported by Datadog Http API and 
document that?
2. For creating DGauges/DCounters. I believe you are correct. I can create 
new objects upon metric registration, and continue updating their values, right?
3. About parsing tags. Great to know I can do it via MetricGroup! Thus, 
shall I just document the metrics scope configs should be as the following? 

```
metrics.scope.jm: jobmanager
metrics.scope.jm.job: jobmanager.job
metrics.scope.tm: taskmanager
metrics.scope.tm.job: taskmanager.job
metrics.scope.task: task
metrics.scope.operator: operator
```




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3737: Release 1.2

2017-04-18 Thread kimlng
Github user kimlng commented on the issue:

https://github.com/apache/flink/pull/3737
  
Opened this pull request accidentally. Need to revert this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3737: Release 1.2

2017-04-18 Thread kimlng
Github user kimlng closed the pull request at:

https://github.com/apache/flink/pull/3737


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3737: Release 1.2

2017-04-18 Thread kimlng
GitHub user kimlng opened a pull request:

https://github.com/apache/flink/pull/3737

Release 1.2

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/flink release-1.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3737


commit 119d39b6edac6a0a80f90bb07794eca1f31425f7
Author: mtunique 
Date:   2017-01-12T03:55:57Z

[FLINK-5448] [checkpoints] Fix typo in StateAssignmentOperation Exception

This closes #3097

commit 30b467f266970e267792411d3148b4379ec23439
Author: mtunique 
Date:   2017-01-12T04:02:49Z

[FLINK-5438] [streaming api] Typo in JobGraph generator Exception

This closes #3098

commit 6b3c683450eb7aee1c9c65be75a0fddb06cea2ce
Author: Greg Hogan 
Date:   2017-01-13T18:33:29Z

[FLINK-5485] [webfrontend] Mark compiled web frontend files as binary when 
processed by git diff

Particularly beneficial now that javascript is minified, we can mark
compiled web frontend files as binary when processed by git diff.
  https://linux.die.net/man/5/gitattributes

This does not affect how files are displayed by github.

This closes #3122

commit b1be3f5c3c9e7410d92c74422b10a6efb42fd4d5
Author: Stephan Ewen 
Date:   2017-01-11T20:05:57Z

[FLINK-5345] [core] Add a utility to delete directories without failing in 
the presence of concurrent deletes

commit d1b86aab09061627d8b8c8f99b4277cc60e3dc28
Author: Stephan Ewen 
Date:   2017-01-12T09:49:13Z

[FLINK-5345] [core] Migrate various cleanup calls to concurrency-safe 
directory deletion

commit 27c11e1b79bd68cbd2e8275c7938478e2e9532e6
Author: Aleksandr Chermenin 
Date:   2016-12-16T11:42:50Z

[FLINK-3617] [scala apis] Added null value check.

commit 2eb926f2bed5723f160620b94f3b67e5dc418387
Author: Stephan Ewen 
Date:   2017-01-16T19:17:13Z

[FLINK-4959] [docs] Add documentation for ProcessFunction

commit f3aea01eb6ebecbb58c368cae9006a7831f07f41
Author: twalthr 
Date:   2017-01-13T14:22:25Z

[FLINK-5447] [table] Sync documentation of built-in functions for Table API 
with SQL

This closes #3126.

commit 02b0e65034cb74944aebff33d44470483a5f26e7
Author: beyond1920 
Date:   2016-12-29T07:52:17Z

[FLINK-5394] [table] Fix row count estimation

This closes #3058.

commit 61ac3605f6e56f49b83891c0f348b3fbddb2e075
Author: Kurt Young 
Date:   2017-01-05T03:32:04Z

[FLINK-5144] [table] Fix error while applying rule 
AggregateJoinTransposeRule

This closes #3062.

commit aad8d253c9f7dd33a63310353cbe132cd4900c6b
Author: Jakub Havlik 
Date:   2017-01-17T07:26:07Z

[FLINK-5518] [hadoopCompat] Add null check to HadoopInputFormatBase.close().

This closes #3133

commit 5f81d20ba1a7ea789af973c0e215cef382a621a6
Author: Keiji Yoshida 
Date:   2017-01-18T05:22:17Z

[hotfix] [docs] Several fixes on "Basic API Concepts".

- Fix a wrong function name `groupBy()` to `keyBy()`.
- Add closing parentheses.
- Fix an invalid return type of sample code.
- Remove a duplicate "their".

This closes #3145.
This closes #3146.
This closes #3147.
This closes #3148.

commit 55483b71f36b84ac57d03a9b83e0e9d9b9b98eab
Author: Ufuk Celebi 
Date:   2017-01-17T18:10:33Z

[FLINK-5484] [serialization] Add test for registered Kryo types

commit a7644b1716c54dcc9a5535307f7df983c79711cf

[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973424#comment-15973424
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
 ---
@@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions}
   * it does no final aggregate evaluation. It also includes the logic of
   * [[DataSetSlideTimeWindowAggFlatMapFunction]].
   *
-  * @param aggregates aggregate functions
-  * @param groupingKeysLength number of grouping keys
-  * @param timeFieldPos position of aligned time field
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param windowSize window size of the sliding window
   * @param windowSlide window slide of the sliding window
   * @param returnType return type of this function
   */
 class DataSetSlideTimeWindowAggReduceGroupFunction(
-private val aggregates: Array[AggregateFunction[_ <: Any]],
-private val groupingKeysLength: Int,
-private val timeFieldPos: Int,
+private val genAggregations: GeneratedAggregationsFunction,
+private val keysAndAggregatesArity: Int,
 private val windowSize: Long,
 private val windowSlide: Long,
 @transient private val returnType: TypeInformation[Row])
   extends RichGroupReduceFunction[Row, Row]
   with CombineFunction[Row, Row]
-  with ResultTypeQueryable[Row] {
+  with ResultTypeQueryable[Row]
+  with Compiler[GeneratedAggregations] {
 
-  Preconditions.checkNotNull(aggregates)
+  private val timeFieldPos = returnType.getArity - 1
+  private val intermediateWindowStartPos = keysAndAggregatesArity
 
   protected var intermediateRow: Row = _
-  // add one field to store window start
-  protected val intermediateRowArity: Int = groupingKeysLength + 
aggregates.length + 1
-  protected val accumulatorList: Array[JArrayList[Accumulator]] = 
Array.fill(aggregates.length) {
-new JArrayList[Accumulator](2)
-  }
-  private val intermediateWindowStartPos: Int = intermediateRowArity - 1
+  private var accumulators: Row = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
-intermediateRow = new Row(intermediateRowArity)
-
-// init lists with two empty accumulators
-var i = 0
-while (i < aggregates.length) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulatorList(i).add(accumulator)
-  accumulatorList(i).add(accumulator)
-  i += 1
-}
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getClass.getClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+accumulators = function.createAccumulators()
+intermediateRow = function.createOutputRow()
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulatorList(i).set(0, accumulator)
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
 
   // accumulate
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupingKeysLength + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // trigger tumbling evaluation
   if (!iterator.hasNext) {
--- End diff --

move this behind the loop


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: 

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
 ---
@@ -145,44 +121,23 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
   override def combine(records: Iterable[Row]): Row = {
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
+
 while (iterator.hasNext) {
   val record = iterator.next()
 
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupingKeysLength + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // check if this record is the last record
   if (!iterator.hasNext) {
--- End diff --

move this behind the loop


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111986866
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -263,33 +263,56 @@ class CodeGenerator(
  aggFields: Array[Array[Int]],
  aggMapping: Array[Int],
  fwdMapping: Array[(Int, Int)],
- outputArity: Int)
+ outputArity: Int,
+ groupingKeys: Array[Int])
   : GeneratedAggregationsFunction = {
 
 def genSetAggregationResults(
   accTypes: Array[String],
   aggs: Array[String],
   aggMapping: Array[Int]): String = {
 
-  val sig: String =
+  val sigHelper: String =
 j"""
-|  public void setAggregationResults(
-|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row output)""".stripMargin
+   |  private final void setAggregationResultsHelper(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output,
+   |java.lang.Integer offset)""".stripMargin
 
-  val setAggs: String = {
+  val setAggsHelper: String = {
 for (i <- aggs.indices) yield
   j"""
  |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
  |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
  |
  |output.setField(
- |  ${aggMapping(i)},
+ |  ${aggMapping(i)} + offset,
  |  baseClass$i.getValue((${accTypes(i)}) 
accs.getField($i)));""".stripMargin
   }.mkString("\n")
 
-  j"""$sig {
- |$setAggs
+  val setAggregationResults: String =
+j"""
+   |  public void setAggregationResults(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output) {
+   |setAggregationResultsHelper(accs, output, 0);
--- End diff --

Code generated methods should be as "flat" as possible. Calling other 
helper methods adds overhead compared to inlining the code.
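
To illustrate, a small hedged sketch: rather than generating a helper that takes the offset as a runtime argument, the generator can bake the statically known offset into the emitted method so it stays flat:

```
object FlatCodeGenSketch extends App {
  // Hypothetical values known while the code is being generated.
  val groupingKeys = Array(0, 1)
  val offset = groupingKeys.length

  // The offset is emitted as a literal, so the generated method needs no helper call.
  val setAggregationResults: String =
    s"""public void setAggregationResults(org.apache.flink.types.Row accs,
       |                                  org.apache.flink.types.Row output) {
       |  output.setField(${0 + offset}, agg0.getValue((Acc0) accs.getField(0)));
       |  output.setField(${1 + offset}, agg1.getValue((Acc1) accs.getField(1)));
       |}""".stripMargin

  println(setAggregationResults)
}
```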


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973421#comment-15973421
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111999493
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
 ---
@@ -110,12 +110,8 @@ class DataSetSessionWindowAggregatePreProcessor(
 var windowEnd: java.lang.Long = null
--- End diff --

Move the implementation to `combine()`; `mapPartition()` can then simply forward 
the call to `combine()`.
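
A minimal sketch of that structure (a hypothetical class, not the actual pre-processor):

```
import java.lang.{Iterable => JIterable}

import org.apache.flink.api.common.functions.{GroupCombineFunction, MapPartitionFunction}
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// The shared logic lives in combine(); mapPartition() simply forwards to it.
class PreProcessorSketch
  extends MapPartitionFunction[Row, Row]
  with GroupCombineFunction[Row, Row] {

  override def combine(records: JIterable[Row], out: Collector[Row]): Unit = {
    // ... the session-window pre-aggregation logic would go here ...
  }

  override def mapPartition(records: JIterable[Row], out: Collector[Row]): Unit =
    combine(records, out)
}
```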


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
 ---
@@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions}
   * it does no final aggregate evaluation. It also includes the logic of
   * [[DataSetSlideTimeWindowAggFlatMapFunction]].
   *
-  * @param aggregates aggregate functions
-  * @param groupingKeysLength number of grouping keys
-  * @param timeFieldPos position of aligned time field
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param windowSize window size of the sliding window
   * @param windowSlide window slide of the sliding window
   * @param returnType return type of this function
   */
 class DataSetSlideTimeWindowAggReduceGroupFunction(
-private val aggregates: Array[AggregateFunction[_ <: Any]],
-private val groupingKeysLength: Int,
-private val timeFieldPos: Int,
+private val genAggregations: GeneratedAggregationsFunction,
+private val keysAndAggregatesArity: Int,
 private val windowSize: Long,
 private val windowSlide: Long,
 @transient private val returnType: TypeInformation[Row])
   extends RichGroupReduceFunction[Row, Row]
   with CombineFunction[Row, Row]
-  with ResultTypeQueryable[Row] {
+  with ResultTypeQueryable[Row]
+  with Compiler[GeneratedAggregations] {
 
-  Preconditions.checkNotNull(aggregates)
+  private val timeFieldPos = returnType.getArity - 1
+  private val intermediateWindowStartPos = keysAndAggregatesArity
 
   protected var intermediateRow: Row = _
-  // add one field to store window start
-  protected val intermediateRowArity: Int = groupingKeysLength + 
aggregates.length + 1
-  protected val accumulatorList: Array[JArrayList[Accumulator]] = 
Array.fill(aggregates.length) {
-new JArrayList[Accumulator](2)
-  }
-  private val intermediateWindowStartPos: Int = intermediateRowArity - 1
+  private var accumulators: Row = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
-intermediateRow = new Row(intermediateRowArity)
-
-// init lists with two empty accumulators
-var i = 0
-while (i < aggregates.length) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulatorList(i).add(accumulator)
-  accumulatorList(i).add(accumulator)
-  i += 1
-}
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getClass.getClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+accumulators = function.createAccumulators()
+intermediateRow = function.createOutputRow()
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulatorList(i).set(0, accumulator)
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
 
   // accumulate
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupingKeysLength + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // trigger tumbling evaluation
   if (!iterator.hasNext) {
--- End diff --

move this behind the loop


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973412#comment-15973412
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111986866
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -263,33 +263,56 @@ class CodeGenerator(
  aggFields: Array[Array[Int]],
  aggMapping: Array[Int],
  fwdMapping: Array[(Int, Int)],
- outputArity: Int)
+ outputArity: Int,
+ groupingKeys: Array[Int])
   : GeneratedAggregationsFunction = {
 
 def genSetAggregationResults(
   accTypes: Array[String],
   aggs: Array[String],
   aggMapping: Array[Int]): String = {
 
-  val sig: String =
+  val sigHelper: String =
 j"""
-|  public void setAggregationResults(
-|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row output)""".stripMargin
+   |  private final void setAggregationResultsHelper(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output,
+   |java.lang.Integer offset)""".stripMargin
 
-  val setAggs: String = {
+  val setAggsHelper: String = {
 for (i <- aggs.indices) yield
   j"""
  |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
  |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
  |
  |output.setField(
- |  ${aggMapping(i)},
+ |  ${aggMapping(i)} + offset,
  |  baseClass$i.getValue((${accTypes(i)}) 
accs.getField($i)));""".stripMargin
   }.mkString("\n")
 
-  j"""$sig {
- |$setAggs
+  val setAggregationResults: String =
+j"""
+   |  public void setAggregationResults(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output) {
+   |setAggregationResultsHelper(accs, output, 0);
--- End diff --

Code generated methods should be as "flat" as possible. Calling other 
helper methods adds overhead compared to inlining the code.


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973425#comment-15973425
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112007816
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
   def setAggregationResults(accumulators: Row, output: Row)
 
   /**
+* Calculates the results from accumulators, and set the results to the 
output (with key offset)
+*
+* @param accumulators the accumulators (saved in a row) which contains 
the current
+* aggregated results
+* @param output   output results collected in a row
+*/
+  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
--- End diff --

Actually, I'm not sure if we really need to implement a different code 
generation function. I had a look at the code generation code and think that we 
could just add a few more parameters to the current code gen method. Right now, 
the behavior of most generated methods can be exactly defined:

- `createAccumulators()`: generates a `Row` with the accumulators for each 
provided `AggregationFunction`. Some methods to `GeneratedAggregations` expect 
a Row of accumulators with exactly this layout as one of their input 
parameters. In the following, this parameter is called `accs`.
- `accumulate(accs, row)`: The `aggFields` parameter controls which fields 
of `row` are accumulated into which accumulator. We should rename this 
parameter to `accFields` though, IMO.
- `retract(accs, row)`: same as for `accumulate`. We should add a separate 
parameter `retractFields: Array[Int]` though.
- `setForwardedFields(input, output)`: The `fwdMapping` parameter controls 
which field of the input row is copied to which position of the output row. We 
could add an optional parameter to copy the `groupSetMapping` to the output as 
well.
- `setAggregationResults(accs, output)`: The `aggMapping` parameter 
controls to which output fields the aggregation results are copied. If we add 
another parameter `partialResults: Boolean`, we can control whether to copy 
final results (`AggregateFunction.getValue()`) or partial results (the 
accumulator).
- `createOutputRow()`: the `outputArity` parameter specifies the arity of 
the output row.
- `mergeAccumulatorsPair(accs, other)`: **This is the only inflexible 
method**. We could change the behavior of the method as follows: The method 
expects as first parameter (`accs`) a Row with the same layout as generated by 
`createAccumulators`. The second parameter can be any row with accumulators at 
arbitrary positions. To enable the merging, we add a parameter `mergeMapping: 
Array[Int]` to the code generating function which defines which fields of the 
`other` parameter are merged with the fields in the `accs` Row. The method 
returns a Row with the default layout (as generated by `createAccumulators()`).
- `resetAccumulator(accs)`: resets a Row of accumulators of the known 
layout.

I haven't checked this thoroughly, but I think with these parameters, we 
can control the generated code sufficiently to support all aggregation 
operators for DataSet and DataStream, i.e., we can generate the currently 
existing functions such that they behave as the more specialized ones that you 
added. Since all code gen parameters (`accFields`, `retractFields`, 
`fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`, 
`mergeMapping`) can be independently set for each type of operator, this should 
give us the flexibility for all types for operators. We only need to 
parameterize the code generation method appropriately. 

In addition, we could make all parameters `Option` and generate empty 
methods if the parameters for a function are not set. (This could also be a 
follow up issue, IMO)

What do you think @shaoxuan-wang ?
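
As a rough Scala sketch of the parameterization described above (purely illustrative; the names follow the discussion, not an actual signature in the code base):

```
// Parameters that would steer what the generated GeneratedAggregations does.
case class AggCodeGenParams(
  accFields: Array[Array[Int]],               // input fields each aggregate accumulates
  retractFields: Option[Array[Array[Int]]],   // input fields each aggregate retracts
  fwdMapping: Array[(Int, Int)],              // input -> output copies (forwarded fields)
  groupSetMapping: Option[Array[(Int, Int)]], // optional copy of the grouping set to the output
  aggMapping: Array[Int],                     // output positions of the aggregation results
  partialResults: Boolean,                    // emit accumulators instead of getValue() results
  outputArity: Int,                           // arity of the row built by createOutputRow()
  mergeMapping: Option[Array[Int]])           // accumulator positions in the "other" merge input
```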



> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973423#comment-15973423
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002140
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -30,78 +30,46 @@ import org.apache.flink.types.Row
   *
   * It is used for sliding on batch for both time and count-windows.
   *
-  * @param aggregates aggregate functions.
-  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
-  * and output Row.
-  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
-  * index in output Row.
-  * @param finalRowArity output row field count
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last 
field of output row
   * @param finalRowWindowEndPos relative window-end position to last field 
of output row
   * @param windowSize size of the window, used to determine window-end for 
output row
   */
 class DataSetSlideWindowAggReduceCombineFunction(
-aggregates: Array[AggregateFunction[_ <: Any]],
-groupKeysMapping: Array[(Int, Int)],
-aggregateMapping: Array[(Int, Int)],
-finalRowArity: Int,
+genAggregations: GeneratedAggregationsFunction,
+keysAndAggregatesArity: Int,
 finalRowWindowStartPos: Option[Int],
 finalRowWindowEndPos: Option[Int],
 windowSize: Long)
   extends DataSetSlideWindowAggReduceGroupFunction(
-aggregates,
-groupKeysMapping,
-aggregateMapping,
-finalRowArity,
+genAggregations,
+keysAndAggregatesArity,
 finalRowWindowStartPos,
 finalRowWindowEndPos,
 windowSize)
   with CombineFunction[Row, Row] {
 
-  private val intermediateRowArity: Int = groupKeysMapping.length + 
aggregateMapping.length + 1
-  private val intermediateRow: Row = new Row(intermediateRowArity)
+  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
 
   override def combine(records: Iterable[Row]): Row = {
 
-// reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+// reset accumulator
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

make `record` a `var` and declare it outside of the loop.


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973418#comment-15973418
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112001966
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -30,78 +30,46 @@ import org.apache.flink.types.Row
   *
   * It is used for sliding on batch for both time and count-windows.
   *
-  * @param aggregates aggregate functions.
-  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
-  * and output Row.
-  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
-  * index in output Row.
-  * @param finalRowArity output row field count
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last 
field of output row
   * @param finalRowWindowEndPos relative window-end position to last field 
of output row
   * @param windowSize size of the window, used to determine window-end for 
output row
   */
 class DataSetSlideWindowAggReduceCombineFunction(
-aggregates: Array[AggregateFunction[_ <: Any]],
-groupKeysMapping: Array[(Int, Int)],
-aggregateMapping: Array[(Int, Int)],
-finalRowArity: Int,
+genAggregations: GeneratedAggregationsFunction,
+keysAndAggregatesArity: Int,
 finalRowWindowStartPos: Option[Int],
 finalRowWindowEndPos: Option[Int],
 windowSize: Long)
   extends DataSetSlideWindowAggReduceGroupFunction(
-aggregates,
-groupKeysMapping,
-aggregateMapping,
-finalRowArity,
+genAggregations,
+keysAndAggregatesArity,
 finalRowWindowStartPos,
 finalRowWindowEndPos,
 windowSize)
   with CombineFunction[Row, Row] {
 
-  private val intermediateRowArity: Int = groupKeysMapping.length + 
aggregateMapping.length + 1
-  private val intermediateRow: Row = new Row(intermediateRowArity)
+  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
 
   override def combine(records: Iterable[Row]): Row = {
 
-// reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+// reset accumulator
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 while (iterator.hasNext) {
   val record = iterator.next()
 
-  // accumulate
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupKeysMapping.length + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // check if this record is the last record
   if (!iterator.hasNext) {
--- End diff --

move this block behind the loop to avoid checking the condition on every 
iteration of the loop body.


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111990633
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -263,33 +263,56 @@ class CodeGenerator(
  aggFields: Array[Array[Int]],
  aggMapping: Array[Int],
  fwdMapping: Array[(Int, Int)],
- outputArity: Int)
+ outputArity: Int,
+ groupingKeys: Array[Int])
   : GeneratedAggregationsFunction = {
 
 def genSetAggregationResults(
   accTypes: Array[String],
   aggs: Array[String],
   aggMapping: Array[Int]): String = {
 
-  val sig: String =
+  val sigHelper: String =
 j"""
-|  public void setAggregationResults(
-|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row output)""".stripMargin
+   |  private final void setAggregationResultsHelper(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output,
+   |java.lang.Integer offset)""".stripMargin
 
-  val setAggs: String = {
+  val setAggsHelper: String = {
 for (i <- aggs.indices) yield
   j"""
  |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
  |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
  |
  |output.setField(
- |  ${aggMapping(i)},
+ |  ${aggMapping(i)} + offset,
--- End diff --

`${aggMapping(i)} + offset` -> `${aggMapping(i) + offset}` to add the 
constant `offset` to the mapping before generating the code.
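
For illustration (made-up values, not the real generator output): the first form leaves the addition to the generated Java, the second folds it at code-generation time:

    // Sketch only: aggIndex stands in for aggMapping(i); both values are made up.
    val aggIndex = 3
    val offset = 2

    // emits: output.setField(3 + offset, ...);  -- addition happens inside the generated code
    val runtimeAdd = s"output.setField($aggIndex + offset, ...);"

    // emits: output.setField(5, ...);           -- addition already folded during code generation
    val foldedAdd = s"output.setField(${aggIndex + offset}, ...);"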


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112001966
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -30,78 +30,46 @@ import org.apache.flink.types.Row
   *
   * It is used for sliding on batch for both time and count-windows.
   *
-  * @param aggregates aggregate functions.
-  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
-  * and output Row.
-  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
-  * index in output Row.
-  * @param finalRowArity output row field count
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last 
field of output row
   * @param finalRowWindowEndPos relative window-end position to last field 
of output row
   * @param windowSize size of the window, used to determine window-end for 
output row
   */
 class DataSetSlideWindowAggReduceCombineFunction(
-aggregates: Array[AggregateFunction[_ <: Any]],
-groupKeysMapping: Array[(Int, Int)],
-aggregateMapping: Array[(Int, Int)],
-finalRowArity: Int,
+genAggregations: GeneratedAggregationsFunction,
+keysAndAggregatesArity: Int,
 finalRowWindowStartPos: Option[Int],
 finalRowWindowEndPos: Option[Int],
 windowSize: Long)
   extends DataSetSlideWindowAggReduceGroupFunction(
-aggregates,
-groupKeysMapping,
-aggregateMapping,
-finalRowArity,
+genAggregations,
+keysAndAggregatesArity,
 finalRowWindowStartPos,
 finalRowWindowEndPos,
 windowSize)
   with CombineFunction[Row, Row] {
 
-  private val intermediateRowArity: Int = groupKeysMapping.length + 
aggregateMapping.length + 1
-  private val intermediateRow: Row = new Row(intermediateRowArity)
+  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
 
   override def combine(records: Iterable[Row]): Row = {
 
-// reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+// reset accumulator
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 while (iterator.hasNext) {
   val record = iterator.next()
 
-  // accumulate
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupKeysMapping.length + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // check if this record is the last record
   if (!iterator.hasNext) {
--- End diff --

move this block behind the loop to avoid checking the condition on every 
iteration of the loop body.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973416#comment-15973416
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111995370
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -68,23 +72,16 @@ class DataSetAggFunction(
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

we can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of 
the while loop and set the `output` fields after the loop has terminated. 
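
Roughly, the suggested shape (a sketch only; `records`, `out`, `function`, `accumulators`, and `output` are the members and parameters of the surrounding class, the rest is illustrative):

    // declare `record` outside the loop instead of re-binding it per iteration
    var record: Row = null
    val iterator = records.iterator()

    while (iterator.hasNext) {
      record = iterator.next()
      function.accumulate(accumulators, record)
    }

    // `record` now holds the last element: forward its key fields and emit once,
    // without checking iterator.hasNext inside the loop body
    function.setForwardedFields(record, output)
    function.setAggregationResults(accumulators, output)
    out.collect(output)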


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112003763
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
 ---
@@ -25,58 +25,56 @@ import 
org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.codegen.{Compiler, 
GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
-
+import org.slf4j.LoggerFactory
 
 /**
   * This map function only works for windows on batch tables.
   * It appends an (aligned) rowtime field to the end of the output row.
+  *
+  * @param genAggregations  Code-generated [[GeneratedAggregations]]
+  * @param timeFieldPos Time field position in input row
+  * @param tumbleTimeWindowSize The size of tumble time window
   */
 class DataSetWindowAggMapFunction(
-private val aggregates: Array[AggregateFunction[_]],
-private val aggFields: Array[Array[Int]],
-private val groupingKeys: Array[Int],
-private val timeFieldPos: Int, // time field position in input row
+private val genAggregations: GeneratedAggregationsFunction,
+private val timeFieldPos: Int,
 private val tumbleTimeWindowSize: Option[Long],
 @transient private val returnType: TypeInformation[Row])
-  extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
-
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(aggFields)
-  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  extends RichMapFunction[Row, Row]
+with ResultTypeQueryable[Row]
+with Compiler[GeneratedAggregations] {
 
   private var output: Row = _
-  // add one more arity to store rowtime
-  private val partialRowLength = groupingKeys.length + aggregates.length + 
1
-  // rowtime index in the buffer output row
-  private val rowtimeIndex: Int = partialRowLength - 1
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
-output = new Row(partialRowLength)
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+output = function.createOutputRow()
   }
 
   override def map(input: Row): Row = {
 
-var i = 0
-while (i < aggregates.length) {
-  val agg = aggregates(i)
-  val fieldValue = input.getField(aggFields(i)(0))
-  val accumulator = agg.createAccumulator()
-  agg.accumulate(accumulator, fieldValue)
-  output.setField(groupingKeys.length + i, accumulator)
-  i += 1
-}
+function.createAccumulatorsAndSetToOutput(output)
--- End diff --

create an accumulator with `function.createAccumulator()` once in `open()`, 
reset it here, and copy it to `output` with `function.setAggregationResults()`?
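
Sketched out, the suggested lifecycle could look like this (member names follow the surrounding class; the exact set of generated methods used here is an assumption):

    // in open(): create the reusable rows once
    output = function.createOutputRow()
    accumulators = function.createAccumulators()

    // in map(input): reset, accumulate the single input row, copy the (partial)
    // accumulators into the output row
    function.resetAccumulator(accumulators)
    function.accumulate(accumulators, input)
    function.setAggregationResults(accumulators, output)
    // ... then set the grouping keys and the aligned rowtime as before and return `output`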


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112007816
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
   def setAggregationResults(accumulators: Row, output: Row)
 
   /**
+* Calculates the results from accumulators, and set the results to the 
output (with key offset)
+*
+* @param accumulators the accumulators (saved in a row) which contains 
the current
+* aggregated results
+* @param output   output results collected in a row
+*/
+  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
--- End diff --

Actually, I'm not sure we really need to implement a different code 
generation function. I had a look at the code generation code and I think we 
could just add a few more parameters to the current code gen method. Right now, 
the behavior of most generated methods can already be defined precisely:

- `createAccumulators()`: generates a `Row` with the accumulators for each 
provided `AggregationFunction`. Some methods to `GeneratedAggregations` expect 
a Row of accumulators with exactly this layout as one of their input 
parameters. In the following, this parameter is called `accs`.
- `accumulate(accs, row)`: The `aggFields` parameter controls which fields 
of `row` are accumulated into which accumulator. We should rename this 
parameter to `accFields` though, IMO.
- `retract(accs, row)`: same as for `accumulate`. We should add a separate 
parameter `retractFields: Array[Int]` though.
- `setForwardedFields(input, output)`: The `fwdMapping` parameter controls 
which field of the input row is copied to which position of the output row. We 
could add an optional parameter to copy the `groupSetMapping` to the output as 
well.
- `setAggregationResults(accs, output)`: The `aggMapping` parameter 
controls to which output fields the aggregation results are copied. If we add 
another parameter `partialResults: Boolean`, we can control whether to copy 
final results (`AggregateFunction.getValue()`) or partial results (the 
accumulator).
- `createOutputRow()`: the `outputArity` parameter specfies the arity of 
the output row.
- `mergeAccumulatorsPair(accs, other)`: **This is the only inflexible 
method**. We could change the behavior of the method as follows: The method 
expects as first parameter (`accs`) a Row with the same layout as generated by 
`createAccumulators`. The second parameter can be any row with accumulators at 
arbitrary positions. To enable the merging, we add a parameter `mergeMapping: 
Array[Int]` to the code generating function which defines which fields of the 
`other` parameter are merged with the fields in the `accs` Row. The method 
returns a Row with the default layout (as generated by `createAccumulators()`).
- `resetAccumulator(accs)`: resets a Row of accumulators of the known 
layout.

I haven't checked this thoroughly, but I think with these parameters, we 
can control the generated code sufficiently to support all aggregation 
operators for DataSet and DataStream, i.e., we can generate the currently 
existing functions such that they behave as the more specialized ones that you 
added. Since all code gen parameters (`accFields`, `retractFields`, 
`fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`, 
`mergeMapping`) can be independently set for each type of operator, this should 
give us the flexibility for all types of operators. We only need to 
parameterize the code generation method appropriately. 

In addition, we could make all parameters `Option` and generate empty 
methods if the parameters for a function are not set. (This could also be a 
follow up issue, IMO)
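
To make this concrete, the entry point could roughly look like the following (a sketch only; parameter names follow the list above, the signature itself is an assumption, not the actual API):

    // inside CodeGenerator (sketch): every Option that is None leads to an empty method body
    def generateAggregations(
        name: String,
        aggregates: Array[AggregateFunction[_]],
        accFields: Array[Array[Int]],                     // was: aggFields
        aggMapping: Array[Int],
        outputArity: Int,
        retractFields: Option[Array[Int]] = None,
        fwdMapping: Option[Array[(Int, Int)]] = None,
        groupSetMapping: Option[Array[(Int, Int)]] = None,
        partialResults: Boolean = false,
        mergeMapping: Option[Array[Int]] = None)
      : GeneratedAggregationsFunction = {
      ???
    }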

What do you think @shaoxuan-wang ?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002140
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -30,78 +30,46 @@ import org.apache.flink.types.Row
   *
   * It is used for sliding on batch for both time and count-windows.
   *
-  * @param aggregates aggregate functions.
-  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
-  * and output Row.
-  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
-  * index in output Row.
-  * @param finalRowArity output row field count
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last 
field of output row
   * @param finalRowWindowEndPos relative window-end position to last field 
of output row
   * @param windowSize size of the window, used to determine window-end for 
output row
   */
 class DataSetSlideWindowAggReduceCombineFunction(
-aggregates: Array[AggregateFunction[_ <: Any]],
-groupKeysMapping: Array[(Int, Int)],
-aggregateMapping: Array[(Int, Int)],
-finalRowArity: Int,
+genAggregations: GeneratedAggregationsFunction,
+keysAndAggregatesArity: Int,
 finalRowWindowStartPos: Option[Int],
 finalRowWindowEndPos: Option[Int],
 windowSize: Long)
   extends DataSetSlideWindowAggReduceGroupFunction(
-aggregates,
-groupKeysMapping,
-aggregateMapping,
-finalRowArity,
+genAggregations,
+keysAndAggregatesArity,
 finalRowWindowStartPos,
 finalRowWindowEndPos,
 windowSize)
   with CombineFunction[Row, Row] {
 
-  private val intermediateRowArity: Int = groupKeysMapping.length + 
aggregateMapping.length + 1
-  private val intermediateRow: Row = new Row(intermediateRowArity)
+  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
 
   override def combine(records: Iterable[Row]): Row = {
 
-// reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+// reset accumulator
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

make `record` a `var` and declare it outside of the loop.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111995370
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -68,23 +72,16 @@ class DataSetAggFunction(
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

we can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of 
the while loop and set the `output` fields after the loop has terminated. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973426#comment-15973426
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111991648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -451,13 +581,34 @@ class CodeGenerator(
   {
 for (i <- accTypes.indices) yield
   j"""
- |accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
+ |accList$i = new java.util.ArrayList<${accTypes(i)}>();
--- End diff --

Why not creating the `ArrayList` with initial capacity 2?


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111995303
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
 ---
@@ -19,88 +19,71 @@
 package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler, 
GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.LoggerFactory
 
 /**
   * [[RichGroupReduceFunction]] to compute the final result of a 
pre-aggregated aggregation
   * for batch (DataSet) queries.
   *
-  * @param aggregates The aggregate functions.
-  * @param aggOutFields The positions of the aggregation results in the 
output
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
   * @param gkeyOutFields The positions of the grouping keys in the output
   * @param groupingSetsMapping The mapping of grouping set keys between 
input and output positions.
-  * @param finalRowArity The arity of the final resulting row
   */
 class DataSetFinalAggFunction(
-private val aggregates: Array[AggregateFunction[_ <: Any]],
-private val aggOutFields: Array[Int],
+private val genAggregations: GeneratedAggregationsFunction,
 private val gkeyOutFields: Array[Int],
-private val groupingSetsMapping: Array[(Int, Int)],
-private val finalRowArity: Int)
-  extends RichGroupReduceFunction[Row, Row] {
+private val groupingSetsMapping: Array[(Int, Int)])
+  extends RichGroupReduceFunction[Row, Row]
+with Compiler[GeneratedAggregations] {
 
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(aggOutFields)
   Preconditions.checkNotNull(gkeyOutFields)
   Preconditions.checkNotNull(groupingSetsMapping)
 
   private var output: Row = _
+  private var accumulators: Row = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   private val intermediateGKeys: Option[Array[Int]] = if 
(!groupingSetsMapping.isEmpty) {
 Some(gkeyOutFields)
   } else {
 None
   }
 
-  private val numAggs = aggregates.length
-  private val numGKeys = gkeyOutFields.length
-
-  private val accumulators: Array[JArrayList[Accumulator]] =
-Array.fill(numAggs)(new JArrayList[Accumulator](2))
-
   override def open(config: Configuration) {
-output = new Row(finalRowArity)
-
-// init lists with two empty accumulators
-for (i <- aggregates.indices) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulators(i).add(accumulator)
-  accumulators(i).add(accumulator)
-}
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getClass.getClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+output = function.createOutputRow()
+accumulators = function.createAccumulators()
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 val iterator = records.iterator()
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulators(i).get(0))
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
+var i = 0
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

we can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of 
the while loop and set the `output` fields after the loop has terminated. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973415#comment-15973415
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
 ---
@@ -145,44 +121,23 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
   override def combine(records: Iterable[Row]): Row = {
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
+
 while (iterator.hasNext) {
   val record = iterator.next()
 
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupingKeysLength + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // check if this record is the last record
   if (!iterator.hasNext) {
--- End diff --

move this behind the loop


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111991648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -451,13 +581,34 @@ class CodeGenerator(
   {
 for (i <- accTypes.indices) yield
   j"""
- |accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
+ |accList$i = new java.util.ArrayList<${accTypes(i)}>();
--- End diff --

Why not creating the `ArrayList` with initial capacity 2?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973414#comment-15973414
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111994938
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
 ---
@@ -64,38 +68,22 @@ class DataSetPreAggFunction(
   def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

we can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of 
the while loop and set the `output` fields after the loop has terminated. 


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973427#comment-15973427
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112003763
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
 ---
@@ -25,58 +25,56 @@ import 
org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.codegen.{Compiler, 
GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
-
+import org.slf4j.LoggerFactory
 
 /**
   * This map function only works for windows on batch tables.
   * It appends an (aligned) rowtime field to the end of the output row.
+  *
+  * @param genAggregations  Code-generated [[GeneratedAggregations]]
+  * @param timeFieldPos Time field position in input row
+  * @param tumbleTimeWindowSize The size of tumble time window
   */
 class DataSetWindowAggMapFunction(
-private val aggregates: Array[AggregateFunction[_]],
-private val aggFields: Array[Array[Int]],
-private val groupingKeys: Array[Int],
-private val timeFieldPos: Int, // time field position in input row
+private val genAggregations: GeneratedAggregationsFunction,
+private val timeFieldPos: Int,
 private val tumbleTimeWindowSize: Option[Long],
 @transient private val returnType: TypeInformation[Row])
-  extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
-
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(aggFields)
-  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  extends RichMapFunction[Row, Row]
+with ResultTypeQueryable[Row]
+with Compiler[GeneratedAggregations] {
 
   private var output: Row = _
-  // add one more arity to store rowtime
-  private val partialRowLength = groupingKeys.length + aggregates.length + 
1
-  // rowtime index in the buffer output row
-  private val rowtimeIndex: Int = partialRowLength - 1
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
-output = new Row(partialRowLength)
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+output = function.createOutputRow()
   }
 
   override def map(input: Row): Row = {
 
-var i = 0
-while (i < aggregates.length) {
-  val agg = aggregates(i)
-  val fieldValue = input.getField(aggFields(i)(0))
-  val accumulator = agg.createAccumulator()
-  agg.accumulate(accumulator, fieldValue)
-  output.setField(groupingKeys.length + i, accumulator)
-  i += 1
-}
+function.createAccumulatorsAndSetToOutput(output)
--- End diff --

create an accumulator with `function.createAccumulator()` once in `open()`, 
reset it here, and copy it to `output` with `function.setAggregationResults()`?


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973428#comment-15973428
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
 ---
@@ -18,111 +18,77 @@
 package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler, 
GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
 
 /**
   * It wraps the aggregate logic inside of
   * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
   *
   * It is used for sliding on batch for both time and count-windows.
   *
-  * @param aggregates aggregate functions.
-  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
-  * and output Row.
-  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
-  * index in output Row.
-  * @param finalRowArity output row field count
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last 
field of output row
   * @param finalRowWindowEndPos relative window-end position to last field 
of output row
   * @param windowSize size of the window, used to determine window-end for 
output row
   */
 class DataSetSlideWindowAggReduceGroupFunction(
-aggregates: Array[AggregateFunction[_ <: Any]],
-groupKeysMapping: Array[(Int, Int)],
-aggregateMapping: Array[(Int, Int)],
-finalRowArity: Int,
+genAggregations: GeneratedAggregationsFunction,
+keysAndAggregatesArity: Int,
 finalRowWindowStartPos: Option[Int],
 finalRowWindowEndPos: Option[Int],
 windowSize: Long)
-  extends RichGroupReduceFunction[Row, Row] {
-
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(groupKeysMapping)
+  extends RichGroupReduceFunction[Row, Row]
+with Compiler[GeneratedAggregations] {
 
   private var collector: TimeWindowPropertyCollector = _
+  protected val windowStartPos: Int = keysAndAggregatesArity
+
   private var output: Row = _
-  private val accumulatorStartPos: Int = groupKeysMapping.length
-  protected val windowStartPos: Int = accumulatorStartPos + 
aggregates.length
+  protected var accumulators: Row = _
 
-  val accumulatorList: Array[JArrayList[Accumulator]] = 
Array.fill(aggregates.length) {
-new JArrayList[Accumulator](2)
-  }
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  protected var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
-output = new Row(finalRowArity)
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getClass.getClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+output = function.createOutputRow()
+accumulators = function.createAccumulators()
 collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
-
-// init lists with two empty accumulators
-var i = 0
-while (i < aggregates.length) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulatorList(i).add(accumulator)
-  accumulatorList(i).add(accumulator)
-  i += 1
-}
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
-// reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+// reset accumulator
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 while (iterator.hasNext) {
   

[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973417#comment-15973417
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111993502
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
 ---
@@ -64,38 +68,22 @@ class DataSetPreAggFunction(
   def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
--- End diff --

create accumulators once and use `function.resetAccumulators()`?


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973422#comment-15973422
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111992686
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -21,44 +21,48 @@ import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler, 
GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.LoggerFactory
 
 /**
   * [[RichGroupReduceFunction]] to compute aggregates that do not support 
pre-aggregation for batch
   * (DataSet) queries.
   *
-  * @param aggregates The aggregate functions.
-  * @param aggInFields The positions of the aggregation input fields.
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
   * @param gkeyOutMapping The mapping of group keys between input and 
output positions.
-  * @param aggOutMapping  The mapping of aggregates to output positions.
   * @param groupingSetsMapping The mapping of grouping set keys between 
input and output positions.
-  * @param finalRowArity The arity of the final resulting row.
   */
 class DataSetAggFunction(
-private val aggregates: Array[AggregateFunction[_ <: Any]],
-private val aggInFields: Array[Array[Int]],
-private val aggOutMapping: Array[(Int, Int)],
+private val genAggregations: GeneratedAggregationsFunction,
 private val gkeyOutMapping: Array[(Int, Int)],
--- End diff --

It would be good if we could parameterize the method that generates the 
code such that we can do the grouping keys and grouping set copies with 
`GeneratedAggregations.setForwardFields()`. This should be possible as it is 
actually just setting constant boolean flags at certain positions in the output 
Row.
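
As a sketch of that idea (hypothetical helper, not generated code): an extended forward-field step could copy the grouping keys and set the constant grouping-set flags in one pass:

    import org.apache.flink.types.Row

    // Hypothetical behavior of an extended setForwardedFields: copy the grouping key
    // fields and, for grouping sets, write the constant boolean flags into the output.
    def setForwardedFieldsWithGroupingSets(
        input: Row,
        output: Row,
        fwdMapping: Array[(Int, Int)],            // (inputPos, outputPos) pairs to copy
        groupingSetFlags: Array[(Int, Boolean)]   // (outputPos, constant flag) pairs to set
      ): Unit = {
      fwdMapping.foreach { case (in, out) => output.setField(out, input.getField(in)) }
      groupingSetFlags.foreach { case (pos, flag) => output.setField(pos, flag) }
    }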


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973420#comment-15973420
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111995303
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
 ---
@@ -19,88 +19,71 @@
 package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler, 
GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.LoggerFactory
 
 /**
   * [[RichGroupReduceFunction]] to compute the final result of a 
pre-aggregated aggregation
   * for batch (DataSet) queries.
   *
-  * @param aggregates The aggregate functions.
-  * @param aggOutFields The positions of the aggregation results in the 
output
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
   * @param gkeyOutFields The positions of the grouping keys in the output
   * @param groupingSetsMapping The mapping of grouping set keys between 
input and output positions.
-  * @param finalRowArity The arity of the final resulting row
   */
 class DataSetFinalAggFunction(
-private val aggregates: Array[AggregateFunction[_ <: Any]],
-private val aggOutFields: Array[Int],
+private val genAggregations: GeneratedAggregationsFunction,
 private val gkeyOutFields: Array[Int],
-private val groupingSetsMapping: Array[(Int, Int)],
-private val finalRowArity: Int)
-  extends RichGroupReduceFunction[Row, Row] {
+private val groupingSetsMapping: Array[(Int, Int)])
+  extends RichGroupReduceFunction[Row, Row]
+with Compiler[GeneratedAggregations] {
 
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(aggOutFields)
   Preconditions.checkNotNull(gkeyOutFields)
   Preconditions.checkNotNull(groupingSetsMapping)
 
   private var output: Row = _
+  private var accumulators: Row = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   private val intermediateGKeys: Option[Array[Int]] = if 
(!groupingSetsMapping.isEmpty) {
 Some(gkeyOutFields)
   } else {
 None
   }
 
-  private val numAggs = aggregates.length
-  private val numGKeys = gkeyOutFields.length
-
-  private val accumulators: Array[JArrayList[Accumulator]] =
-Array.fill(numAggs)(new JArrayList[Accumulator](2))
-
   override def open(config: Configuration) {
-output = new Row(finalRowArity)
-
-// init lists with two empty accumulators
-for (i <- aggregates.indices) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulators(i).add(accumulator)
-  accumulators(i).add(accumulator)
-}
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getClass.getClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+output = function.createOutputRow()
+accumulators = function.createAccumulators()
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 val iterator = records.iterator()
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulators(i).get(0))
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
+var i = 0
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

we can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of 
the while loop and set the `output` fields after the loop has terminated. 


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>

[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973413#comment-15973413
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111990633
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -263,33 +263,56 @@ class CodeGenerator(
  aggFields: Array[Array[Int]],
  aggMapping: Array[Int],
  fwdMapping: Array[(Int, Int)],
- outputArity: Int)
+ outputArity: Int,
+ groupingKeys: Array[Int])
   : GeneratedAggregationsFunction = {
 
 def genSetAggregationResults(
   accTypes: Array[String],
   aggs: Array[String],
   aggMapping: Array[Int]): String = {
 
-  val sig: String =
+  val sigHelper: String =
 j"""
-|  public void setAggregationResults(
-|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row output)""".stripMargin
+   |  private final void setAggregationResultsHelper(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output,
+   |java.lang.Integer offset)""".stripMargin
 
-  val setAggs: String = {
+  val setAggsHelper: String = {
 for (i <- aggs.indices) yield
   j"""
  |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
  |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
  |
  |output.setField(
- |  ${aggMapping(i)},
+ |  ${aggMapping(i)} + offset,
--- End diff --

`${aggMapping(i)} + offset` -> `${aggMapping(i) + offset}` to add the 
constant `offset` to the mapping before generating the code.


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

