[jira] [Created] (FLINK-5140) NoResourceAvailableException is thrown while starting a new job

2016-11-22 Thread Biao Liu (JIRA)
Biao Liu created FLINK-5140:
---

 Summary: NoResourceAvailableException is thrown while starting a 
new job
 Key: FLINK-5140
 URL: https://issues.apache.org/jira/browse/FLINK-5140
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
 Environment: FLIP-6 feature branch
Reporter: Biao Liu
Priority: Minor


Here is the stack trace:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: not 
connected to ResourceManager and no slot available
at 
org.apache.flink.runtime.instance.SlotPool.internalAllocateSlot(SlotPool.java:281)
at 
org.apache.flink.runtime.instance.SlotPool.allocateSlot(SlotPool.java:256)
...
Currently I have to set a RestartStrategy to handle this exception. But in some 
test cases I want to test cluster failures, and this makes them much more 
complicated. After discussing with [~tiemsn], maybe we can fix this problem by 
executing the ExecutionGraph only after the JobManager has registered with the 
ResourceManager.
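For reference, the workaround mentioned above looks roughly like the following 
(a minimal sketch; the attempt count and delay are illustrative values):
{code}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Retry scheduling instead of failing fast on NoResourceAvailableException:
        // up to 10 restart attempts with 5 seconds between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 5000L));
        // ... define and execute the job as usual ...
    }
}
{code}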



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5139) Kafka consumer should support setting a specific start offset

2016-11-22 Thread jing lining (JIRA)
jing lining created FLINK-5139:
--

 Summary: Kafka consumer should support setting a specific start offset
 Key: FLINK-5139
 URL: https://issues.apache.org/jira/browse/FLINK-5139
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.1.3
Reporter: jing lining


Currently the starting offset behavior is determined by the 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08#getInvalidOffsetBehavior
 method.

It would be useful if a specific numeric offset could be passed in instead.
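For context, a rough sketch of how the start position is chosen today with the 
0.8 consumer, versus the numeric-offset API this issue asks for (property values 
follow Kafka 0.8 conventions; the setStartOffset call is hypothetical):
{code}
Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
// Today only a coarse policy is possible: "smallest" (earliest) or "largest" (latest).
props.setProperty("auto.offset.reset", "smallest");

FlinkKafkaConsumer08<String> consumer =
    new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);

// What this issue asks for (hypothetical, not an existing method):
// consumer.setStartOffset(/* partition */ 0, /* offset */ 42L);
{code}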



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5138:
-
Description: 
Users may specify the state size via the *setResource* API, and the different 
*StateBackend* implementations should then roughly estimate the different kinds 
of memory usage based on that state size. This estimated memory will be 
aggregated with the other memory figures in *ResourceSpec*. There are two 
advantages to doing this:
  - For the RocksDB backend, a proper memory setting for RocksDB yields better 
read and write performance.
  - The estimated memory is considered when requesting container resources, so 
the total memory usage will not exceed the container limit.
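A minimal sketch of what such a hook could look like (the names and the MB units 
are assumptions, not an existing Flink interface):
{code}
// Each state backend translates the user-provided state size hint into its
// expected heap / native memory footprint, which is then merged into the ResourceSpec.
public interface MemoryEstimatingStateBackend {

    /** Rough memory estimate derived from the state size hint, in MB. */
    MemoryEstimate estimateMemoryUsage(long stateSizeHintMB);

    class MemoryEstimate {
        public final long heapMemoryMB;    // heap-based backends keep state on the heap
        public final long nativeMemoryMB;  // e.g. RocksDB block cache and write buffers

        public MemoryEstimate(long heapMemoryMB, long nativeMemoryMB) {
            this.heapMemoryMB = heapMemoryMB;
            this.nativeMemoryMB = nativeMemoryMB;
        }
    }
}
{code}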

> The StateBackend provides the method to estimate memory usage based on hint 
> of state size in ResourceSpec
> -
>
> Key: FLINK-5138
> URL: https://issues.apache.org/jira/browse/FLINK-5138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, DataSet API, DataStream API
>Reporter: Zhijiang Wang
>
> Users may specify the state size via the *setResource* API, and the different 
> *StateBackend* implementations should then roughly estimate the different 
> kinds of memory usage based on that state size. This estimated memory will be 
> aggregated with the other memory figures in *ResourceSpec*. There are two 
> advantages to doing this:
>   - For the RocksDB backend, a proper memory setting for RocksDB yields better 
> read and write performance.
>   - The estimated memory is considered when requesting container resources, so 
> the total memory usage will not exceed the container limit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5138:
-
Component/s: Core

> The StateBackend provides the method to estimate memory usage based on hint 
> of state size in ResourceSpec
> -
>
> Key: FLINK-5138
> URL: https://issues.apache.org/jira/browse/FLINK-5138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, DataSet API, DataStream API
>Reporter: Zhijiang Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5138:


 Summary: The StateBackend provides the method to estimate memory 
usage based on hint of state size in ResourceSpec
 Key: FLINK-5138
 URL: https://issues.apache.org/jira/browse/FLINK-5138
 Project: Flink
  Issue Type: Sub-task
  Components: DataSet API, DataStream API
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume a lot 
of resources to create long-lived large objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet separately.
  - The *StateBackend* should provide a method to estimate the memory usage 
based on the state size hint in *ResourceSpec*.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond to the 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should also 
be supported and configurable at the job level.

This feature will be implemented directly in the flip-6 branch.
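
For illustration, a minimal sketch of the kind of structure the first bullet 
describes (field names, units and constructor are assumptions, not the final API):
{code}
public final class ResourceSpec {
    private final double cpuCores;
    private final int heapMemoryMB;
    private final int directMemoryMB;
    private final int nativeMemoryMB;
    private final int stateSizeMB;   // hint consumed by the StateBackend estimation

    public ResourceSpec(double cpuCores, int heapMemoryMB, int directMemoryMB,
                        int nativeMemoryMB, int stateSizeMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryMB = heapMemoryMB;
        this.directMemoryMB = directMemoryMB;
        this.nativeMemoryMB = nativeMemoryMB;
        this.stateSizeMB = stateSizeMB;
    }

    public double getCpuCores()    { return cpuCores; }
    public int getHeapMemoryMB()   { return heapMemoryMB; }
    public int getDirectMemoryMB() { return directMemoryMB; }
    public int getNativeMemoryMB() { return nativeMemoryMB; }
    public int getStateSizeMB()    { return stateSizeMB; }
}
{code}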

  was:
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume a lot 
of resources to create long-lived large objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond to the 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should also 
be supported and configurable at the job level.

This feature will be implemented directly in the flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed 
> by the *TaskManager* framework. But in some special cases the UDF may consume 
> a lot of resources to create long-lived large objects, so it is necessary to 
> provide options for professional users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in DataStream 
> and on the base operator in DataSet separately.
>   - The *StateBackend* should provide a method to estimate the memory usage 
> based on the state size hint in *ResourceSpec*.
>   - In job graph generation, the *ResourceSpec* will be aggregated for chained 
> operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond to the 
> *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, 
> it should consider the extra framework memory in addition to the slot 
> *ResourceProfile*.

[jira] [Assigned] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5134:


Assignee: Zhijiang Wang

> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
> operators.
> The resource of a *JobVertex* should be the aggregation of the individual 
> resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over the 
> chained operators.
> For the cpu cores resource of a *JobVertex*, the aggregation is the maximum 
> over the chained operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5133:


Assignee: Zhijiang Wang

> Add new setResource API for DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be added to 
> *SingleOutputStreamOperator*, similar to other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be added to *Operator* in a similar 
> way.
> The API takes two parameters, a minimum *ResourceSpec* and a maximum 
> *ResourceSpec*, to allow for resource resizing in future improvements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5136:


Assignee: Zhijiang Wang

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the *ResourceManager* requests container resources from the cluster, the 
> framework memory should be considered together with the slot resources.
> Currently the framework memory is mainly used by the *MemoryManager* and 
> *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from 
> the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5135:


Assignee: Zhijiang Wang

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The *JobManager* requests slots from the *ResourceManager* by *ResourceProfile* 
> before submitting tasks. Currently the *ResourceProfile* only contains cpu 
> cores and memory properties. The memory should be expanded into different 
> types such as heap memory, direct memory and native memory, which correspond 
> to the memory types in *ResourceSpec*.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5132:


Assignee: Zhijiang Wang

> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include cpu cores, heap memory, direct memory, 
> native memory and state size.
> The *ResourceSpec* will provide some basic construction methods for grouping 
> different resource factors as needed, and these constructions can easily be 
> extended for further requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5137:


Assignee: Zhijiang Wang

> Config JVM options and set onto the TaskManager container
> -
>
> Key: FLINK-5137
> URL: https://issues.apache.org/jira/browse/FLINK-5137
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager, TaskManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Users can configure JVM options at the job level, and the *ResourceManager* 
> will set the related JVM options when launching the *TaskManager* container, 
> based on the framework memory and the slot resource profile.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5137:
-
Description: Users can configure JVM options at the job level, and the 
*ResourceManager* will set the related JVM options when launching the 
*TaskManager* container, based on the framework memory and the slot resource 
profile.
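
For illustration, the cluster-wide setting that exists today versus a sketch of 
the per-job knob this issue proposes (the per-job key name is hypothetical):
{code}
// Today JVM options for TaskManagers can only be set cluster-wide in flink-conf.yaml:
//   env.java.opts: -XX:+UseG1GC -XX:MaxDirectMemorySize=2g
//
// Sketch of a per-job configuration (hypothetical key name):
Configuration jobConfig = new Configuration();
jobConfig.setString("taskmanager.container.jvm-opts",
        "-XX:+UseG1GC -XX:MaxDirectMemorySize=2g");
{code}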

> Config JVM options and set onto the TaskManager container
> -
>
> Key: FLINK-5137
> URL: https://issues.apache.org/jira/browse/FLINK-5137
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager, TaskManager
>Reporter: Zhijiang Wang
>
> Users can configure JVM options at the job level, and the *ResourceManager* 
> will set the related JVM options when launching the *TaskManager* container, 
> based on the framework memory and the slot resource profile.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5137:


 Summary: Config JVM options and set onto the TaskManager container
 Key: FLINK-5137
 URL: https://issues.apache.org/jira/browse/FLINK-5137
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager, TaskManager
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Description: 
When the *ResourceManager* requests container resources from the cluster, the 
framework memory should be considered together with the slot resources.
Currently the framework memory is mainly used by the *MemoryManager* and 
*NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from 
the configuration.
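
A back-of-the-envelope sketch of the aggregation being described (all numbers 
are illustrative, not actual defaults):
{code}
int slotsPerTaskManager = 4;
int memoryPerSlotMB     = 1024;   // from the slot ResourceProfiles
int networkBuffersMB    = 256;    // NetworkBufferPool, from the configuration
int managedMemoryMB     = 512;    // MemoryManager, from the configuration

int frameworkMemoryMB = networkBuffersMB + managedMemoryMB;
int containerMemoryMB = slotsPerTaskManager * memoryPerSlotMB + frameworkMemoryMB;
// -> request a 4864 MB container instead of only the 4096 MB covered by the slots
{code}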

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>
> When the *ResourceManager* requests container resources from the cluster, the 
> framework memory should be considered together with the slot resources.
> Currently the framework memory is mainly used by the *MemoryManager* and 
> *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from 
> the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Summary: Aggregation of slot ResourceProfile and framework memory by 
ResourceManager  (was: Aggregation of slot ResourceProfile and framework memory 
for requesting resource from cluster by ResourceManager)

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Summary: Aggregation of slot ResourceProfile and framework memory for 
requesting resource from cluster by ResourceManager  (was: ResourceManager 
should consider both slot resource profile and framework memory for requesting 
resource from cluster)

> Aggregation of slot ResourceProfile and framework memory for requesting 
> resource from cluster by ResourceManager
> 
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5136) ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5136:


 Summary: ResourceManager should consider both slot resource 
profile and framework memory for requesting resource from cluster
 Key: FLINK-5136
 URL: https://issues.apache.org/jira/browse/FLINK-5136
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5135:
-
Description: The *JobManager* requests slots from the *ResourceManager* by 
*ResourceProfile* before submitting tasks. Currently the *ResourceProfile* only 
contains cpu cores and memory properties. The memory should be expanded into 
different types such as heap memory, direct memory and native memory, which 
correspond to the memory types in *ResourceSpec*.
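
A minimal sketch of the expanded shape being proposed (field names are 
assumptions; per the description, the class currently only carries cpu cores 
and a single memory figure):
{code}
public final class ResourceProfile {
    private final double cpuCores;
    private final int heapMemoryInMB;    // mirrors ResourceSpec heap memory
    private final int directMemoryInMB;  // mirrors ResourceSpec direct memory
    private final int nativeMemoryInMB;  // mirrors ResourceSpec native memory

    public ResourceProfile(double cpuCores, int heapMemoryInMB,
                           int directMemoryInMB, int nativeMemoryInMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
        this.directMemoryInMB = directMemoryInMB;
        this.nativeMemoryInMB = nativeMemoryInMB;
    }
}
{code}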

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> The *JobManager* requests slots from the *ResourceManager* by *ResourceProfile* 
> before submitting tasks. Currently the *ResourceProfile* only contains cpu 
> cores and memory properties. The memory should be expanded into different 
> types such as heap memory, direct memory and native memory, which correspond 
> to the memory types in *ResourceSpec*.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5135:
-
Summary: ResourceProfile for slot request should be expanded to correspond 
with ResourceSpec  (was: ResourceProfile should be expanded to correspond with 
ResourceSpec)

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume a lot 
of resources to create long-lived large objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond to the 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should also 
be supported and configurable at the job level.

This feature will be implemented directly in the flip-6 branch.

  was:
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume a lot 
of resources to create long-lived large objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond to the 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should also 
be supported and configurable at the job level.

This feature will be implemented directly in the flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed 
> by the *TaskManager* framework. But in some special cases the UDF may consume 
> a lot of resources to create long-lived large objects, so it is necessary to 
> provide options for professional users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in DataStream 
> and on the base operator in DataSet separately.
>   - In job graph generation, the *ResourceSpec* will be aggregated for chained 
> operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond to the 
> *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, 
> it should consider the extra framework memory in addition to the slot 
> *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job level.
>   - Apart from resources, the JVM options attached to the container should 
> also be supported and configurable at the job level.

[jira] [Created] (FLINK-5135) ResourceProfile should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5135:


 Summary: ResourceProfile should be expanded to correspond with 
ResourceSpec
 Key: FLINK-5135
 URL: https://issues.apache.org/jira/browse/FLINK-5135
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, ResourceManager
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: 
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of a *JobVertex* should be the aggregation of the individual 
resources of its chained operators.
For the memory resource of a *JobVertex*, the aggregation is the sum over the 
chained operators.
For the cpu cores resource of a *JobVertex*, the aggregation is the maximum 
over the chained operators.
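
A small sketch of this aggregation rule, assuming a simple ResourceSpec-like 
holder with cpu and memory accessors (not the actual Flink classes):
{code}
static ResourceSpec aggregateChain(java.util.List<ResourceSpec> chainedOperatorSpecs) {
    double cpuCores = 0.0;
    int memoryMB = 0;
    for (ResourceSpec spec : chainedOperatorSpecs) {
        cpuCores = Math.max(cpuCores, spec.getCpuCores()); // cpu cores: maximum over the chain
        memoryMB += spec.getMemoryMB();                    // memory: sum over the chain
    }
    return new ResourceSpec(cpuCores, memoryMB);
}
{code}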

  was:
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of a *JobVertex* should be the aggregation of the individual 
resources of its chained operators.
For the memory resource of a *JobVertex*, the aggregation is the sum over the 
chained operators, and for the cpu cores resource, the aggregation is the 
maximum over the chained operators.


> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
> operators.
> The resource of a *JobVertex* should be the aggregation of the individual 
> resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over the 
> chained operators.
> For the cpu cores resource of a *JobVertex*, the aggregation is the maximum 
> over the chained operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: 
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of a *JobVertex* should be the aggregation of the individual 
resources of its chained operators.
For the memory resource of a *JobVertex*, the aggregation is the sum over the 
chained operators, and for the cpu cores resource, the aggregation is the 
maximum over the chained operators.

  was:In the DataStream API, the *ResourceSpec* is set on the internal 
transformation


> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
> operators.
> The resource of a *JobVertex* should be the aggregation of the individual 
> resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over the 
> chained operators, and for the cpu cores resource, the aggregation is the 
> maximum over the chained operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume a lot 
of resources to create long-lived large objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond to the 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should also 
be supported and configurable at the job level.

This feature will be implemented directly in the flip-6 branch.

  was:
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume a lot 
of resources to create long-lived large objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond to the 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should also 
be supported and configurable at the job level.

This feature will be implemented directly in the flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed 
> by the *TaskManager* framework. But in some special cases the UDF may consume 
> a lot of resources to create long-lived large objects, so it is necessary to 
> provide options for professional users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in DataStream 
> and on the base operator in DataSet separately.
>   - In job graph generation, the *ResourceSpec* will be aggregated for chained 
> operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond to the 
> *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, 
> it should consider the extra framework memory in addition to the slot 
> *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job level.
>   - Apart from resources, the JVM options attached to the container should 
> also be supported and configurable at the job level.

[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Summary: Aggregate ResourceSpec for chained operators when generating job 
graph  (was: Aggregate ResourceSpec for chained operators when generating stream 
graph)

> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In the DataStream API, the *ResourceSpec* is set on the internal 
> transformation



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: In the DataStream API, the *ResourceSpec* is set on the 
internal transformation  (was: In datastream API,)

> Aggregate ResourceSpec for chained operators when generating stream graph
> 
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In the DataStream API, the *ResourceSpec* is set on the internal 
> transformation



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: In the DataStream API,

> Aggregate ResourceSpec for chained operators when generating stream graph
> 
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In the DataStream API,



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5134:


 Summary: Aggregate ResourceSpec for chained operators when 
generating stream graph
 Key: FLINK-5134
 URL: https://issues.apache.org/jira/browse/FLINK-5134
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2792: [FLINK-4937] [Table] Add incremental group window aggrega...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/2792
  
@fhueske  thanks a lot for the review. I have refactored AggregateUtil and 
updated the PR .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688860#comment-15688860
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/2792
  
@fhueske  thanks a lot for the review. I have refactored AggregateUtil and 
updated the PR .


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
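
For reference, this is roughly the DataStream-level pattern the description 
refers to, sketched for a (word, count) tuple stream named input (the field 
layout and the window size are illustrative):
{code}
DataStream<Tuple2<String, Integer>> counts = input
    .keyBy(0)
    .timeWindow(Time.minutes(1))
    // incremental: the partial sum is updated for every added record instead of
    // buffering all records until the window fires
    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                              Tuple2<String, Integer> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    });
{code}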



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5133:
-
Description: 
This is part of the fine-grained resource configuration.

For *DataStream*, the *setResource* API will be added to 
*SingleOutputStreamOperator*, similar to other existing properties like 
parallelism, name, etc.
For *DataSet*, the *setResource* API will be added to *Operator* in a similar 
way.

The API takes two parameters, a minimum *ResourceSpec* and a maximum 
*ResourceSpec*, to allow for resource resizing in future improvements.
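
A hypothetical usage sketch of the proposed API (the setResource method, the 
ResourceSpec constructor and MyHeavyMapFunction are illustrative, not existing 
Flink API):
{code}
ResourceSpec minResources = new ResourceSpec(1.0, 512);   // 1 core, 512 MB heap
ResourceSpec maxResources = new ResourceSpec(2.0, 2048);  // 2 cores, 2048 MB heap

DataStream<String> mapped = input
    .map(new MyHeavyMapFunction())
    .setResource(minResources, maxResources)  // proposed: min/max for future resizing
    .name("heavy-map");
{code}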

> Add new setResource API for DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be added to 
> *SingleOutputStreamOperator*, similar to other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be added to *Operator* in a similar 
> way.
> The API takes two parameters, a minimum *ResourceSpec* and a maximum 
> *ResourceSpec*, to allow for resource resizing in future improvements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688809#comment-15688809
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@aljoscha Thanks for your review. I have updated the PR according to your 
suggestion. 


> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.
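
A minimal sketch of the kind of change being proposed (the generic parameter 
and exact signature are illustrative, not the final interface):
{code}
public interface State<T> {
    T get();      // proposed: return the structured value under the current key
    void clear(); // already part of the interface today
}

// e.g. ValueState<V> would effectively behave as State<V>,
// and ListState<E> as State<Iterable<E>>.
{code}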



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...

2016-11-22 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@aljoscha Thanks for your review. I have updated the PR according to your 
suggestion. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5133:


 Summary: Add new setResource API for DataStream and DataSet
 Key: FLINK-5133
 URL: https://issues.apache.org/jira/browse/FLINK-5133
 Project: Flink
  Issue Type: Sub-task
  Components: DataSet API, DataStream API
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume a lot 
of resources to create long-lived large objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond to the 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should also 
be supported and configurable at the job level.

This feature will be implemented directly in the flip-6 branch.

  was:
Normally the UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed by 
the *TaskManager* framework. But in some special cases the UDF may consume a lot 
of resources to create long-lived large objects, so it is necessary to provide 
options for professional users to define the resource usage if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and on the base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond to the 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, it 
should consider the extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should also 
be supported and configurable at the job level.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and managed 
> by the *TaskManager* framework. But in some special cases the UDF may consume 
> a lot of resources to create long-lived large objects, so it is necessary to 
> provide options for professional users to define the resource usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in DataStream 
> and on the base operator in DataSet separately.
>   - In stream graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot from the *ResourceManager* for 
> submitting a task, the *ResourceProfile* will be expanded to correspond to the 
> *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, 
> it should consider the extra framework memory in addition to the slot 
> *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configurable at the job level.

[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5132:
-
Description: 
This is part of the fine-grained resource configuration.
The current resource factors include cpu cores, heap memory, direct memory, 
native memory and state size.
The *ResourceSpec* will provide some basic construction methods for grouping 
different resource factors as needed, and these constructions can easily be 
extended for further requirements.


> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include cpu cores, heap memory, direct memory, 
> native memory and state size.
> The *ResourceSpec* will provide some basic construction methods for grouping 
> different resource factors as needed, and these constructions can easily be 
> extended for further requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5132:
-
Description: 
This is part of the fine-grained resource configuration.

The current resource factors include cpu cores, heap memory, direct memory, 
native memory and state size.
The *ResourceSpec* will provide some basic construction methods for grouping 
different resource factors as needed, and these constructions can easily be 
extended for further requirements.


  was:
This is part of the fine-grained resource configuration.
The current resource factors include cpu cores, heap memory, direct memory, 
native memory and state size.
The *ResourceSpec* will provide some basic construction methods for grouping 
different resource factors as needed, and these constructions can easily be 
extended for further requirements.



> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include cpu cores, heap memory, direct memory, 
> native memory and state size.
> The *ResourceSpec* will provide some basic construction methods for grouping 
> different resource factors as needed, and these constructions can easily be 
> extended for further requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r89252908
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Returns eval method matching the given signature of 
[[TypeInformation]].
+*/
+  def getEvalMethod(
+function: EvaluableFunction,
--- End diff --

Hi @fhueske, when we declare ScalarFunction and TableFunction as 
follows: trait ScalarFunction, trait TableFunction[T], it will become very 
useful. The current version is optional.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688781#comment-15688781
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r89252908
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Returns eval method matching the given signature of 
[[TypeInformation]].
+*/
+  def getEvalMethod(
+function: EvaluableFunction,
--- End diff --

Hi @fhueske, when we declare ScalarFunction and TableFunction as 
follows: trait ScalarFunction, trait TableFunction[T], it will become very 
useful. The current version is optional.


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row to multiple output rows. It is very useful in 
> some cases, such as look up in HBase by rowkey and return one or more rows.
> Adding a user defined table function should:
> 1. inherit from UDTF class with specific generic type T
> 2. define one or more eval functions. 
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit table row
> 4. eval method can be overload. Blink will choose the best match eval method 
> to call according to parameter types and number.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
> public Iterable<Word> eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5132:


 Summary: Introduce the ResourceSpec for grouping different 
resource factors in API
 Key: FLINK-5132
 URL: https://issues.apache.org/jira/browse/FLINK-5132
 Project: Flink
  Issue Type: Sub-task
  Components: Core
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects that can be 
recycled quickly by the JVM, so most of the memory resource is controlled and 
managed by the *TaskManager* framework. But in some special cases, the UDF may 
consume much resource to create long-lived big objects, so it is necessary to 
provide options for professional users to define the resource usage if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide 
some basic construction methods for resource groups.
  - The *ResourceSpec* can be set onto the internal transformation in 
DataStream and the base operator in DataSet separately (see the usage sketch below).
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot from the *ResourceManager* for 
submitting a task, the *ResourceProfile* will be expanded to correspond with the 
*ResourceSpec*.
  - The *ResourceManager* requests container resources from the cluster, and it 
should consider the extra framework memory in addition to the slot *ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and 
*MemoryManager* in the *TaskManager*, and it can be configured at the job level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.
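
A minimal usage sketch of how the per-operator API could look from the user side (the setResources method and the ResourceSpec.of helper are assumed names for illustration; the actual API is defined in the sub-tasks of this issue):

{code}
// Hypothetical user-facing usage; setResources(...) is an assumed method name
// and ResourceSpec.of(...) is the illustrative helper sketched under FLINK-5132.
import org.apache.flink.streaming.api.scala._

object FineGrainedResourceExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // hint that this operator needs 2 cores and 1 GB of heap
    val heavySpec = ResourceSpec.of(cpuCores = 2.0, heapMemoryMB = 1024)

    env.fromElements(("a", 1), ("b", 2), ("a", 3))
      .map(t => t)               // imagine a memory-hungry UDF here
      .setResources(heavySpec)   // attach the spec to this operator's transformation
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("fine-grained resource example")
  }
}
{code}

The aggregation step described above would then merge the specs of all chained operators into the single slot request sent to the ResourceManager.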

> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects that can be 
> recycled quickly by the JVM, so most of the memory resource is controlled and 
> managed by the *TaskManager* framework. But in some special cases, the UDF may 
> consume much resource to create long-lived big objects, so it is necessary to 
> provide options for professional users to define the resource usage if 
> needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set onto the internal transformation in 
> DataStream and the base operator in DataSet separately.
>   - In stream graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot from the 
> *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded 
> to correspond with the *ResourceSpec*.
>   - The *ResourceManager* requests container resources from the cluster, and it 
> should consider the extra framework memory in addition to the slot *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89249046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688661#comment-15688661
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89249046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

yes.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
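
For context, a small sketch of the DataStream-level pattern referenced above: the ReduceFunction is applied eagerly, so the window keeps only one partially aggregated value per key, and the window function only decorates the result when the window fires. The apply(preAggregator, windowFunction) overload used here is the Flink 1.2-era Scala API; treat the exact signature as an assumption.

{code}
// Sketch of incremental aggregation on the DataStream API (Scala), assuming the
// Flink 1.2-style apply(preAggregator, windowFunction) overload.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object IncrementalWindowExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(("a", 1L), ("a", 2L), ("b", 5L))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .apply(
        // eager pre-aggregation: only one running sum is kept per key and window
        (l: (String, Long), r: (String, Long)) => (l._1, l._2 + r._2),
        // the window function sees exactly one pre-aggregated element per firing
        (key: String, window: TimeWindow, input: Iterable[(String, Long)],
         out: Collector[String]) =>
          out.collect(s"$key @ ${window.getEnd}: ${input.head._2}"))
      .print()

    env.execute("incremental window aggregation example")
  }
}
{code}

With the non-incremental path described above, the same window would instead buffer all records and run a GroupReduceFunction only when the window closes.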



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5061) Remove ContinuousEventTimeTrigger

2016-11-22 Thread Manu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688654#comment-15688654
 ] 

Manu Zhang commented on FLINK-5061:
---

Agreed. These were just some of my thoughts and should not block the changes. 

> Remove ContinuousEventTimeTrigger
> -
>
> Key: FLINK-5061
> URL: https://issues.apache.org/jira/browse/FLINK-5061
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> The Trigger does not do what people think it does. The problem is that a 
> watermark T signals that we won't see elements with timestamp < T in the 
> future. It does not signal that we have not yet seen elements with timestamp 
> > T. So it cannot really be used to trigger at different stages of processing 
> an event-time window.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5131:


 Summary: Fine-grained Resource Configuration
 Key: FLINK-5131
 URL: https://issues.apache.org/jira/browse/FLINK-5131
 Project: Flink
  Issue Type: New Feature
  Components: DataSet API, DataStream API, JobManager, ResourceManager
Reporter: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688612#comment-15688612
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247407
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Sure, agreed.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688613#comment-15688613
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247407
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Sure, agreed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  outputType,
+  groupings)
+
+val finalRowArity = outputType.getFieldCount
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ 

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688608#comment-15688608
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247362
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate 
Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
-
-// the mapping relation between aggregate function index in list and 
its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data 
type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
+  aggregates,
+  aggFieldIndexes,
  

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247362
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate 
Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
-
-// the mapping relation between aggregate function index in list and 
its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data 
type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
+  aggregates,
+  aggFieldIndexes,
+  groupings,
+  
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
-val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688606#comment-15688606
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Cool, I think using Calcite's namedAggregates is a good idea.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
-   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
-   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Cool, I think using Calcite's namedAggregates is a good idea.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89244761
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Yes, I'll do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688533#comment-15688533
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89244761
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Yes, I'll do it.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89242998
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
   inputType: RelDataType,
   outputType: RelDataType,
   groupings: Array[Int])
 : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
 
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+   val (aggFieldIndexes, aggregates)  =
+   transformToAggregateFunctions(namedAggregates.map(_.getKey),
+ inputType, groupings.length)
 
-val mapReturnType: RowTypeInfo =
-  createAggregateBufferDataType(groupings, aggregates, inputType)
+createOperatorFunctionsForAggregates(namedAggregates,
+  inputType,
+  outputType,
+  groupings,
+  aggregates,aggFieldIndexes)
+}
 
-val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+def createOperatorFunctionsForAggregates(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+aggregates:Array[Aggregate[_ <: Any]],
+aggFieldIndexes:Array[Int])
+: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= {
+
+  val mapFunction = createAggregateMapFunction(aggregates,
+aggFieldIndexes, groupings, inputType)
+
+  // the mapping relation between field index of intermediate 
aggregate Row and output Row.
+  val groupingOffsetMapping = getGroupKeysMapping(inputType, 
outputType, groupings)
+
+  // the mapping relation between aggregate function index in list and 
its corresponding
+  // field index in output Row.
+  val aggOffsetMapping = getAggregateMapping(namedAggregates, 
outputType)
+
+  if (groupingOffsetMapping.length != groupings.length ||
+aggOffsetMapping.length != namedAggregates.length) {
+throw new TableException("Could not find output field in input 
data type " +
+  "or aggregate functions.")
+  }
+
+  val allPartialAggregate = aggregates.map(_.supportPartial).forall(x 
=> x)
+
+  val intermediateRowArity = groupings.length +
+aggregates.map(_.intermediateDataType.length).sum
+
+  val reduceGroupFunction =
+if (allPartialAggregate) {
+  new AggregateReduceCombineFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+else {
+  new AggregateReduceGroupFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+
+  (mapFunction, reduceGroupFunction)
+  }
+
+  /**
+* Create Flink operator functions for Incremental aggregates.
+* It includes 2 implementations of Flink operator functions:
+* [[org.apache.flink.api.common.functions.MapFunction]] and
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) 
aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688485#comment-15688485
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89242998
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
   inputType: RelDataType,
   outputType: RelDataType,
   groupings: Array[Int])
 : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
 
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+   val (aggFieldIndexes, aggregates)  =
+   transformToAggregateFunctions(namedAggregates.map(_.getKey),
+ inputType, groupings.length)
 
-val mapReturnType: RowTypeInfo =
-  createAggregateBufferDataType(groupings, aggregates, inputType)
+createOperatorFunctionsForAggregates(namedAggregates,
+  inputType,
+  outputType,
+  groupings,
+  aggregates,aggFieldIndexes)
+}
 
-val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+def createOperatorFunctionsForAggregates(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+aggregates:Array[Aggregate[_ <: Any]],
+aggFieldIndexes:Array[Int])
+: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= {
+
+  val mapFunction = createAggregateMapFunction(aggregates,
+aggFieldIndexes, groupings, inputType)
+
+  // the mapping relation between field index of intermediate 
aggregate Row and output Row.
+  val groupingOffsetMapping = getGroupKeysMapping(inputType, 
outputType, groupings)
+
+  // the mapping relation between aggregate function index in list and 
its corresponding
+  // field index in output Row.
+  val aggOffsetMapping = getAggregateMapping(namedAggregates, 
outputType)
+
+  if (groupingOffsetMapping.length != groupings.length ||
+aggOffsetMapping.length != namedAggregates.length) {
+throw new TableException("Could not find output field in input 
data type " +
+  "or aggregate functions.")
+  }
+
+  val allPartialAggregate = aggregates.map(_.supportPartial).forall(x 
=> x)
+
+  val intermediateRowArity = groupings.length +
+aggregates.map(_.intermediateDataType.length).sum
+
+  val reduceGroupFunction =
+if (allPartialAggregate) {
+  new AggregateReduceCombineFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+else {
+  new AggregateReduceGroupFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+
+  (mapFunction, reduceGroupFunction)
+  }
+
+  /**
+* Create Flink operator functions for Incremental aggregates.
+* It includes 2 implementations of Flink operator functions:
+* [[org.apache.flink.api.common.functions.MapFunction]] and
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+*            |                                   |
+*            v                                   v
+*

[jira] [Commented] (FLINK-4669) scala api createLocalEnvironment() function add default Configuration parameter

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688370#comment-15688370
 ] 

ASF GitHub Bot commented on FLINK-4669:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2541
  
@StephanEwen  have any improvement needed?


> scala api createLocalEnvironment() function add default Configuration 
> parameter
> ---
>
> Key: FLINK-4669
> URL: https://issues.apache.org/jira/browse/FLINK-4669
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: shijinkui
>
> A Scala program can't directly use createLocalEnvironment with a custom 
> Configuration object.
> For example, to start the web server in local mode I currently have to write:
> ```
> // set up execution environment
> val conf = new Configuration
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
>   ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
> val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(
>   org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2, conf)
> )
> ```
> So the Scala createLocalEnvironment function should take a Configuration parameter.
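A minimal sketch of how this could look once the Scala factory method accepts a Configuration with a default value; the commented-out call is the proposed API and its parameter names are assumptions, not the current signature:

{code}
// Sketch only: the commented-out call shows the proposed Scala API with a default
// Configuration parameter; the rest is today's workaround via the Java environment.
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala._

object LocalEnvWithConfig {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)

    // Proposed (assumed) API once the default parameter exists:
    // val env = StreamExecutionEnvironment.createLocalEnvironment(2, conf)

    // Current workaround: wrap the Java local environment manually.
    val env = new StreamExecutionEnvironment(
      org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
        .createLocalEnvironment(2, conf))

    env.fromElements(1, 2, 3).print()
    env.execute("local environment with custom Configuration")
  }
}
{code}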



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2541: [FLINK-4669] scala api createLocalEnvironment() function ...

2016-11-22 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2541
  
@StephanEwen  have any improvement needed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4910) Introduce safety net for closing file system streams

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688284#comment-15688284
 ] 

ASF GitHub Bot commented on FLINK-4910:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2691


> Introduce safety net for closing file system streams
> 
>
> Key: FLINK-4910
> URL: https://issues.apache.org/jira/browse/FLINK-4910
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Streams that are opened through {{FileSystem}} must be closed at the end of 
> their life cycle. However, we found hints that some code forgets to close 
> such streams.
> We should introduce i) a mechanism that closes leaked, unclosed streams after 
> usage and ii) logging that helps us to track down and fix the sources of such 
> leaks.
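As a rough illustration of the idea (not the implementation that was merged), such a safety net could be a small registry that remembers opened streams and closes whatever is still open at the end of the life cycle, logging the leak; all names below are invented:

{code}
// Illustrative sketch only; class and method names are made up for this example.
import java.io.Closeable
import java.util.Collections
import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import org.slf4j.LoggerFactory

class StreamSafetyNet {
  private val log = LoggerFactory.getLogger(classOf[StreamSafetyNet])
  private val openStreams: JSet[Closeable] =
    Collections.newSetFromMap(new ConcurrentHashMap[Closeable, java.lang.Boolean]())

  /** Register a freshly opened stream so it can be closed if the owner forgets. */
  def register[T <: Closeable](stream: T): T = {
    openStreams.add(stream)
    stream
  }

  /** Called by well-behaved code right before it closes the stream itself. */
  def unregister(stream: Closeable): Unit = openStreams.remove(stream)

  /** Called at the end of the life cycle: close and log any leaked streams. */
  def closeLeakedStreams(): Unit = {
    val it = openStreams.iterator()
    while (it.hasNext) {
      val stream = it.next()
      log.warn("Closing leaked stream {} that was not closed by its owner", stream)
      try stream.close()
      catch { case t: Throwable => log.debug("Error while closing leaked stream", t) }
      it.remove()
    }
  }
}
{code}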



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688285#comment-15688285
 ] 

ASF GitHub Bot commented on FLINK-5107:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2837


> Job Manager goes out of memory from long history of prior execution attempts
> 
>
> Key: FLINK-5107
> URL: https://issues.apache.org/jira/browse/FLINK-5107
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during 
> long-running jobs with many vertices. Analysis of the heap dump shows that the 
> ever-growing history of prior execution attempts is the culprit for this 
> problem.
> We should limit this history to the n most recent attempts.
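One way to bound the history, sketched here with made-up names (the merged fix may use a different structure), is a simple queue that drops the oldest attempt once the configured limit is exceeded:

{code}
// Sketch of a bounded history of prior execution attempts; not actual Flink code.
import scala.collection.mutable

class BoundedAttemptHistory[T](maxRetainedAttempts: Int) {
  require(maxRetainedAttempts > 0, "history size must be positive")

  private val attempts = mutable.Queue.empty[T]

  /** Record a finished attempt, evicting the oldest one once the limit is reached. */
  def add(attempt: T): Unit = {
    attempts.enqueue(attempt)
    if (attempts.size > maxRetainedAttempts) {
      attempts.dequeue() // the evicted attempt becomes eligible for garbage collection
    }
  }

  /** The retained prior attempts, oldest first. */
  def retained: Seq[T] = attempts.toList
}
{code}

Capping the retained attempts bounds the memory held per vertex regardless of how many restarts happen over the job's lifetime.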



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4910) Introduce safety net for closing file system streams

2016-11-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4910.
--
Resolution: Fixed

Fixed via ba8ed263695d16eacb4bdfdf195dd22c83bb53ed

> Introduce safety net for closing file system streams
> 
>
> Key: FLINK-4910
> URL: https://issues.apache.org/jira/browse/FLINK-4910
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Streams that are opened through {{FileSystem}} must be closed at the end of 
> their life cycle. However, we found hints that some code forgets to close 
> such streams.
> We should introduce i) a mechanism that closes leaked, unclosed streams after 
> usage and ii) logging that helps us to track down and fix the sources of such 
> leaks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2837: [FLINK-5107] Introduced limit for prior execution ...

2016-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2837


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2691: [FLINK-4910] Introduce safety net for closing file...

2016-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2691


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts

2016-11-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-5107.
--
Resolution: Fixed

Fixed via f5af7f1025aac9cac3f83de5f0e3aece5730ec0f

> Job Manager goes out of memory from long history of prior execution attempts
> 
>
> Key: FLINK-5107
> URL: https://issues.apache.org/jira/browse/FLINK-5107
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during 
> long-running jobs with many vertices. Analysis of the heap dump shows that the 
> ever-growing history of prior execution attempts is the culprit for this 
> problem.
> We should limit this history to the n most recent attempts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2837: [FLINK-5107] Introduced limit for prior execution attempt...

2016-11-22 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2837
  
Thanks for the contribution @StefanRRichter. Failing tests are unrelated. 
Merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4910) Introduce safety net for closing file system streams

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688084#comment-15688084
 ] 

ASF GitHub Bot commented on FLINK-4910:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2691
  
Thanks for the great contribution @StefanRRichter. Merging this PR.


> Introduce safety net for closing file system streams
> 
>
> Key: FLINK-4910
> URL: https://issues.apache.org/jira/browse/FLINK-4910
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Streams that are opened through {{FileSystem}} must be closed at the end of 
> their life cycle. However, we found hints that some code forgets to close 
> such streams.
> We should introduce i) a mechanism that closes leaked, unclosed streams after 
> usage and ii) logging that helps us to track down and fix the sources of such 
> leaks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688088#comment-15688088
 ] 

ASF GitHub Bot commented on FLINK-5107:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2837
  
Thanks for the contribution @StefanRRichter. Failing tests are unrelated. 
Merging this PR.


> Job Manager goes out of memory from long history of prior execution attempts
> 
>
> Key: FLINK-5107
> URL: https://issues.apache.org/jira/browse/FLINK-5107
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during 
> long-running jobs with many vertices. Analysis of the heap dump shows that the 
> ever-growing history of prior execution attempts is the culprit for this 
> problem.
> We should limit this history to the n most recent attempts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2691: [FLINK-4910] Introduce safety net for closing file system...

2016-11-22 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2691
  
Thanks for the great contribution @StefanRRichter. Merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5082) Pull ExecutionService lifecycle management out of the JobManager

2016-11-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-5082.

Resolution: Fixed

Fixed in 1.2 via ae4b274a9919d01a236df4e819a0a07c5d8543ac
Fixed in 1.1.4 via 7fb71c5bf1aab85250bf29bd0ea0654079cea48f

> Pull ExecutionService lifecycle management out of the JobManager
> 
>
> Key: FLINK-5082
> URL: https://issues.apache.org/jira/browse/FLINK-5082
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> The {{JobManager}} receives an {{ExecutorService}} to run its futures as a 
> constructor parameter. Even though the {{ExecutorService}} comes from 
> outside, the {{JobManager}} shuts the executor service down if the 
> {{JobManager}} terminates. This is clearly sub-optimal behaviour that can 
> also lead to {{RejectedExecutionExceptions}}.
> I propose to move the {{ExecutorService}} lifecycle management out of the 
> {{JobManager}}.
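A minimal sketch of the proposed ownership rule, with invented names: the caller that creates the ExecutorService is the one that shuts it down, while the component that merely received it never does:

{code}
// Illustrative only; not the actual JobManager code.
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

// A component that uses, but does not own, the executor service.
class UsesSharedExecutor(executor: ExecutorService) {
  def runAsync(task: Runnable): Unit = executor.execute(task)
  // Deliberately no executor.shutdown() here, even when the component stops.
  def stop(): Unit = ()
}

object ExecutorOwner {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newFixedThreadPool(4)
    val component = new UsesSharedExecutor(executor)
    try {
      component.runAsync(new Runnable {
        override def run(): Unit = println("work on the shared executor")
      })
      component.stop() // stopping the component must not reject tasks of other users
    } finally {
      // Only the owner tears the executor down.
      executor.shutdown()
      executor.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
{code}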



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688077#comment-15688077
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Ok, so I am progressing this a bit independently from the termination stuff 
and then we rebase to the first PR that is merged. I just changed everything 
and rebased to the current master. 

Some notable changes:
- The `StreamIterationCheckpointingITCase` is now made deterministic: it 
fails once after the first successful checkpoint and the job stops after 
everything has been recovered appropriately.
- I am now using ListState, which is supposed to work like a charm with the 
RocksDB file backend. Note that with the default in-memory backend there is a 
high chance of running into issues, given the low memory capacity it is given 
by default.
- One tricky part that could potentially be done better is the way I set the 
logger in the StreamIterationHead (I had to change the head operator field 
access to `protected` in the OperatorChain).

Whenever you find time go ahead and check it out. It passes my super-strict 
test which is a good thing. :)


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work roughly 
> as follows:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> its output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can be 
> the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
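A very rough sketch of steps 1) to 3) above, with every type and method name invented for illustration (this is not the implementation in the pull request):

{code}
// Illustrative sketch of ABS for cyclic graphs; all names here are made up.
import scala.collection.mutable

class IterationHeadSketch[T](emit: T => Unit,
                             blockOutput: () => Unit,
                             unblockOutput: () => Unit) {

  private val inTransitLog = mutable.ArrayBuffer.empty[T] // upstream backup
  private var snapshotting = false

  /** Step 1: barrier from the TaskManager -> block output, start logging back-edge records. */
  def onCheckpointBarrier(): Unit = {
    blockOutput()
    snapshotting = true
  }

  /** Records arriving over the back edge from the iteration sink. */
  def onBackEdgeRecord(record: T): Unit = {
    if (snapshotting) inTransitLog += record // output is blocked, so only back the record up
    else emit(record)
  }

  /** Step 3: barrier forwarded by the iteration sink arrives -> finalize the snapshot,
    * unblock output and replay the logged records in FIFO order. */
  def onBarrierFromSink(persist: Seq[T] => Unit): Unit = {
    persist(inTransitLog.toList) // the in-transit records become part of the snapshot
    snapshotting = false
    unblockOutput()
    inTransitLog.foreach(emit)
    inTransitLog.clear()
  }
}
{code}

On restore, the head would first emit the persisted records and then continue its usual execution, as the description says.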



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-11-22 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Ok, so I am progressing this a bit independently from the termination stuff 
and then we rebase to the first PR that is merged. I just changed everything 
and rebased to the current master. 

Some notable changes:
- The `StreamIterationCheckpointingITCase` is now made deterministic: it 
fails once after the first successful checkpoint and the job stops after 
everything has been recovered appropriately.
- I am now using ListState, which is supposed to work like a charm with the 
RocksDB file backend. Note that with the default in-memory backend there is a 
high chance of running into issues, given the low memory capacity it is given 
by default.
- One tricky part that could potentially be done better is the way I set the 
logger in the StreamIterationHead (I had to change the head operator field 
access to `protected` in the OperatorChain).

Whenever you find time go ahead and check it out. It passes my super-strict 
test which is a good thing. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2820: [FLINK-5082] Pull ExecutorService lifecycle manage...

2016-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2820


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2815: [FLINK-5073] Use Executor to run ZooKeeper callbac...

2016-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2815


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5085) Execute CheckpointCoodinator's state discard calls asynchronously

2016-11-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-5085.

Resolution: Fixed

Fixed in 1.2 via c590912c93a4059b40452dfa6cffbdd4d58cac13
Fixed in 1.1.4 via cf4b221270cff3541bea318f907f9d8207b2fa4d 

> Execute CheckpointCoodinator's state discard calls asynchronously
> -
>
> Key: FLINK-5085
> URL: https://issues.apache.org/jira/browse/FLINK-5085
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> Under certain circumstances, the {{CheckpointCoordinator}} discards pending 
> checkpoints or state handles. These discard operations can involve a blocking 
> IO operation if the underlying state handle refers to a file which has to be 
> deleted. In order to not block the calling thread, we should execute these 
> calls in a dedicated IO executor.
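A small sketch of the hand-off being proposed, with invented names (the real change to the CheckpointCoordinator may look different): the discard call is submitted to a dedicated IO executor instead of running on the caller's thread:

{code}
// Illustrative sketch; StateHandleLike stands in for a real state handle type.
import java.util.concurrent.{Executor, Executors}

trait StateHandleLike {
  def discardState(): Unit // may perform blocking IO, e.g. deleting a file
}

class AsyncDiscarder(ioExecutor: Executor) {

  /** Schedule the discard of all given handles on the IO executor and return immediately. */
  def discardAsync(handles: Seq[StateHandleLike]): Unit = {
    ioExecutor.execute(new Runnable {
      override def run(): Unit = handles.foreach { handle =>
        try handle.discardState()
        catch { case _: Throwable => () } // log and continue with the next handle
      }
    })
  }
}

object AsyncDiscarder {
  /** Example wiring: a small pool reserved for blocking IO work. */
  def withDefaultPool(): AsyncDiscarder = new AsyncDiscarder(Executors.newFixedThreadPool(2))
}
{code}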



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2825: [FLINK-5085] Execute CheckpointCoordinator's state...

2016-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2825


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5073) ZooKeeperCompleteCheckpointStore executes blocking delete operation in ZooKeeper client thread

2016-11-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-5073.

Resolution: Fixed

Fixed in 1.2 via 3fb92d8687f03c1fac8b87396b2b5a7ff29f6dd6
Fixed in 1.1.4 via f2e4c193e1fb6b0cf26861bc01c2f3d6bcd4d8f6

> ZooKeeperCompleteCheckpointStore executes blocking delete operation in 
> ZooKeeper client thread
> --
>
> Key: FLINK-5073
> URL: https://issues.apache.org/jira/browse/FLINK-5073
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> When deleting completed checkpoints from the 
> {{ZooKeeperCompletedCheckpointStore}}, one first tries to delete the meta 
> state handle from ZooKeeper and then deletes the actual checkpoint in a 
> callback from the delete operation. This callback is executed by the 
> ZooKeeper client's main thread, which is problematic because it blocks the 
> ZooKeeper client. If a delete operation takes longer than completing a 
> checkpoint, delete operations of outdated checkpoints may pile up because 
> they are effectively executed sequentially.
> I propose to execute the delete operations by a dedicated {{Executor}} so 
> that we keep the client's main thread free to do ZooKeeper related work.
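The same idea seen from the callback's side, again only as a sketch with invented names: the callback invoked on the ZooKeeper client thread does nothing but schedule the blocking deletion on the dedicated executor and returns immediately:

{code}
// Illustrative only; not the actual ZooKeeperCompletedCheckpointStore code.
import java.util.concurrent.Executor

class CheckpointCleanupCallback(ioExecutor: Executor) {

  /** Invoked on the ZooKeeper client thread after the meta state handle was removed.
    * It must not block, so the actual checkpoint deletion is handed off. */
  def onMetaHandleRemoved(deleteCheckpoint: () => Unit): Unit = {
    ioExecutor.execute(new Runnable {
      override def run(): Unit = deleteCheckpoint() // the blocking IO happens here
    })
  }
}
{code}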



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5082) Pull ExecutionService lifecycle management out of the JobManager

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688071#comment-15688071
 ] 

ASF GitHub Bot commented on FLINK-5082:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2820


> Pull ExecutionService lifecycle management out of the JobManager
> 
>
> Key: FLINK-5082
> URL: https://issues.apache.org/jira/browse/FLINK-5082
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> The {{JobManager}} receives an {{ExecutorService}} to run its futures as a 
> constructor parameter. Even though the {{ExecutorService}} comes from 
> outside, the {{JobManager}} shuts the executor service down if the 
> {{JobManager}} terminates. This is clearly sub-optimal behaviour that can 
> also lead to {{RejectedExecutionExceptions}}.
> I propose to move the {{ExecutorService}} lifecycle management out of the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5073) ZooKeeperCompleteCheckpointStore executes blocking delete operation in ZooKeeper client thread

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688072#comment-15688072
 ] 

ASF GitHub Bot commented on FLINK-5073:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2815


> ZooKeeperCompleteCheckpointStore executes blocking delete operation in 
> ZooKeeper client thread
> --
>
> Key: FLINK-5073
> URL: https://issues.apache.org/jira/browse/FLINK-5073
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> When deleting completed checkpoints from the 
> {{ZooKeeperCompletedCheckpointStore}}, one first tries to delete the meta 
> state handle from ZooKeeper and then deletes the actual checkpoint in a 
> callback from the delete operation. This callback is executed by the 
> ZooKeeper client's main thread, which is problematic because it blocks the 
> ZooKeeper client. If a delete operation takes longer than completing a 
> checkpoint, delete operations of outdated checkpoints may pile up because 
> they are effectively executed sequentially.
> I propose to execute the delete operations by a dedicated {{Executor}} so 
> that we keep the client's main thread free to do ZooKeeper related work.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5085) Execute CheckpointCoodinator's state discard calls asynchronously

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688042#comment-15688042
 ] 

ASF GitHub Bot commented on FLINK-5085:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2825
  
Build passed locally 
https://travis-ci.org/tillrohrmann/flink/builds/178011045. Merging this PR.


> Execute CheckpointCoodinator's state discard calls asynchronously
> -
>
> Key: FLINK-5085
> URL: https://issues.apache.org/jira/browse/FLINK-5085
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> Under certain circumstances, the {{CheckpointCoordinator}} discards pending 
> checkpoints or state handles. These discard operations can involve a blocking 
> IO operation if the underlying state handle refers to a file which has to be 
> deleted. In order to not block the calling thread, we should execute these 
> calls in a dedicated IO executor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2825: [FLINK-5085] Execute CheckpointCoordinator's state discar...

2016-11-22 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2825
  
Build passed locally 
https://travis-ci.org/tillrohrmann/flink/builds/178011045. Merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-11-22 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582930#comment-15582930
 ] 

Ted Yu edited comment on FLINK-4848 at 11/22/16 9:32 PM:
-

There is a similar issue with trustStoreFilePath:

{code}
trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
{code}


was (Author: yuzhih...@gmail.com):
There is a similar issue with trustStoreFilePath:
{code}
trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
{code}

> keystoreFilePath should be checked against null in 
> SSLUtils#createSSLServerContext
> --
>
> Key: FLINK-4848
> URL: https://issues.apache.org/jira/browse/FLINK-4848
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   String keystoreFilePath = sslConfig.getString(
> ConfigConstants.SECURITY_SSL_KEYSTORE,
> null);
> ...
>   try {
> keyStoreFile = new FileInputStream(new File(keystoreFilePath));
> {code}
> If keystoreFilePath is null, the File ctor would throw NPE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4999) Improve accuracy of SingleInputGate#getNumberOfQueuedBuffers

2016-11-22 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-4999.
---
Resolution: Won't Fix

> Improve accuracy of SingleInputGate#getNumberOfQueuedBuffers
> 
>
> Key: FLINK-4999
> URL: https://issues.apache.org/jira/browse/FLINK-4999
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> // re-try 3 times, if fails, return 0 for "unknown"
> for (int retry = 0; retry < 3; retry++) {
> ...
>   catch (Exception ignored) {}
> {code}
> There is no synchronization around accessing inputChannels currently. 
> Therefore the method expects potential exceptions.
> Upon the 3rd try, synchronization should be taken w.r.t. inputChannels so 
> that the return value is accurate.
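For illustration only (the issue was resolved as Won't Fix), the proposed retry-then-synchronize pattern could look roughly like this, with placeholder types and a placeholder count function:

{code}
// Sketch of the proposed pattern; the map and queuedBuffersOf are placeholders.
import java.util.{Map => JMap}

class QueuedBuffersCounter[C](inputChannels: JMap[Integer, C], queuedBuffersOf: C => Int) {

  def getNumberOfQueuedBuffers: Int = {
    // A couple of optimistic attempts without locking; concurrent modification may throw.
    var attempt = 0
    while (attempt < 2) {
      try {
        return sumUnsafe()
      } catch {
        case _: Exception => attempt += 1
      }
    }
    // Final attempt under the lock so that the returned value is accurate.
    inputChannels.synchronized(sumUnsafe())
  }

  private def sumUnsafe(): Int = {
    var total = 0
    val it = inputChannels.values().iterator()
    while (it.hasNext) {
      total += queuedBuffersOf(it.next())
    }
    total
  }
}
{code}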



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5050) JSON.org license is CatX

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687938#comment-15687938
 ] 

ASF GitHub Bot commented on FLINK-5050:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/2824
  
lgtm


> JSON.org license is CatX
> 
>
> Key: FLINK-5050
> URL: https://issues.apache.org/jira/browse/FLINK-5050
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sergey Sokur
>
> We should exclude org.json:json dependency from hive-exec dependency.
> {code}
> [INFO] +- org.apache.flink:flink-java:jar:1.2-SNAPSHOT:provided
> ...
> [INFO] +- org.apache.hive.hcatalog:hcatalog-core:jar:0.12.0:compile
> ...
> [INFO] |  +- org.apache.hive:hive-exec:jar:0.12.0:compile
> [INFO] |  |  +- com.google.protobuf:protobuf-java:jar:2.4.1:compile
> [INFO] |  |  +- org.iq80.snappy:snappy:jar:0.2:compile
> [INFO] |  |  +- org.json:json:jar:20090211:compile
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2824: [FLINK-5050] JSON.org license is CatX

2016-11-22 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/2824
  
lgtm


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5097) The TypeExtractor is missing input type information in some Graph methods

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687902#comment-15687902
 ] 

ASF GitHub Bot commented on FLINK-5097:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2842
  
@vasia should `Graph` be using the function specific 
`TypeExtractor.getMapReturnTypes` (not sure why this is plural) and similar 
where core Functions are used?

For `Gelly` functions, I think you are right that this capability is 
lacking, i.e. `NeighborsFunctionWithVertexValue` has three input types and 
`TypeExtractor` only provides `getUnaryOperatorReturnType` and 
`getBinaryOperatorReturnType`? It also looks like `TypeExtractor` expects a 
specific though natural ordering (input types before output types).

@twalthr what do you recommend?


> The TypeExtractor is missing input type information in some Graph methods
> -
>
> Key: FLINK-5097
> URL: https://issues.apache.org/jira/browse/FLINK-5097
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> The TypeExtractor is called without information about the input type in 
> {{mapVertices}} and {{mapEdges}} although this information can be easily 
> retrieved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2842: [FLINK-5097][gelly] Add missing input type information to...

2016-11-22 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2842
  
@vasia should `Graph` be using the function specific 
`TypeExtractor.getMapReturnTypes` (not sure why this is plural) and similar 
where core Functions are used?

For `Gelly` functions, I think you are right that this capability is 
lacking, i.e. `NeighborsFunctionWithVertexValue` has three input types and 
`TypeExtractor` only provides `getUnaryOperatorReturnType` and 
`getBinaryOperatorReturnType`? It also looks like `TypeExtractor` expects a 
specific though natural ordering (input types before output types).

@twalthr what do you recommend?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2851: [FLINK-5124] [table] Support more temporal arithme...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2851#discussion_r89192379
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -656,38 +656,56 @@ object ScalarOperators {
   right: GeneratedExpression)
 : GeneratedExpression = {
 
-val operator = if (plus) "+" else "-"
+val op = if (plus) "+" else "-"
 
 (left.resultType, right.resultType) match {
   case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l 
== r =>
-generateArithmeticOperator(operator, nullCheck, l, left, right)
+generateArithmeticOperator(op, nullCheck, l, left, right)
 
   case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
   if (isTimePoint(left.resultType)) {
-(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm / 
${MILLIS_PER_DAY}L))"
+(leftTerm, rightTerm) =>
+  s"$leftTerm $op ((int) ($rightTerm / ${MILLIS_PER_DAY}L))"
   } else {
-(leftTerm, rightTerm) => s"((int) ($leftTerm / 
${MILLIS_PER_DAY}L)) + $rightTerm"
+(leftTerm, rightTerm) =>
+  s"((int) ($leftTerm / ${MILLIS_PER_DAY}L)) $op $rightTerm"
+  }
+}
+
+  case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) |
+   (TimeIntervalTypeInfo.INTERVAL_MONTHS, SqlTimeTypeInfo.DATE) =>
+generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
+  if (isTimePoint(left.resultType)) {
+(leftTerm, rightTerm) =>
+  
s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($leftTerm, $op($rightTerm))"
+  } else {
+(leftTerm, rightTerm) =>
+  
s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($rightTerm, $op($leftTerm))"
   }
 }
 
   case (SqlTimeTypeInfo.TIME, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, 
right) {
   if (isTimePoint(left.resultType)) {
-(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm))"
+(leftTerm, rightTerm) => s"$leftTerm $op ((int) ($rightTerm))"
   } else {
-(leftTerm, rightTerm) => s"((int) ($leftTerm)) + $rightTerm"
+(leftTerm, rightTerm) => s"((int) ($leftTerm)) $op $rightTerm"
   }
 }
 
   case (SqlTimeTypeInfo.TIMESTAMP, 
TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
--- End diff --

Don't we need the inverse case `(TimeIntervalTypeInfo.INTERVAL_MILLIS, 
SqlTimeTypeInfo.TIMESTAMP)` as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5124) Support more temporal arithmetic

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687674#comment-15687674
 ] 

ASF GitHub Bot commented on FLINK-5124:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2851#discussion_r89192110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -656,38 +656,56 @@ object ScalarOperators {
   right: GeneratedExpression)
 : GeneratedExpression = {
 
-val operator = if (plus) "+" else "-"
+val op = if (plus) "+" else "-"
 
 (left.resultType, right.resultType) match {
   case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l 
== r =>
-generateArithmeticOperator(operator, nullCheck, l, left, right)
+generateArithmeticOperator(op, nullCheck, l, left, right)
 
   case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
   if (isTimePoint(left.resultType)) {
--- End diff --

Wouldn't it be easier to read if this case would be handled by the outer 
pattern matching with separate cases for `(SqlTimeTypeInfo.DATE, 
TimeIntervalTypeInfo.INTERVAL_MILLIS)` and 
`(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE)`?


> Support more temporal arithmetic
> 
>
> Key: FLINK-5124
> URL: https://issues.apache.org/jira/browse/FLINK-5124
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Multiple TPC-H queries fail because of missing temporal arithmetic support. 
> Since CALCITE-308 has been fixed we can add additional operations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5124) Support more temporal arithmetic

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687675#comment-15687675
 ] 

ASF GitHub Bot commented on FLINK-5124:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2851#discussion_r89192420
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -656,38 +656,56 @@ object ScalarOperators {
   right: GeneratedExpression)
 : GeneratedExpression = {
 
-val operator = if (plus) "+" else "-"
+val op = if (plus) "+" else "-"
 
 (left.resultType, right.resultType) match {
   case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l 
== r =>
-generateArithmeticOperator(operator, nullCheck, l, left, right)
+generateArithmeticOperator(op, nullCheck, l, left, right)
 
   case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
   if (isTimePoint(left.resultType)) {
-(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm / 
${MILLIS_PER_DAY}L))"
+(leftTerm, rightTerm) =>
+  s"$leftTerm $op ((int) ($rightTerm / ${MILLIS_PER_DAY}L))"
   } else {
-(leftTerm, rightTerm) => s"((int) ($leftTerm / 
${MILLIS_PER_DAY}L)) + $rightTerm"
+(leftTerm, rightTerm) =>
+  s"((int) ($leftTerm / ${MILLIS_PER_DAY}L)) $op $rightTerm"
+  }
+}
+
+  case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) |
+   (TimeIntervalTypeInfo.INTERVAL_MONTHS, SqlTimeTypeInfo.DATE) =>
+generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
+  if (isTimePoint(left.resultType)) {
+(leftTerm, rightTerm) =>
+  
s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($leftTerm, $op($rightTerm))"
+  } else {
+(leftTerm, rightTerm) =>
+  
s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($rightTerm, $op($leftTerm))"
   }
 }
 
   case (SqlTimeTypeInfo.TIME, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, 
right) {
   if (isTimePoint(left.resultType)) {
-(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm))"
+(leftTerm, rightTerm) => s"$leftTerm $op ((int) ($rightTerm))"
   } else {
-(leftTerm, rightTerm) => s"((int) ($leftTerm)) + $rightTerm"
+(leftTerm, rightTerm) => s"((int) ($leftTerm)) $op $rightTerm"
   }
 }
 
   case (SqlTimeTypeInfo.TIMESTAMP, 
TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, 
left, right) {
-  (leftTerm, rightTerm) => s"$leftTerm + $rightTerm"
+  (leftTerm, rightTerm) => s"$leftTerm $op $rightTerm"
 }
 
-  // TODO more operations when CALCITE-308 is fixed
+  case (SqlTimeTypeInfo.TIMESTAMP, 
TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
--- End diff --

Do we need the inverse case?


> Support more temporal arithmetic
> 
>
> Key: FLINK-5124
> URL: https://issues.apache.org/jira/browse/FLINK-5124
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Multiple TPC-H queries fail because of missing temporal arithmetic support. 
> Since CALCITE-308 has been fixed we can add additional operations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2851: [FLINK-5124] [table] Support more temporal arithme...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2851#discussion_r89192110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -656,38 +656,56 @@ object ScalarOperators {
   right: GeneratedExpression)
 : GeneratedExpression = {
 
-val operator = if (plus) "+" else "-"
+val op = if (plus) "+" else "-"
 
 (left.resultType, right.resultType) match {
   case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l 
== r =>
-generateArithmeticOperator(operator, nullCheck, l, left, right)
+generateArithmeticOperator(op, nullCheck, l, left, right)
 
   case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
   if (isTimePoint(left.resultType)) {
--- End diff --

Wouldn't it be easier to read if this case would be handled by the outer 
pattern matching with separate cases for `(SqlTimeTypeInfo.DATE, 
TimeIntervalTypeInfo.INTERVAL_MILLIS)` and 
`(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5124) Support more temporal arithmetic

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687673#comment-15687673
 ] 

ASF GitHub Bot commented on FLINK-5124:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2851#discussion_r89192379
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -656,38 +656,56 @@ object ScalarOperators {
   right: GeneratedExpression)
 : GeneratedExpression = {
 
-val operator = if (plus) "+" else "-"
+val op = if (plus) "+" else "-"
 
 (left.resultType, right.resultType) match {
   case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l 
== r =>
-generateArithmeticOperator(operator, nullCheck, l, left, right)
+generateArithmeticOperator(op, nullCheck, l, left, right)
 
   case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
   if (isTimePoint(left.resultType)) {
-(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm / 
${MILLIS_PER_DAY}L))"
+(leftTerm, rightTerm) =>
+  s"$leftTerm $op ((int) ($rightTerm / ${MILLIS_PER_DAY}L))"
   } else {
-(leftTerm, rightTerm) => s"((int) ($leftTerm / 
${MILLIS_PER_DAY}L)) + $rightTerm"
+(leftTerm, rightTerm) =>
+  s"((int) ($leftTerm / ${MILLIS_PER_DAY}L)) $op $rightTerm"
+  }
+}
+
+  case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) |
+   (TimeIntervalTypeInfo.INTERVAL_MONTHS, SqlTimeTypeInfo.DATE) =>
+generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
+  if (isTimePoint(left.resultType)) {
+(leftTerm, rightTerm) =>
+  
s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($leftTerm, $op($rightTerm))"
+  } else {
+(leftTerm, rightTerm) =>
+  
s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($rightTerm, $op($leftTerm))"
   }
 }
 
   case (SqlTimeTypeInfo.TIME, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) =>
 generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, 
right) {
   if (isTimePoint(left.resultType)) {
-(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm))"
+(leftTerm, rightTerm) => s"$leftTerm $op ((int) ($rightTerm))"
   } else {
-(leftTerm, rightTerm) => s"((int) ($leftTerm)) + $rightTerm"
+(leftTerm, rightTerm) => s"((int) ($leftTerm)) $op $rightTerm"
   }
 }
 
   case (SqlTimeTypeInfo.TIMESTAMP, 
TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
--- End diff --

Don't we need the inverse case `(TimeIntervalTypeInfo.INTERVAL_MILLIS, 
SqlTimeTypeInfo.TIMESTAMP)` as well?


> Support more temporal arithmetic
> 
>
> Key: FLINK-5124
> URL: https://issues.apache.org/jira/browse/FLINK-5124
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Multiple TPC-H queries fail because of missing temporal arithmetic support. 
> Since CALCITE-308 has been fixed we can add additional operations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4895) Drop support for Hadoop 1

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687642#comment-15687642
 ] 

ASF GitHub Bot commented on FLINK-4895:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2850
  
I haven't tried it yet.


> Drop support for Hadoop 1
> -
>
> Key: FLINK-4895
> URL: https://issues.apache.org/jira/browse/FLINK-4895
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> As per this mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-Hadoop-1-support-with-Flink-1-2-td9530.html
>  the community agreed to drop support for Hadoop 1.
> The task includes
> - removing the hadoop-1 / hadoop-2 build profiles, 
> - removing the scripts for generating hadoop-x poms
> - updating the release script
> - updating the nightly build script
> - updating the travis configuration file
> - updating the documentation
> - updating the website



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2850: [FLINK-4895] Drop Hadoop1 support

2016-11-22 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2850
  
I haven't tried it yet.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts

2016-11-22 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2664
  
Thank you for fixing the issue so quickly.
I'm wondering whether the current approach is a good idea, because it 
requires manual checking of all transitive dependencies. We have something 
similar (also manual) in the quickstart, where we exclude some artifacts, 
and people rarely update that list of excludes.
So I fear that if somebody changes a connector for example, it will not be 
added here.
I guess adding an assembly descriptor for each connector / module would 
solve the problem with the transitive dependencies. I don't know if there's a 
more efficient approach.

What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4861) Package optional project artifacts

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687629#comment-15687629
 ] 

ASF GitHub Bot commented on FLINK-4861:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2664
  
Thank you for fixing the issue so quickly.
I'm wondering whether the current approach is a good idea, because it 
requires manual checking of all transitive dependencies. We have something 
similar (also manual) in the quickstart, where we exclude some artifacts, 
and people rarely update that list of excludes.
So I fear that if somebody changes a connector for example, it will not be 
added here.
I guess adding an assembly descriptor for each connector / module would 
solve the problem with the transitive dependencies. I don't know if there's a 
more efficient approach.

What do you think?


> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list 
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
>  package the Flink libraries and connectors into subdirectories of a new 
> {{opt}} directory in the release/snapshot tarballs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2852: Fix misprint in condition

2016-11-22 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2852
  
Thanks for the fix @BorisOsipov! 
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4861) Package optional project artifacts

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687577#comment-15687577
 ] 

ASF GitHub Bot commented on FLINK-4861:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2664
  
@rmetzger it appears that project artifacts are not included as transitive 
dependencies and I had overlooked 0.9 as a dependency for the 0.10 connector. 
After correcting this the provided hierarchy is as follows:

```
opt/connectors/streaming/kafka-0.10

opt/connectors/streaming/kafka-0.10/flink-connector-kafka-0.10_2.10-1.2-SNAPSHOT.jar

opt/connectors/streaming/kafka-0.10/flink-connector-kafka-0.9_2.10-1.2-SNAPSHOT.jar

opt/connectors/streaming/kafka-0.10/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar
opt/connectors/streaming/kafka-0.10/kafka-clients-0.9.0.1.jar
opt/connectors/streaming/kafka-0.10/lz4-1.2.0.jar
opt/connectors/streaming/kafka-0.8

opt/connectors/streaming/kafka-0.8/flink-connector-kafka-0.8_2.10-1.2-SNAPSHOT.jar

opt/connectors/streaming/kafka-0.8/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar
opt/connectors/streaming/kafka-0.8/kafka_2.10-0.8.2.2.jar
opt/connectors/streaming/kafka-0.8/scala-library-2.10.4.jar
opt/connectors/streaming/kafka-0.8/zkclient-0.3.jar
opt/connectors/streaming/kafka-0.9

opt/connectors/streaming/kafka-0.9/flink-connector-kafka-0.9_2.10-1.2-SNAPSHOT.jar

opt/connectors/streaming/kafka-0.9/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar
opt/connectors/streaming/kafka-0.9/kafka-clients-0.9.0.1.jar
opt/connectors/streaming/kafka-0.9/lz4-1.2.0.jar
```


> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list 
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
>  package the Flink libraries and connectors into subdirectories of a new 
> {{opt}} directory in the release/snapshot tarballs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts

2016-11-22 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2664
  
@rmetzger it appears that project artifacts are not included as transitive 
dependencies and I had overlooked 0.9 as a dependency for the 0.10 connector. 
After correcting this the provided hierarchy is as follows:

```
opt/connectors/streaming/kafka-0.10

opt/connectors/streaming/kafka-0.10/flink-connector-kafka-0.10_2.10-1.2-SNAPSHOT.jar

opt/connectors/streaming/kafka-0.10/flink-connector-kafka-0.9_2.10-1.2-SNAPSHOT.jar

opt/connectors/streaming/kafka-0.10/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar
opt/connectors/streaming/kafka-0.10/kafka-clients-0.9.0.1.jar
opt/connectors/streaming/kafka-0.10/lz4-1.2.0.jar
opt/connectors/streaming/kafka-0.8

opt/connectors/streaming/kafka-0.8/flink-connector-kafka-0.8_2.10-1.2-SNAPSHOT.jar

opt/connectors/streaming/kafka-0.8/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar
opt/connectors/streaming/kafka-0.8/kafka_2.10-0.8.2.2.jar
opt/connectors/streaming/kafka-0.8/scala-library-2.10.4.jar
opt/connectors/streaming/kafka-0.8/zkclient-0.3.jar
opt/connectors/streaming/kafka-0.9

opt/connectors/streaming/kafka-0.9/flink-connector-kafka-0.9_2.10-1.2-SNAPSHOT.jar

opt/connectors/streaming/kafka-0.9/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar
opt/connectors/streaming/kafka-0.9/kafka-clients-0.9.0.1.jar
opt/connectors/streaming/kafka-0.9/lz4-1.2.0.jar
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

