[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5132:
-
Description: 
This is part of the fine-grained resource configuration.
The current resource factors include CPU cores, heap memory, direct memory, 
native memory, and state size.
The *ResourceSpec* will provide some basic construction methods for grouping 
different resource factors as needed, and the structure can easily be extended 
for further requirements.


> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include CPU cores, heap memory, direct memory, 
> native memory, and state size.
> The *ResourceSpec* will provide some basic construction methods for grouping 
> different resource factors as needed, and the structure can easily be extended 
> for further requirements.
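As an illustration of the kind of grouping construction the description mentions, here is a
minimal sketch in Java; the field set, units, and factory method are assumptions for
illustration only, not the actual Flink class:

{code}
// Hypothetical sketch of a ResourceSpec-like value class: one immutable group of
// resource factors plus a convenience factory for the common cpu/heap case.
public class ResourceSpec {
    public final double cpuCores;
    public final int heapMemoryMB;
    public final int directMemoryMB;
    public final int nativeMemoryMB;
    public final int stateSizeMB;

    public ResourceSpec(double cpuCores, int heapMemoryMB, int directMemoryMB,
                        int nativeMemoryMB, int stateSizeMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryMB = heapMemoryMB;
        this.directMemoryMB = directMemoryMB;
        this.nativeMemoryMB = nativeMemoryMB;
        this.stateSizeMB = stateSizeMB;
    }

    // Basic construction for the common case where only cpu and heap are given;
    // further factors can be added later without breaking existing callers.
    public static ResourceSpec of(double cpuCores, int heapMemoryMB) {
        return new ResourceSpec(cpuCores, heapMemoryMB, 0, 0, 0);
    }
}
{code}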





[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5132:
-
Description: 
This is part of the fine-grained resource configuration.

The current resource factors include CPU cores, heap memory, direct memory, 
native memory, and state size.
The *ResourceSpec* will provide some basic construction methods for grouping 
different resource factors as needed, and the structure can easily be extended 
for further requirements.


  was:
This is part of the fine-grained resource configuration.
The current resource factors include cpu cores, heap memory, direct memory, 
native memory and state size.
The *ResourceSpec* will provide some basic constructions for grouping different 
resource factors as needed and the construction can also be expanded easily for 
further requirements.



> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include CPU cores, heap memory, direct memory, 
> native memory, and state size.
> The *ResourceSpec* will provide some basic construction methods for grouping 
> different resource factors as needed, and the structure can easily be extended 
> for further requirements.





[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688809#comment-15688809
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@aljoscha Thanks for your review. I have updated the PR according to your 
suggestion. 


> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.
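As an illustration only (not the actual Flink API), a minimal sketch of the shape such a
change could take, with a type parameter on State and a get() next to the existing clear():

{code}
// Illustrative sketch: a generified State with get(); names and signatures are
// assumptions, not the real org.apache.flink.api.common.state interfaces.
public interface State<T> {

    /** Returns the structured value (e.g. value, list, or map) for the current key. */
    T get() throws Exception;

    /** Removes the value for the current key (already part of the existing interface). */
    void clear();
}

// Existing state flavors would then just pin down T, for example:
interface ValueState<V> extends State<V> { }           // get() plays the role of value()
interface ListState<V> extends State<Iterable<V>> { }  // get() returns the list
{code}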





[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2016-11-22 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@aljoscha Thanks for your review. I have updated the PR according to your 
suggestion. 




[jira] [Updated] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5133:
-
Description: 
This is part of the fine-grained resource configuration.

For *DataStream*, the *setResource* API will be set on 
*SingleOutputStreamOperator*, similar to other existing properties like 
parallelism, name, etc.
For *DataSet*, the *setResource* API will be set on *Operator* in a similar way.

The API takes two parameters, a minimum *ResourceSpec* and a maximum 
*ResourceSpec*, to allow for resource resizing in future improvements.

> Add new setResource API for DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be set on 
> *SingleOutputStreamOperator*, similar to other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be set on *Operator* in a similar way.
> The API takes two parameters, a minimum *ResourceSpec* and a maximum 
> *ResourceSpec*, to allow for resource resizing in future improvements.
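A hedged sketch of how such a setter might be called from user code; the setResource
method itself is the proposal of this issue and does not exist yet, and "input",
"dataSet", "minSpec", and "maxSpec" are placeholders for an existing DataStream<String>,
DataSet<String>, and two ResourceSpec values:

{code}
// DataStream side: SingleOutputStreamOperator would carry the setter, next to
// setParallelism()/name().
SingleOutputStreamOperator<Integer> lengths = input
    .map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return value.length();
        }
    })
    .setResource(minSpec, maxSpec)   // hypothetical: minimum and maximum ResourceSpec
    .name("resource-annotated map");

// DataSet side: the base Operator would get the same style of setter.
DataSet<Integer> lengthsBatch = dataSet
    .map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return value.length();
        }
    })
    .setResource(minSpec, maxSpec);
{code}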





[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: 
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of a *JobVertex* should be the aggregation of the individual 
resources of its chained operators.
For the memory resource of a *JobVertex*, the aggregation is the sum over the 
chained operators, and for the CPU cores resource, the aggregation is the 
maximum over the chained operators.

  was:
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of *JobVertex* should be aggregation of individual resource in 
chained operators.
For memory resource in *JobVertex*, the aggregation is the sum formula for 
chained operators. And for cpu cores resource in *JobVertex*, the aggregation 
is the maximum formula for chained operators.


> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
> operators.
> The resource of a *JobVertex* should be the aggregation of the individual 
> resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over the 
> chained operators, and for the CPU cores resource, the aggregation is the 
> maximum over the chained operators.
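A small sketch of that aggregation rule (maximum for CPU cores, sum for the memory
factors), reusing the illustrative ResourceSpec fields sketched under FLINK-5132 above;
this is not the actual chaining code:

{code}
// Merge the resource of one more chained operator into the JobVertex aggregate.
public static ResourceSpec mergeChained(ResourceSpec vertex, ResourceSpec operator) {
    return new ResourceSpec(
        Math.max(vertex.cpuCores, operator.cpuCores),    // cpu cores: maximum
        vertex.heapMemoryMB + operator.heapMemoryMB,     // memory factors: sum
        vertex.directMemoryMB + operator.directMemoryMB,
        vertex.nativeMemoryMB + operator.nativeMemoryMB,
        vertex.stateSizeMB + operator.stateSizeMB);
}
{code}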





[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Summary: Aggregation of slot ResourceProfile and framework memory for 
requesting resource from cluster by ResourceManager  (was: ResourceManager 
should consider both slot resource profile and framework memory for requesting 
resource from cluster)

> Aggregation of slot ResourceProfile and framework memory for requesting 
> resource from cluster by ResourceManager
> 
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>






[jira] [Created] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5132:


 Summary: Introduce the ResourceSpec for grouping different 
resource factors in API
 Key: FLINK-5132
 URL: https://issues.apache.org/jira/browse/FLINK-5132
 Project: Flink
  Issue Type: Sub-task
  Components: Core
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume many 
resources to create long-lived big objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set onto the internal transformation in DataStream 
and the base operator in DataSet, respectively.
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests a container from the cluster, it should 
consider the extra framework memory in addition to the slot *ResourceProfile* 
(see the sketch below).
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly into the flip-6 branch.
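For the two ResourceManager-related points above, a rough sketch of the intended
arithmetic when sizing a container request; the helper and its parameters are invented
for illustration and are not actual flip-6 classes:

{code}
// Hypothetical helper: the container requested from the cluster (e.g. YARN) must
// cover the slot's ResourceProfile memory plus the TaskManager framework memory
// (NetworkBufferPool and MemoryManager), which is configured at the job level.
public static int containerMemoryMB(int slotProfileMemoryMB,
                                    int networkBufferMemoryMB,
                                    int managedMemoryMB) {
    // framework memory is added on top of what the slot itself declared
    return slotProfileMemoryMB + networkBufferMemoryMB + managedMemoryMB;
}
{code}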

  was:
Normally the UDF just creates short-life small objects and these can be 
recycled quickly by JVM, so most of the memory resource is controlled and 
managed by *TaskManager* framework.  But for some special cases, the UDF may 
consume much resource to create long-live big objects, so it is necessary to 
provide the options for professional users to define the resource usages if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc) and provide 
some basic construction methods for resource group.
  - The *ResourceSpec* can be setted onto the internal transformation in 
DataStream and base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated  for 
chained operators.
  - When *JobManager* requests slot for submitting task from *ResourceManager*, 
the *ResourceProfile* will be expanded  to correspondence with *ResourceSpec*.
  - The *ResourceManager* requests resource for container from cluster, it 
should consider extra framework memory except for slot *ResourceProfile*.
  - The framework memory is mainly used by *NetworkBufferPool* and 
*MemoryManager* in *TaskManager*, and it can be configured in job level.
  - Apart from resource, The JVM options attached with container should be 
supported and could also be configured in job level.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed by 
> the *TaskManager* framework. But in some special cases the UDF may consume many 
> resources to create long-lived big objects, so it is necessary to provide 
> options for professional users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (CPU cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set onto the internal transformation in 
> DataStream and the base operator in DataSet, respectively.
>   - In stream graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond with 
> the *ResourceSpec*.
>   - When the *ResourceManager* requests a container from the cluster, it should 
> consider the extra framework memory in addition to the slot *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.

[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume many 
resources to create long-lived big objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set onto the internal transformation in DataStream 
and the base operator in DataSet, respectively.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests a container from the cluster, it should 
consider the extra framework memory in addition to the slot *ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly into the flip-6 branch.

  was:
Normally the UDF just creates short-life small objects and these can be 
recycled quickly by JVM, so most of the memory resource is controlled and 
managed by *TaskManager* framework.  But for some special cases, the UDF may 
consume much resource to create long-live big objects, so it is necessary to 
provide the options for professional users to define the resource usages if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc) and provide 
some basic construction methods for resource group.
  - The *ResourceSpec* can be setted onto the internal transformation in 
DataStream and base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated  for chained 
operators.
  - When *JobManager* requests slot for submitting task from *ResourceManager*, 
the *ResourceProfile* will be expanded  to correspondence with *ResourceSpec*.
  - The *ResourceManager* requests resource for container from cluster, it 
should consider extra framework memory except for slot *ResourceProfile*.
  - The framework memory is mainly used by *NetworkBufferPool* and 
*MemoryManager* in *TaskManager*, and it can be configured in job level.
  - Apart from resource, The JVM options attached with container should be 
supported and could also be configured in job level.

This feature will be implemented directly into flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed by 
> the *TaskManager* framework. But in some special cases the UDF may consume many 
> resources to create long-lived big objects, so it is necessary to provide 
> options for professional users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (CPU cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set onto the internal transformation in 
> DataStream and the base operator in DataSet, respectively.
>   - In job graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond with 
> the *ResourceSpec*.
>   - When the *ResourceManager* requests a container from the cluster, it should 
> consider the extra framework memory in addition to the slot *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.

[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5135:
-
Summary: ResourceProfile for slot request should be expanded to correspond 
with ResourceSpec  (was: ResourceProfile should be expanded to correspond with 
ResourceSpec)

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>






[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Summary: Aggregation of slot ResourceProfile and framework memory by 
ResourceManager  (was: Aggregation of slot ResourceProfile and framework memory 
for requesting resource from cluster by ResourceManager)

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>






[jira] [Created] (FLINK-5139) Kafka consumer can set start offset

2016-11-22 Thread jing lining (JIRA)
jing lining created FLINK-5139:
--

 Summary: Kafka consumer can set start offset
 Key: FLINK-5139
 URL: https://issues.apache.org/jira/browse/FLINK-5139
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.1.3
Reporter: jing lining


Currently the start offset is determined by the 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08#getInvalidOffsetBehavior
 method.

Could we also allow passing in a specific numeric offset to start consuming from?





[jira] [Created] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5133:


 Summary: Add new setResource API for DataStream and DataSet
 Key: FLINK-5133
 URL: https://issues.apache.org/jira/browse/FLINK-5133
 Project: Flink
  Issue Type: Sub-task
  Components: DataSet API, DataStream API
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: In datastream API,

> Aggregate ResourceSpec for chained operators when generating stream graph
> 
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In datastream API,





[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: 
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of a *JobVertex* should be the aggregation of the individual 
resources of its chained operators.
For the memory resource of a *JobVertex*, the aggregation is the sum over the 
chained operators, and for the CPU cores resource, the aggregation is the 
maximum over the chained operators.

  was:In DataStream API, the *ResourceSpec* is setted onto the internal 
transformation


> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
> operators.
> The resource of a *JobVertex* should be the aggregation of the individual 
> resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over the 
> chained operators, and for the CPU cores resource, the aggregation is the 
> maximum over the chained operators.





[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
-   *        |                                          |
-   *        v                                          v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                ^
-   *                                |
-   *                   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+*        |                                          |
+*        v                                          v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                ^
+*                                |
+*                   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Cool, I think using Calcite's namedAggregates is a good idea.




[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688606#comment-15688606
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
-   *        |                                          |
-   *        v                                          v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                ^
-   *                                |
-   *                   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+*        |                                          |
+*        v                                          v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                ^
+*                                |
+*                   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Cool, I think using Calcite's namedAggregates is a good idea.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API, 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of 
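For context, a small sketch of the DataStream-level mechanism the description refers to;
the input type, key, and reduce logic below are made up for illustration, only the
ReduceFunction-on-a-window pattern is the point:

{code}
// Incremental aggregation: every arriving record is folded into one partial
// aggregate per key and window, instead of buffering all records until the
// window fires. Assumes an existing DataStream<Tuple2<String, Long>> named "input".
DataStream<Tuple2<String, Long>> counts = input
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                           Tuple2<String, Long> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    });
{code}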

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247362
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
-   *        |                                          |
-   *        v                                          v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                ^
-   *                                |
-   *                   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+*        |                                          |
+*        v                                          v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                ^
+*                                |
+*                   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate 
Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
-
-// the mapping relation between aggregate function index in list and 
its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data 
type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
+  aggregates,
+  aggFieldIndexes,
+  groupings,
+  
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
-val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688608#comment-15688608
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247362
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
-   *        |                                          |
-   *        v                                          v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                ^
-   *                                |
-   *                   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+*        |                                          |
+*        v                                          v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                ^
+*                                |
+*                   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate 
Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
-
-// the mapping relation between aggregate function index in list and 
its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data 
type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
+  aggregates,
+  aggFieldIndexes,
  

[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume many 
resources to create long-lived big objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set onto the internal transformation in DataStream 
and the base operator in DataSet, respectively.
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests a container from the cluster, it should 
consider the extra framework memory in addition to the slot *ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed by 
> the *TaskManager* framework. But in some special cases the UDF may consume many 
> resources to create long-lived big objects, so it is necessary to provide 
> options for professional users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (CPU cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set onto the internal transformation in 
> DataStream and the base operator in DataSet, respectively.
>   - In stream graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond with 
> the *ResourceSpec*.
>   - When the *ResourceManager* requests a container from the cluster, it should 
> consider the extra framework memory in addition to the slot *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.





[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688781#comment-15688781
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r89252908
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Returns eval method matching the given signature of 
[[TypeInformation]].
+*/
+  def getEvalMethod(
+function: EvaluableFunction,
--- End diff --

hi @fhueske When we declare the ScalarFunction and TableFunction as 
follows: trait ScalarFunction, trait TableFunction [T], it will become very 
useful. The current version is optional.


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row into multiple output rows. This is very useful in 
> some cases, such as looking up HBase by row key and returning one or more rows.
> Adding a user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval functions.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of 
> Java type erasure, we can't extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Flink will choose the best matching eval 
> method to call according to the parameter types and count.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
>
> public class SplitStringUDTF extends UDTF<Word> {
>   public Iterable<Word> eval(String str) {
>     if (str != null) {
>       for (String s : str.split(",")) {
>         collect(new Word(s, s.length()));
>       }
>     }
>   }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#





[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r89252908
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Returns eval method matching the given signature of 
[[TypeInformation]].
+*/
+  def getEvalMethod(
+function: EvaluableFunction,
--- End diff --

hi @fhueske When we declare the ScalarFunction and TableFunction as 
follows: trait ScalarFunction, trait TableFunction [T], it will become very 
useful. The current version is optional.




[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Summary: Aggregate ResourceSpec for chained operators when generating job 
graph  (was: Aggregate ResourceSpec for chained operators when generating stream 
graph)

> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In DataStream API, the *ResourceSpec* is set onto the internal 
> transformation





[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: In DataStream API, the *ResourceSpec* is set onto the 
internal transformation  (was: In datastream API,)

> Aggregate ResourceSpec for chained operators when generating stream graph
> 
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In DataStream API, the *ResourceSpec* is set onto the internal 
> transformation





[jira] [Created] (FLINK-5136) ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5136:


 Summary: ResourceManager should consider both slot resource 
profile and framework memory for requesting resource from cluster
 Key: FLINK-5136
 URL: https://issues.apache.org/jira/browse/FLINK-5136
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume many 
resources to create long-lived big objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set onto the internal transformation in DataStream 
and the base operator in DataSet, respectively.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests a container from the cluster, it should 
consider the extra framework memory in addition to the slot *ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly into the flip-6 branch.

  was:
Normally the UDF just creates short-life small objects and these can be 
recycled quickly by JVM, so most of the memory resource is controlled and 
managed by *TaskManager* framework.  But for some special cases, the UDF may 
consume much resource to create long-live big objects, so it is necessary to 
provide the options for professional users to define the resource usages if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc) and provide 
some basic construction methods for resource group.
  - The *ResourceSpec* can be setted onto the internal transformation in 
DataStream and base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated  for 
chained operators.
  - When *JobManager* requests slot for submitting task from *ResourceManager*, 
the *ResourceProfile* will be expanded  to correspondence with *ResourceSpec*.
  - The *ResourceManager* requests resource for container from cluster, it 
should consider extra framework memory except for slot *ResourceProfile*.
  - The framework memory is mainly used by *NetworkBufferPool* and 
*MemoryManager* in *TaskManager*, and it can be configured in job level.
  - Apart from resource, The JVM options attached with container should be 
supported and could also be configured in job level.

This feature will be implemented directly into flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed by 
> the *TaskManager* framework. But in some special cases the UDF may consume many 
> resources to create long-lived big objects, so it is necessary to provide 
> options for professional users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (CPU cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set onto the internal transformation in 
> DataStream and the base operator in DataSet, respectively.
>   - In job graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond with 
> the *ResourceSpec*.
>   - When the *ResourceManager* requests a container from the cluster, it should 
> consider the extra framework memory in addition to the slot *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.

[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume many 
resources to create long-lived big objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set onto the internal transformation in DataStream 
and the base operator in DataSet, respectively.
  - The *StateBackend* should provide a method to estimate the memory usage 
based on the state size hint in the *ResourceSpec* (see the sketch below).
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests a container from the cluster, it should 
consider the extra framework memory in addition to the slot *ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly into the flip-6 branch.
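The StateBackend point above is the only part of this revision not covered elsewhere; a
hedged sketch of the kind of hook it seems to describe, where the method name and the
heuristic are invented for illustration and are not an existing Flink interface:

{code}
// Hypothetical extension point: a state backend translates the state-size hint
// from the ResourceSpec into an estimated memory requirement for the operator.
public interface StateBackendMemoryEstimator {
    /**
     * @param stateSizeHintMB the state size hint carried in the ResourceSpec
     * @return estimated memory in MB this backend needs to hold that state
     */
    int estimateMemoryUsageMB(int stateSizeHintMB);
}

// For example, a heap-based backend might simply keep everything in memory:
class HeapBackendEstimator implements StateBackendMemoryEstimator {
    @Override
    public int estimateMemoryUsageMB(int stateSizeHintMB) {
        return stateSizeHintMB; // all state lives on the heap
    }
}
{code}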

  was:
Normally the UDF just creates short-life small objects and these can be 
recycled quickly by JVM, so most of the memory resource is controlled and 
managed by *TaskManager* framework.  But for some special cases, the UDF may 
consume much resource to create long-live big objects, so it is necessary to 
provide the options for professional users to define the resource usages if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet, respectively.
  - During job graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond with 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly in the flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally a UDF only creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed 
> by the *TaskManager* framework. But in some special cases the UDF may consume 
> a lot of resources to create long-lived large objects, so it is necessary to 
> provide options for advanced users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in 
> DataStream and on the base operator in DataSet, respectively.
>   - The *StateBackend* should provide a method to estimate the memory usage 
> based on the state size hint in *ResourceSpec*.
>   - During job graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond with 
> *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, it 
> should consider extra framework 

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688661#comment-15688661
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89249046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

yes.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
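
For reference, the incremental pattern in the DataStream API that this task 
wants to mirror looks roughly like the sketch below, written against the 
1.2-era Scala API (the socket source and word-count logic are only placeholder 
assumptions):

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object IncrementalWindowCountExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // the reduce function is applied eagerly per record, so the window state
    // holds one partial aggregate per key instead of all buffered records
    env.socketTextStream("localhost", 9999)
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .keyBy(0)
      .timeWindow(Time.seconds(10))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()

    env.execute("incremental window count")
  }
}
{code}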



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89249046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5061) Remove ContinuousEventTimeTrigger

2016-11-22 Thread Manu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688654#comment-15688654
 ] 

Manu Zhang commented on FLINK-5061:
---

Agreed. These are just some of my thoughts and they should not block the changes. 

> Remove ContinuousEventTimeTrigger
> -
>
> Key: FLINK-5061
> URL: https://issues.apache.org/jira/browse/FLINK-5061
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> The Trigger does not do what people think it does. The problem is that a 
> watermark T signals that we won't see elements with timestamp < T in the 
> future. It does not signal that we have not yet seen elements with timestamp 
> > T. So it cannot really be used to trigger at different stages of processing 
> an event-time window.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5134:


 Summary: Aggregate ResourceSpec for chained operators when 
generating stream graph
 Key: FLINK-5134
 URL: https://issues.apache.org/jira/browse/FLINK-5134
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5135) ResourceProfile should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5135:


 Summary: ResourceProfile should be expanded to correspond with 
ResourceSpec
 Key: FLINK-5135
 URL: https://issues.apache.org/jira/browse/FLINK-5135
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, ResourceManager
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5137:
-
Description: Users can configure JVM options at the job level, and the 
*ResourceManager* will set the related JVM options when launching the 
*TaskManager* container, based on the framework memory and slot resource profile.
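
A rough sketch of how the container JVM options could be composed from the 
framework memory and the slot resource profile follows. This is purely 
illustrative; the helper name, parameters and chosen flags are assumptions, not 
the actual ResourceManager code:

{code}
object JvmOptsSketch {
  // Hypothetical helper only -- not the actual ResourceManager implementation.
  def buildTaskManagerJvmOpts(
      frameworkHeapMB: Long,
      slotHeapMB: Long,
      slotDirectMB: Long,
      userJvmOpts: String): String = {
    val totalHeapMB = frameworkHeapMB + slotHeapMB
    // job-level user options come last so they can override the generated defaults
    s"-Xms${totalHeapMB}m -Xmx${totalHeapMB}m " +
      s"-XX:MaxDirectMemorySize=${slotDirectMB}m " +
      userJvmOpts
  }
}
{code}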

> Config JVM options and set onto the TaskManager container
> -
>
> Key: FLINK-5137
> URL: https://issues.apache.org/jira/browse/FLINK-5137
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager, TaskManager
>Reporter: Zhijiang Wang
>
> Users can configure JVM options at the job level, and the *ResourceManager* 
> will set the related JVM options when launching the *TaskManager* container, 
> based on the framework memory and slot resource profile.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5136:


Assignee: Zhijiang Wang

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the *ResourceManager* requests container resources from the cluster, the 
> framework memory should be considered together with the slot resource.
> Currently the framework memory is mainly used by the *MemoryManager* and 
> *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from 
> the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5135:


Assignee: Zhijiang Wang

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The *JobManager* requests a slot by *ResourceProfile* from the *ResourceManager* 
> before submitting tasks. Currently the *ResourceProfile* only contains cpu 
> cores and memory properties. The memory should be expanded into different types 
> such as heap memory, direct memory and native memory, corresponding with the 
> memory types in *ResourceSpec*.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5134:


Assignee: Zhijiang Wang

> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
> operators, so the resource of a *JobVertex* should be the aggregation of the 
> individual resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over the 
> chained operators, and for the cpu cores resource it is the maximum over the 
> chained operators.
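
A small sketch of this aggregation rule, assuming a trimmed-down ResourceSpec 
with only two factors (the class shape and method name are assumptions for 
illustration only):

{code}
object ChainAggregationSketch {
  // assuming a ResourceSpec with just these two factors for brevity
  final case class ResourceSpec(cpuCores: Double, heapMemoryMB: Long)

  // memory is summed over the chained operators, cpu cores take the maximum
  def aggregateChain(specs: Seq[ResourceSpec]): ResourceSpec = {
    require(specs.nonEmpty, "a chain contains at least one operator")
    ResourceSpec(
      cpuCores = specs.map(_.cpuCores).max,
      heapMemoryMB = specs.map(_.heapMemoryMB).sum)
  }
}
{code}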



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5133:


Assignee: Zhijiang Wang

> Add new setResource API for DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be added to 
> *SingleOutputStreamOperator*, similar to other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be added to *Operator* in a 
> similar way.
> The API takes two parameters, a minimum *ResourceSpec* and a maximum 
> *ResourceSpec*, to allow for resource resizing in future improvements.
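
A self-contained sketch of how the proposed fluent setter might be used (the 
stub class below stands in for *SingleOutputStreamOperator* / *Operator*; none 
of these names are the real API):

{code}
object SetResourceSketch {
  final case class ResourceSpec(cpuCores: Double, heapMemoryMB: Long)

  // stand-in for SingleOutputStreamOperator / Operator with the proposed setter
  final class OperatorStub(val name: String,
                           val minResource: Option[ResourceSpec] = None,
                           val maxResource: Option[ResourceSpec] = None) {
    // two parameters: minimum and maximum ResourceSpec, to allow resizing later
    def setResource(min: ResourceSpec, max: ResourceSpec): OperatorStub = {
      require(min.cpuCores <= max.cpuCores && min.heapMemoryMB <= max.heapMemoryMB,
        "minimum ResourceSpec must not exceed maximum ResourceSpec")
      new OperatorStub(name, Some(min), Some(max))
    }
  }

  val parser = new OperatorStub("parser")
    .setResource(ResourceSpec(cpuCores = 1.0, heapMemoryMB = 512),
                 ResourceSpec(cpuCores = 2.0, heapMemoryMB = 1024))
}
{code}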



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5132:


Assignee: Zhijiang Wang

> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include cpu cores, heap memory, direct memory, 
> native memory and state size.
> The *ResourceSpec* will provide some basic construction methods for grouping 
> different resource factors as needed, and the construction can also be 
> extended easily for further requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5137:


Assignee: Zhijiang Wang

> Config JVM options and set onto the TaskManager container
> -
>
> Key: FLINK-5137
> URL: https://issues.apache.org/jira/browse/FLINK-5137
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager, TaskManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Users can configure JVM options at the job level, and the *ResourceManager* 
> will set the related JVM options when launching the *TaskManager* container, 
> based on the framework memory and slot resource profile.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688612#comment-15688612
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247407
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

sure. agree.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688613#comment-15688613
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,

[jira] [Created] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5131:


 Summary: Fine-grained Resource Configuration
 Key: FLINK-5131
 URL: https://issues.apache.org/jira/browse/FLINK-5131
 Project: Flink
  Issue Type: New Feature
  Components: DataSet API, DataStream API, JobManager, ResourceManager
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5137:


 Summary: Config JVM options and set onto the TaskManager container
 Key: FLINK-5137
 URL: https://issues.apache.org/jira/browse/FLINK-5137
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager, TaskManager
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5138:


 Summary: The StateBackend provides the method to estimate memory 
usage based on hint of state size in ResourceSpec
 Key: FLINK-5138
 URL: https://issues.apache.org/jira/browse/FLINK-5138
 Project: Flink
  Issue Type: Sub-task
  Components: DataSet API, DataStream API
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5138:
-
Component/s: Core

> The StateBackend provides the method to estimate memory usage based on hint 
> of state size in ResourceSpec
> -
>
> Key: FLINK-5138
> URL: https://issues.apache.org/jira/browse/FLINK-5138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, DataSet API, DataStream API
>Reporter: Zhijiang Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5138:
-
Description: 
Users may specify the state size in the *setResource* API; the different 
*StateBackend* implementations should then roughly estimate the different kinds 
of memory usage based on this state size. The estimated memory will be 
aggregated with the other memory values in *ResourceSpec*. There are two 
advantages to doing this:
  - For the RocksDB backend, a proper memory setting for RocksDB gives better 
read and write performance.
  - The estimated memory will be considered when requesting container 
resources, so the total memory usage will not exceed the container limit.
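
A minimal sketch of what such an estimation hook could look like, assuming a 
simple split between heap and native memory (the backend kinds and the concrete 
fractions below are made-up placeholders, not tuned values):

{code}
object StateMemoryEstimateSketch {
  sealed trait BackendKind
  case object HeapBackend    extends BackendKind
  case object RocksDBBackend extends BackendKind

  final case class MemoryEstimate(heapMB: Long, nativeMB: Long)

  // maps a state-size hint to a rough memory estimate per backend kind
  def estimateMemoryUsage(kind: BackendKind, stateSizeHintMB: Long): MemoryEstimate =
    kind match {
      // a heap backend keeps the state objects on the JVM heap
      case HeapBackend    => MemoryEstimate(heapMB = stateSizeHintMB, nativeMB = 0L)
      // RocksDB keeps state off-heap; reserve a share for block cache and write buffers
      case RocksDBBackend => MemoryEstimate(heapMB = 0L, nativeMB = math.max(64L, stateSizeHintMB / 4))
    }
}
{code}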

> The StateBackend provides the method to estimate memory usage based on hint 
> of state size in ResourceSpec
> -
>
> Key: FLINK-5138
> URL: https://issues.apache.org/jira/browse/FLINK-5138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, DataSet API, DataStream API
>Reporter: Zhijiang Wang
>
> Users may specify the state size in the *setResource* API; the different 
> *StateBackend* implementations should then roughly estimate the different 
> kinds of memory usage based on this state size. The estimated memory will be 
> aggregated with the other memory values in *ResourceSpec*. There are two 
> advantages to doing this:
>   - For the RocksDB backend, a proper memory setting for RocksDB gives better 
> read and write performance.
>   - The estimated memory will be considered when requesting container 
> resources, so the total memory usage will not exceed the container limit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5140) NoResourceAvailableException is thrown while starting a new job

2016-11-22 Thread Biao Liu (JIRA)
Biao Liu created FLINK-5140:
---

 Summary: NoResourceAvailableException is thrown while starting a 
new job
 Key: FLINK-5140
 URL: https://issues.apache.org/jira/browse/FLINK-5140
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
 Environment: FLIP-6 feature branch
Reporter: Biao Liu
Priority: Minor


Here is the stack trace:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: not 
connected to ResourceManager and no slot available
at 
org.apache.flink.runtime.instance.SlotPool.internalAllocateSlot(SlotPool.java:281)
at 
org.apache.flink.runtime.instance.SlotPool.allocateSlot(SlotPool.java:256)
...
Currently I have to set a RestartStrategy to handle this exception. But in some 
test cases I want to test cluster failures, and this makes things much more 
complicated. After discussing with [~tiemsn], maybe we can fix this problem by 
executing the ExecutionGraph only after registering with the resource manager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  outputType,
+  groupings)
+
+val finalRowArity = outputType.getFieldCount
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247407
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

sure. agree.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Description: 
When the *ResourceManager* requests container resources from the cluster, the 
framework memory should be considered together with the slot resource.
Currently the framework memory is mainly used by the *MemoryManager* and 
*NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from 
the configuration.
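
A small sketch of this aggregation, assuming the container memory is simply the 
sum of the slot profiles plus the configured framework memory (the types and 
the plain sum are illustrative assumptions, not the ResourceManager internals):

{code}
object ContainerMemorySketch {
  final case class SlotProfile(heapMB: Long, directMB: Long, nativeMB: Long)

  // total container memory = memory of all requested slots + framework memory
  def containerMemoryMB(slots: Seq[SlotProfile],
                        networkBufferMB: Long,  // NetworkBufferPool size, from configuration
                        managedMemoryMB: Long   // MemoryManager size, from configuration
                       ): Long =
    slots.map(s => s.heapMB + s.directMB + s.nativeMB).sum +
      networkBufferMB + managedMemoryMB
}
{code}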

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>
> When the *ResourceManager* requests container resources from the cluster, the 
> framework memory should be considered together with the slot resource.
> Currently the framework memory is mainly used by the *MemoryManager* and 
> *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from 
> the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688860#comment-15688860
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/2792
  
@fhueske thanks a lot for the review. I have refactored AggregateUtil and 
updated the PR.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2792: [FLINK-4937] [Table] Add incremental group window aggrega...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/2792
  
@fhueske thanks a lot for the review. I have refactored AggregateUtil and 
updated the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5135:
-
Description: The *JobManager* requests a slot by *ResourceProfile* from the 
*ResourceManager* before submitting tasks. Currently the *ResourceProfile* only 
contains cpu cores and memory properties. The memory should be expanded into 
different types such as heap memory, direct memory and native memory, 
corresponding with the memory types in *ResourceSpec*.
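
For illustration, an expanded ResourceProfile could carry the same memory types 
as ResourceSpec, for example along these lines (the field names and the 
matching rule are assumptions, not the final class):

{code}
// Sketch of an expanded ResourceProfile mirroring the ResourceSpec memory types.
case class ResourceProfile(
    cpuCores: Double,
    heapMemoryMB: Long,
    directMemoryMB: Long,
    nativeMemoryMB: Long) {

  // an offered profile can satisfy a request if every dimension is at least as large
  def isMatching(required: ResourceProfile): Boolean =
    cpuCores >= required.cpuCores &&
      heapMemoryMB >= required.heapMemoryMB &&
      directMemoryMB >= required.directMemoryMB &&
      nativeMemoryMB >= required.nativeMemoryMB
}
{code}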

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> The *JobManager* requests a slot by *ResourceProfile* from the *ResourceManager* 
> before submitting tasks. Currently the *ResourceProfile* only contains cpu 
> cores and memory properties. The memory should be expanded into different types 
> such as heap memory, direct memory and native memory, corresponding with the 
> memory types in *ResourceSpec*.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89242998
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
   inputType: RelDataType,
   outputType: RelDataType,
   groupings: Array[Int])
 : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
 
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+   val (aggFieldIndexes, aggregates)  =
+   transformToAggregateFunctions(namedAggregates.map(_.getKey),
+ inputType, groupings.length)
 
-val mapReturnType: RowTypeInfo =
-  createAggregateBufferDataType(groupings, aggregates, inputType)
+createOperatorFunctionsForAggregates(namedAggregates,
+  inputType,
+  outputType,
+  groupings,
+  aggregates,aggFieldIndexes)
+}
 
-val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+def createOperatorFunctionsForAggregates(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+aggregates:Array[Aggregate[_ <: Any]],
+aggFieldIndexes:Array[Int])
+: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= {
+
+  val mapFunction = createAggregateMapFunction(aggregates,
+aggFieldIndexes, groupings, inputType)
+
+  // the mapping relation between field index of intermediate 
aggregate Row and output Row.
+  val groupingOffsetMapping = getGroupKeysMapping(inputType, 
outputType, groupings)
+
+  // the mapping relation between aggregate function index in list and 
its corresponding
+  // field index in output Row.
+  val aggOffsetMapping = getAggregateMapping(namedAggregates, 
outputType)
+
+  if (groupingOffsetMapping.length != groupings.length ||
+aggOffsetMapping.length != namedAggregates.length) {
+throw new TableException("Could not find output field in input 
data type " +
+  "or aggregate functions.")
+  }
+
+  val allPartialAggregate = aggregates.map(_.supportPartial).forall(x 
=> x)
+
+  val intermediateRowArity = groupings.length +
+aggregates.map(_.intermediateDataType.length).sum
+
+  val reduceGroupFunction =
+if (allPartialAggregate) {
+  new AggregateReduceCombineFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+else {
+  new AggregateReduceGroupFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+
+  (mapFunction, reduceGroupFunction)
+  }
+
+  /**
+* Create Flink operator functions for Incremental aggregates.
+* It includes 2 implementations of Flink operator functions:
+* [[org.apache.flink.api.common.functions.MapFunction]] and
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688485#comment-15688485
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89242998
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
   inputType: RelDataType,
   outputType: RelDataType,
   groupings: Array[Int])
 : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
 
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+   val (aggFieldIndexes, aggregates)  =
+   transformToAggregateFunctions(namedAggregates.map(_.getKey),
+ inputType, groupings.length)
 
-val mapReturnType: RowTypeInfo =
-  createAggregateBufferDataType(groupings, aggregates, inputType)
+createOperatorFunctionsForAggregates(namedAggregates,
+  inputType,
+  outputType,
+  groupings,
+  aggregates,aggFieldIndexes)
+}
 
-val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+def createOperatorFunctionsForAggregates(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+aggregates:Array[Aggregate[_ <: Any]],
+aggFieldIndexes:Array[Int])
+: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= {
+
+  val mapFunction = createAggregateMapFunction(aggregates,
+aggFieldIndexes, groupings, inputType)
+
+  // the mapping relation between field index of intermediate 
aggregate Row and output Row.
+  val groupingOffsetMapping = getGroupKeysMapping(inputType, 
outputType, groupings)
+
+  // the mapping relation between aggregate function index in list and 
its corresponding
+  // field index in output Row.
+  val aggOffsetMapping = getAggregateMapping(namedAggregates, 
outputType)
+
+  if (groupingOffsetMapping.length != groupings.length ||
+aggOffsetMapping.length != namedAggregates.length) {
+throw new TableException("Could not find output field in input 
data type " +
+  "or aggregate functions.")
+  }
+
+  val allPartialAggregate = aggregates.map(_.supportPartial).forall(x 
=> x)
+
+  val intermediateRowArity = groupings.length +
+aggregates.map(_.intermediateDataType.length).sum
+
+  val reduceGroupFunction =
+if (allPartialAggregate) {
+  new AggregateReduceCombineFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+else {
+  new AggregateReduceGroupFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+
+  (mapFunction, reduceGroupFunction)
+  }
+
+  /**
+* Create Flink operator functions for Incremental aggregates.
+* It includes 2 implementations of Flink operator functions:
+* [[org.apache.flink.api.common.functions.MapFunction]] and
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89244761
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Yes, I'll do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688533#comment-15688533
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89244761
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Yes, I'd do it.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-11-22 Thread Flavio Pompermaier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686075#comment-15686075
 ] 

Flavio Pompermaier commented on FLINK-5122:
---

Can you try and see if our pull request fixes your issue 
(https://github.com/apache/flink/pull/2790/)? We also had this problem of 
missing documents, and in our case it was caused by the RuntimeException 
thrown by close().
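
For context, the general shape of such a fix is to record bulk item failures in 
the BulkProcessor listener and rethrow them from close(), so the job fails 
instead of silently dropping documents. A rough Scala sketch of that pattern 
(simplified and illustrative only, not the actual code of #2790; `BulkRequest`, 
`BulkResponse` and `BulkItemResponse` are the Elasticsearch client types):

    // kept by the sink; remembers the first failure observed by the BulkProcessor.Listener
    val failureThrowable = new java.util.concurrent.atomic.AtomicReference[Throwable]()

    // in the BulkProcessor.Listener
    override def afterBulk(executionId: Long, request: BulkRequest, response: BulkResponse): Unit = {
      if (response.hasFailures) {
        // remember the first failed item so close() can surface it later
        response.getItems.find(_.isFailed).foreach { item =>
          failureThrowable.compareAndSet(null, new RuntimeException(item.getFailureMessage))
        }
      }
    }

    // in the sink's close(): flush pending requests, then fail if any bulk item failed
    override def close(): Unit = {
      bulkProcessor.close()
      val failure = failureThrowable.get()
      if (failure != null) {
        throw new RuntimeException("An error occurred in ElasticsearchSink.", failure)
      }
    }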

> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>
> My cluster had high load and documents were not indexed. This violates the "at 
> least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and rebalanced. On those errors the bulk should be retried instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4895) Drop support for Hadoop 1

2016-11-22 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-4895:
-

Assignee: Robert Metzger

> Drop support for Hadoop 1
> -
>
> Key: FLINK-4895
> URL: https://issues.apache.org/jira/browse/FLINK-4895
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> As per this mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-Hadoop-1-support-with-Flink-1-2-td9530.html
>  the community agreed to drop support for Hadoop 1.
> The task includes
> - removing the hadoop-1 / hadoop-2 build profiles, 
> - removing the scripts for generating hadoop-x poms
> - updating the release script
> - updating the nightly build script
> - updating the travis configuration file
> - updating the documentation
> - updating the website



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5057) Cancellation timeouts are picked from wrong config

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686230#comment-15686230
 ] 

ASF GitHub Bot commented on FLINK-5057:
---

Github user uce closed the pull request at:

https://github.com/apache/flink/pull/2794


> Cancellation timeouts are picked from wrong config
> --
>
> Key: FLINK-5057
> URL: https://issues.apache.org/jira/browse/FLINK-5057
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.1.4
>
>
> The cancellation timeouts are read from the Task configuration instead of the 
> TaskManager configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101873
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686702#comment-15686702
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Let's just use `namedAggregates: Seq[CalcitePair[AggregateCall, String]]` 
and compute `aggregates` and `aggFieldIndexes` from it.
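
For illustration, a minimal sketch of what the method could look like with that 
signature, reusing the `transformToAggregateFunctions` call from the removed 
`createOperatorFunctionsForAggregates` (just a sketch, not the final code):

    private[flink] def createPrepareMapFunction(
        namedAggregates: Seq[CalcitePair[AggregateCall, String]],
        groupings: Array[Int],
        inputType: RelDataType): MapFunction[Any, Row] = {

      // derive the aggregate functions and their input field indexes from the Calcite calls
      val (aggFieldIndexes, aggregates) =
        transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)

      val mapReturnType: RowTypeInfo =
        createAggregateBufferDataType(groupings, aggregates, inputType)

      new AggregateMapFunction[Row, Row](aggregates, aggFieldIndexes, groupings, mapReturnType)
        .asInstanceOf[MapFunction[Any, Row]]
    }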


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental 

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686703#comment-15686703
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101976
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

compute `aggregates` from `namedAggregates`.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89090109
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  outputType,
+  groupings)
+
+val finalRowArity = outputType.getFieldCount
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new 

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686709#comment-15686709
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088819
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

I think we should make the arguments of the `create*Function` methods more 
consistent. Either we use Calcite's `namedAggregates` or we use the transformed 
`aggregates` and `aggFieldIndexes`.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate 
Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
-
-// the mapping relation between aggregate function index in list and 
its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data 
type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
+  aggregates,
+  aggFieldIndexes,
+  groupings,
+  
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
-val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+mapFunction

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89089242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Can you move this method below the `create*Function` methods and make it 
`private`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89102066
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  outputType,
+  groupings)
+
+val finalRowArity = outputType.getFieldCount
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101806
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

I'd generate the `GroupReduceFunction` directly in this method. That will 
make the calling code simpler.
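
For illustration only, roughly (assuming the group-reduce creation is factored 
into a helper, here called `createAggregateGroupReduceFunction` just for the 
example):

    private[flink] def createAllWindowAggregationFunction(
        window: LogicalWindow,
        namedAggregates: Seq[CalcitePair[AggregateCall, String]],
        inputType: RelDataType,
        outputType: RelDataType,
        groupings: Array[Int],
        properties: Seq[NamedWindowProperty])
      : AllWindowFunction[Row, Row, DataStreamWindow] = {

      // build the GroupReduceFunction here instead of passing it in from every caller
      val aggFunction =
        createAggregateGroupReduceFunction(namedAggregates, inputType, outputType, groupings)

      if (isTimeWindow(window)) {
        val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
        new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
          .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
      } else {
        new AggregateAllWindowFunction(aggFunction)
      }
    }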


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89086106
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
--- End diff --

remove this line break.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088443
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

can we just use `namedAggregates` and compute `aggregates` from it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2822: [FLINK-5075] [kinesis] Make Kinesis consumer fail-proof t...

2016-11-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2822
  
Thanks for the review @StephanEwen. I'm pretty sure this doesn't affect the 
normal Kinesis shard discovery. I'll give it some final tests before merging 
(would like to get this in before the next 1.1.4 RC).

Yes, using `describeStream(streamName)` alone would be problematic for users 
with large numbers of shards, because the whole shard list cannot always be 
returned in a single call. So that's most likely not a solution we can consider.
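
For context, the describe calls have to be paged with `ExclusiveStartShardId`. 
A minimal Scala sketch against the AWS Java SDK (illustrative only, not the 
connector's actual code) of how the complete shard list can be collected:

    import scala.collection.JavaConverters._
    import com.amazonaws.services.kinesis.AmazonKinesis
    import com.amazonaws.services.kinesis.model.{DescribeStreamRequest, Shard}

    def describeAllShards(kinesisClient: AmazonKinesis, streamName: String): Seq[Shard] = {
      val shards = scala.collection.mutable.ListBuffer[Shard]()
      var lastSeenShardId: String = null
      var hasMoreShards = true
      while (hasMoreShards) {
        val request = new DescribeStreamRequest()
        request.setStreamName(streamName)
        if (lastSeenShardId != null) {
          // continue from the last shard returned by the previous call
          request.setExclusiveStartShardId(lastSeenShardId)
        }
        val description = kinesisClient.describeStream(request).getStreamDescription
        val page = description.getShards.asScala
        shards ++= page
        if (page.nonEmpty) {
          lastSeenShardId = page.last.getShardId
        }
        hasMoreShards = description.getHasMoreShards
      }
      shards.toList
    }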


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686551#comment-15686551
 ] 

ASF GitHub Bot commented on FLINK-5075:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2822#discussion_r89100942
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, String 
startShardId) throws InterruptedException {
+   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
--- End diff --

Does only Kinesalite supply the `startShardId` parameter?


> Kinesis consumer incorrectly determines shards as newly discovered when 
> tested against Kinesalite
> -
>
> Key: FLINK-5075
> URL: https://issues.apache.org/jira/browse/FLINK-5075
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> A user reported that when our Kinesis connector is used against Kinesalite 
> (https://github.com/mhart/kinesalite), we're incorrectly determining already 
> found shards as newly discovered:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html
> I suspect the problem is that Kinesalite's mock Kinesis API implementation 
> doesn't completely match the official AWS Kinesis behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5061) Remove ContinuousEventTimeTrigger

2016-11-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686800#comment-15686800
 ] 

Aljoscha Krettek commented on FLINK-5061:
-

We can also leave it in since it's not a maintenance overhead. My main 
motivation for removing it was that I saw too many people on the mailing 
lists who were using it without being aware of what it actually does.

> Remove ContinuousEventTimeTrigger
> -
>
> Key: FLINK-5061
> URL: https://issues.apache.org/jira/browse/FLINK-5061
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> The Trigger does not do what people think it does. The problem is that a 
> watermark T signals that we won't see elements with timestamp < T in the 
> future. It does not signal that we have not yet seen elements with timestamp 
> > T. So it cannot really be used to trigger at different stages of processing 
> an event-time window.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2822: [FLINK-5075] [kinesis] Make Kinesis consumer fail-...

2016-11-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2822#discussion_r89119126
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, String 
startShardId) throws InterruptedException {
+   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
--- End diff --

Only Kinesis does. Kinesalite incorrectly ignores `startShardId`.
It's marked `Nullable` here because on fresh startups this method will be 
called with `null` as the start ID (on startup there is no shard ID to 
start from).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686816#comment-15686816
 ] 

ASF GitHub Bot commented on FLINK-5075:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2822#discussion_r89119126
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, String 
startShardId) throws InterruptedException {
+   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
--- End diff --

Only Kinesis does. Kinesalite incorrectly ignores `startShardId`.
It's marked `Nullable` here because on fresh startups this method will be 
called with `null` as the start ID (on startup there is no shard ID to 
start from).


> Kinesis consumer incorrectly determines shards as newly discovered when 
> tested against Kinesalite
> -
>
> Key: FLINK-5075
> URL: https://issues.apache.org/jira/browse/FLINK-5075
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> A user reported that when our Kinesis connector is used against Kinesalite 
> (https://github.com/mhart/kinesalite), we're incorrectly determining already 
> found shards as newly discovered:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html
> I suspect the problem is that Kinesalite's mock Kinesis API implementation 
> doesn't completely match the official AWS Kinesis behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686760#comment-15686760
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @thvasilo @greghogan  

OK, I've updated the documentation. I'll follow up with the code updates.

Regards
Thomas


> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in package preprocessing of FlinkML



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-22 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @thvasilo @greghogan  

OK, I've updated the documentation. I'll follow up with the code updates.

Regards
Thomas


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088819
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

I think we should make the arguments of the `create*Function` methods more 
consistent. Either we use Calcite's `namedAggregates` or we use the transformed 
`aggregates` and `aggFieldIndexes`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686711#comment-15686711
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89102066
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686705#comment-15686705
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088443
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

can we just use `namedAggregates` and compute `aggregates` from it?
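
For illustration only (not code from the PR), the same pattern applied to this method: derive `aggregates` from `namedAggregates` with the existing `transformToAggregateFunctions` helper and keep the rest of the body as shown in the diff (the mapping helper keeps its current name here):

```scala
// Sketch: createIncrementalAggregateReduceFunction without the separate aggregates
// parameter; the aggregates are derived from namedAggregates as elsewhere in AggregateUtil.
private[flink] def createIncrementalAggregateReduceFunction(
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    inputType: RelDataType,
    outputType: RelDataType,
    groupings: Array[Int]): IncrementalAggregateReduceFunction = {

  val aggregates =
    transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)._2

  val groupingOffsetMapping =
    getGroupingOffsetAndaggOffsetMapping(namedAggregates, inputType, outputType, groupings)._1

  val intermediateRowArity =
    groupings.length + aggregates.map(_.intermediateDataType.length).sum

  new IncrementalAggregateReduceFunction(aggregates, groupingOffsetMapping, intermediateRowArity)
}
```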


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686700#comment-15686700
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89089095
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Please fix camel case: `getGroupingOffsetAndAggOffsetMapping`


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89086247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate 
Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
-
-// the mapping relation between aggregate function index in list and 
its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data 
type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
+  aggregates,
+  aggFieldIndexes,
+  groupings,
+  
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
-val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+mapFunction

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Let's just use `namedAggregates: Seq[CalcitePair[AggregateCall, String]]` 
and compute `aggregates` and `aggFieldIndexes` from it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686712#comment-15686712
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89090109
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686706#comment-15686706
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate 
Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
-
-// the mapping relation between aggregate function index in list and 
its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data 
type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
+  aggregates,
+  aggFieldIndexes,
+

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89089095
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Please fix camel case: `getGroupingOffsetAndAggOffsetMapping`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686707#comment-15686707
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89086247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate 
Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
-
-// the mapping relation between aggregate function index in list and 
its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data 
type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
+  aggregates,
+  aggFieldIndexes,
+

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686708#comment-15686708
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89086106
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
--- End diff --

remove this line break.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should 

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686704#comment-15686704
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101806
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

I'd generate the `GroupReduceFunction` directly in this method. That will 
make the calling code simpler.
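
A sketch of that idea (not from the PR): build the `GroupReduceFunction` inside the factory itself so callers pass only the logical inputs. The helper name `createAggregateGroupReduceFunction` is hypothetical here, since the actual factory's name is cut off in the quoted diff; the window-dispatch logic mirrors the code shown above.

```scala
// Sketch: createAllWindowAggregationFunction that constructs the GroupReduceFunction
// internally. createAggregateGroupReduceFunction is an assumed helper name.
private[flink] def createAllWindowAggregationFunction(
    window: LogicalWindow,
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    inputType: RelDataType,
    outputType: RelDataType,
    groupings: Array[Int],
    properties: Seq[NamedWindowProperty])
  : AllWindowFunction[Row, Row, DataStreamWindow] = {

  // build the aggregation GroupReduceFunction here so callers pass only logical inputs
  val aggFunction: RichGroupReduceFunction[Row, Row] =
    createAggregateGroupReduceFunction(namedAggregates, inputType, outputType, groupings)

  if (isTimeWindow(window)) {
    val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
  } else {
    new AggregateAllWindowFunction(aggFunction)
  }
}
```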


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101976
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

compute `aggregates` from `namedAggregates`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686710#comment-15686710
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101873
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

Same as above.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> 

[GitHub] flink issue #2842: [FLINK-5097][gelly] Add missing input type information to...

2016-11-22 Thread vasia
Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2842
  
Hi @greghogan,
I was able to fix the problem in `fromDataSet()` and in `groupReduceOnEdges()` with 
`EdgesFunction`. For the remaining call sites, I can't find a way to pass all the 
input types correctly: they each have 3 input types, while the `createTypeInfo()` 
method only accepts two. I also tried extracting the input types from the wrapping 
functions, but that didn't work either. Any ideas?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5097) The TypeExtractor is missing input type information in some Graph methods

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686757#comment-15686757
 ] 

ASF GitHub Bot commented on FLINK-5097:
---

Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2842
  
Hi @greghogan,
I was able to fix the problem in `fromDataSet()` and in `groupReduceOnEdges()` with 
`EdgesFunction`. For the remaining call sites, I can't find a way to pass all the 
input types correctly: they each have 3 input types, while the `createTypeInfo()` 
method only accepts two. I also tried extracting the input types from the wrapping 
functions, but that didn't work either. Any ideas?


> The TypeExtractor is missing input type information in some Graph methods
> -
>
> Key: FLINK-5097
> URL: https://issues.apache.org/jira/browse/FLINK-5097
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> The TypeExtractor is called without information about the input type in 
> {{mapVertices}} and {{mapEdges}} although this information can be easily 
> retrieved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4155) Get Kafka producer partition info in open method instead of constructor

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686810#comment-15686810
 ] 

ASF GitHub Bot commented on FLINK-4155:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2681
  
Thanks for the review @rmetzger.

No, actually I haven't refactored the table tests yet. I'll file the 
follow-up JIRAs mentioned in the comments and merge the PR as is.


> Get Kafka producer partition info in open method instead of constructor
> ---
>
> Key: FLINK-4155
> URL: https://issues.apache.org/jira/browse/FLINK-4155
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.0, 1.0.3
>Reporter: Gyula Fora
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently the Flink Kafka producer does not really do any error handling if 
> something is wrong with the partition metadata as it is serialized with the 
> user function.
> This means that in some cases the job can go into an error loop when using 
> the checkpoints. Getting the partition info in the open method would solve 
> this problem (like restarting from a savepoint which re-runs the constructor).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2822: [FLINK-5075] [kinesis] Make Kinesis consumer fail-...

2016-11-22 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2822#discussion_r89100942
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, String 
startShardId) throws InterruptedException {
+   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
--- End diff --

Does only Kinesalite supply the `startShardId` parameter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5049) Instability in QueryableStateITCase.testQueryableStateWithTaskManagerFailure

2016-11-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1568#comment-1568
 ] 

Robert Metzger commented on FLINK-5049:
---

Another instance of the issue: 
https://api.travis-ci.org/jobs/177925657/log.txt?deansi=true

> Instability in QueryableStateITCase.testQueryableStateWithTaskManagerFailure
> 
>
> Key: FLINK-5049
> URL: https://issues.apache.org/jira/browse/FLINK-5049
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>  Labels: test-stability
>
> https://api.travis-ci.org/jobs/174843846/log.txt?deansi=true
> {code}
> org.apache.flink.test.query.QueryableStateITCase
> testQueryableStateWithTaskManagerFailure(org.apache.flink.test.query.QueryableStateITCase)
>   Time elapsed: 8.096 sec  <<< FAILURE!
> java.lang.AssertionError: Count moved backwards
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> org.apache.flink.test.query.QueryableStateITCase.testQueryableStateWithTaskManagerFailure(QueryableStateITCase.java:470)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2681: [FLINK-4155] [kafka] Move partition list fetching to open...

2016-11-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2681
  
Thanks for the review @rmetzger.

No, actually I haven't refactored the table tests yet. I'll file the 
follow-up JIRAs mentioned in the comments and merge the PR as is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-11-22 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
> In addition, the TableSource should know project information (including 
order) not just fieldIncluded. So maybe we should also adapt RowCsvInputFormat.

Do you mean we need to shuffle the fields in the row according to the projection while 
scanning the file? Why not map the row fields after the scan instead?
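
A rough sketch of that "map after scan" idea (not code from the PR): keep `RowCsvInputFormat` reading the included fields in file order and reorder them into the projected order with a plain `MapFunction` afterwards. The `projectedIndexes` array is made up for illustration, and the `Row` accessors are assumed to match the table API's `Row`:

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.table.Row

// Illustrative only: reorder scanned fields into projection order after the source.
// Assumes Row exposes setField/productElement accessors.
class ProjectionMapper(projectedIndexes: Array[Int]) extends MapFunction[Row, Row] {
  override def map(in: Row): Row = {
    val out = new Row(projectedIndexes.length)
    var i = 0
    while (i < projectedIndexes.length) {
      // copy the i-th projected field from its position in the scanned row
      out.setField(i, in.productElement(projectedIndexes(i)))
      i += 1
    }
    out
  }
}
```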


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

