[jira] [Created] (FLINK-5140) NoResourceAvailableException is thrown while starting a new job
Biao Liu created FLINK-5140: --- Summary: NoResourceAvailableException is thrown while starting a new job Key: FLINK-5140 URL: https://issues.apache.org/jira/browse/FLINK-5140 Project: Flink Issue Type: Bug Components: Cluster Management Environment: FLIP-6 feature branch Reporter: Biao Liu Priority: Minor Here is the stack trace: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: not connected to ResourceManager and no slot available at org.apache.flink.runtime.instance.SlotPool.internalAllocateSlot(SlotPool.java:281) at org.apache.flink.runtime.instance.SlotPool.allocateSlot(SlotPool.java:256) ... Currently I have to set a RestartStrategy to handle this exception. But in some test cases I want to test cluster failures, and this workaround makes that much more complicated. After discussing with [~tiemsn], we could perhaps fix this problem by executing the ExecutionGraph only after registering with the ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
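The fix discussed in the issue, starting execution only after ResourceManager registration, can be sketched with a plain `CompletableFuture` gate. This is an illustrative sketch only, not Flink's actual JobMaster/SlotPool code; the class and method names are invented for the example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: instead of failing fast with
// NoResourceAvailableException when no ResourceManager is connected,
// defer starting the ExecutionGraph until registration completes.
public class DeferredSchedulingSketch {
    private final CompletableFuture<Void> rmRegistered = new CompletableFuture<>();

    /** Called once the ResourceManager registration succeeds. */
    public void onResourceManagerRegistered() {
        rmRegistered.complete(null);
    }

    /** Runs the given action only after registration, instead of throwing. */
    public CompletableFuture<Void> scheduleExecution(Runnable startExecutionGraph) {
        return rmRegistered.thenRun(startExecutionGraph);
    }
}
```

With this gating, slot requests are never issued before an RM connection exists, so tests would not need a RestartStrategy just to mask the startup race.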
[jira] [Created] (FLINK-5139) Kafka consumer should allow setting a specific start offset
jing lining created FLINK-5139: -- Summary: Kafka consumer should allow setting a specific start offset Key: FLINK-5139 URL: https://issues.apache.org/jira/browse/FLINK-5139 Project: Flink Issue Type: Improvement Components: Kafka Connector Affects Versions: 1.1.3 Reporter: jing lining Currently the start position can only be influenced via org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08#getInvalidOffsetBehavior. Could the consumer also accept a specific numeric offset to start from? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
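A per-partition start-offset option, as requested here, could look roughly like the following. This is a hypothetical sketch of the API shape only; FlinkKafkaConsumer08 in 1.1.3 exposes no such method, and all names below are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical "start from specific offsets" configuration; today only the
// invalid-offset behavior (earliest/latest) is configurable.
public class SpecificOffsetsSketch {
    private final Map<String, Long> startOffsets = new HashMap<>();

    /** Records the desired starting offset for a topic partition. */
    public SpecificOffsetsSketch setStartOffset(String topic, int partition, long offset) {
        startOffsets.put(topic + "-" + partition, offset);
        return this;
    }

    /** Offset to seek to, or -1 when none was configured (fall back to group offset). */
    public long startOffsetFor(String topic, int partition) {
        return startOffsets.getOrDefault(topic + "-" + partition, -1L);
    }
}
```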
[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5138: - Description: Users may specify the state size via the *setResource* API; each *StateBackend* implementation should then roughly estimate the different kinds of memory usage based on that state size. This estimated memory will be aggregated with the other memory amounts in *ResourceSpec*. There are two advantages to doing this: - For the RocksDB backend, a proper memory setting can yield better read and write performance. - The estimated memory will be considered when requesting container resources, so the total memory usage will not exceed the container limit. > The StateBackend provides the method to estimate memory usage based on hint > of state size in ResourceSpec > - > > Key: FLINK-5138 > URL: https://issues.apache.org/jira/browse/FLINK-5138 > Project: Flink > Issue Type: Sub-task > Components: Core, DataSet API, DataStream API > Reporter: Zhijiang Wang > > Users may specify the state size via the *setResource* API; each *StateBackend* implementation should then roughly estimate the different kinds of memory usage based on that state size. This estimated memory will be aggregated with the other memory amounts in *ResourceSpec*. There are two advantages to doing this: > - For the RocksDB backend, a proper memory setting can yield better read and write performance. > - The estimated memory will be considered when requesting container resources, so the total memory usage will not exceed the container limit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
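The proposed estimation hook could be sketched as below. The heap and RocksDB formulas are invented placeholders to illustrate the idea of backend-specific estimation, not Flink's actual memory accounting.

```java
// Illustrative sketch of a StateBackend hook that estimates memory usage
// from the state-size hint in ResourceSpec. Formulas are assumptions.
public class MemoryEstimateSketch {

    interface StateBackendMemoryEstimator {
        /** Estimated memory in bytes for the given state size hint. */
        long estimateMemory(long stateSizeHintBytes);
    }

    // Heap backend: state lives on the JVM heap, so estimate the hint itself.
    static final StateBackendMemoryEstimator HEAP = hint -> hint;

    // RocksDB backend: reserve extra native memory for block cache and
    // write buffers (fraction and constant chosen arbitrarily here).
    static final StateBackendMemoryEstimator ROCKSDB =
            hint -> hint / 4 + 64 * 1024 * 1024;
}
```

Each backend returning its own estimate is what allows the aggregation in ResourceSpec to stay backend-agnostic.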
[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5138: - Component/s: Core > The StateBackend provides the method to estimate memory usage based on hint > of state size in ResourceSpec > - > > Key: FLINK-5138 > URL: https://issues.apache.org/jira/browse/FLINK-5138 > Project: Flink > Issue Type: Sub-task > Components: Core, DataSet API, DataStream API >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
Zhijiang Wang created FLINK-5138: Summary: The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec Key: FLINK-5138 URL: https://issues.apache.org/jira/browse/FLINK-5138 Project: Flink Issue Type: Sub-task Components: DataSet API, DataStream API Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally the UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for professional users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet separately. - The *StateBackend* should provide a method to estimate the memory usage based on the hint of the state size in *ResourceSpec*. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly on the FLIP-6 branch. was: Normally the UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. 
But in some special cases the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for professional users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet separately. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly on the FLIP-6 branch. > Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager > Reporter: Zhijiang Wang > > Normally the UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for professional users to define the resource usage if needed. 
> The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet separately. > - The *StateBackend* should provide a method to estimate the memory usage based on the hint of the state size in *ResourceSpec*. > - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. > - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5134: Assignee: Zhijiang Wang > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. > The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. > For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. > And for the cpu cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
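The aggregation rule described in the issue (sum of memory, maximum of cpu cores across a chain) can be sketched as follows. The two-field spec is a simplified stand-in for the real ResourceSpec, not Flink's class.

```java
// Minimal sketch of per-chain resource aggregation: when operators are
// chained into one JobVertex, memory adds up while cpu cores take the max.
public class ResourceAggregationSketch {
    final double cpuCores;
    final long heapMemoryBytes;

    ResourceAggregationSketch(double cpuCores, long heapMemoryBytes) {
        this.cpuCores = cpuCores;
        this.heapMemoryBytes = heapMemoryBytes;
    }

    /** Aggregates two chained operators: max of cpu cores, sum of memory. */
    ResourceAggregationSketch merge(ResourceAggregationSketch other) {
        return new ResourceAggregationSketch(
                Math.max(this.cpuCores, other.cpuCores),
                this.heapMemoryBytes + other.heapMemoryBytes);
    }
}
```

Summing memory is sound because all chained operators hold their state simultaneously, while cpu cores take the maximum because the chain executes in a single task thread.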
[jira] [Assigned] (FLINK-5133) Add new setResource API for DataStream and DataSet
[ https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5133: Assignee: Zhijiang Wang > Add new setResource API for DataStream and DataSet > -- > > Key: FLINK-5133 > URL: https://issues.apache.org/jira/browse/FLINK-5133 > Project: Flink > Issue Type: Sub-task > Components: DataSet API, DataStream API > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > This is part of the fine-grained resource configuration. > For *DataStream*, the *setResource* API will be set on *SingleOutputStreamOperator*, similar to other existing properties like parallelism, name, etc. > For *DataSet*, the *setResource* API will be set on *Operator* in a similar way. > The API takes two parameters, a minimum *ResourceSpec* and a maximum *ResourceSpec*, to allow for resource resizing in future improvements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
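A rough sketch of the described min/max `setResource` signature follows. The class names and the chainable-setter shape are assumptions modeled on how parallelism and name setters work, not the final Flink API.

```java
// Hypothetical shape of the proposed setResource API: a minimum and a
// maximum ResourceSpec, leaving room for future resource resizing.
public class SetResourceSketch {
    public static final class ResourceSpec {
        public final double cpuCores;
        public final long heapMemoryBytes;
        public ResourceSpec(double cpuCores, long heapMemoryBytes) {
            this.cpuCores = cpuCores;
            this.heapMemoryBytes = heapMemoryBytes;
        }
    }

    public ResourceSpec minResources;
    public ResourceSpec maxResources;

    /** Convenience overload: min == max, i.e. a fixed resource profile. */
    public SetResourceSketch setResource(ResourceSpec resources) {
        return setResource(resources, resources);
    }

    public SetResourceSketch setResource(ResourceSpec min, ResourceSpec max) {
        this.minResources = min;
        this.maxResources = max;
        return this;  // chainable, like the parallelism/name setters
    }
}
```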
[jira] [Assigned] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5136: Assignee: Zhijiang Wang > Aggregation of slot ResourceProfile and framework memory by ResourceManager > --- > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > When the *ResourceManager* requests container resources from the cluster, the framework memory should be considered together with the slot resource. > Currently the framework memory is mainly used by the *MemoryManager* and the *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
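The container sizing described above reduces to simple arithmetic. A sketch, assuming identical slots (the method name and shape are illustrative, not Flink's ResourceManager code):

```java
// Sketch of the aggregation: the container request must cover every slot's
// memory plus the framework memory used by the MemoryManager and
// NetworkBufferPool (which comes from the configuration).
public class ContainerMemorySketch {
    /** Total container memory for n identical slots plus framework overhead. */
    static long containerMemoryBytes(long slotMemoryBytes, int numSlots,
                                     long frameworkMemoryBytes) {
        return slotMemoryBytes * numSlots + frameworkMemoryBytes;
    }
}
```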
[jira] [Assigned] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5135: Assignee: Zhijiang Wang > ResourceProfile for slot request should be expanded to correspond with > ResourceSpec > --- > > Key: FLINK-5135 > URL: https://issues.apache.org/jira/browse/FLINK-5135 > Project: Flink > Issue Type: Sub-task > Components: JobManager, ResourceManager > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > The *JobManager* requests slots by *ResourceProfile* from the *ResourceManager* before submitting tasks. Currently the *ResourceProfile* only contains cpu cores and memory properties. The memory should be expanded into different types, such as heap memory, direct memory and native memory, corresponding to the memory types in *ResourceSpec*. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
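A sketch of the expanded profile with the three memory types discussed in the issue; the field names are assumptions mirroring ResourceSpec, not Flink's actual ResourceProfile.

```java
// Sketch of an expanded ResourceProfile: memory split into heap, direct
// and native, so a slot request can be matched against a ResourceSpec.
public class ExpandedResourceProfileSketch {
    final double cpuCores;
    final long heapMemoryBytes;
    final long directMemoryBytes;
    final long nativeMemoryBytes;

    ExpandedResourceProfileSketch(double cpuCores, long heap, long direct, long nativ) {
        this.cpuCores = cpuCores;
        this.heapMemoryBytes = heap;
        this.directMemoryBytes = direct;
        this.nativeMemoryBytes = nativ;
    }

    /** Total memory footprint, used when sizing the container. */
    long totalMemoryBytes() {
        return heapMemoryBytes + directMemoryBytes + nativeMemoryBytes;
    }
}
```

Splitting the memory types matters because heap, direct and native memory map to different JVM limits, so a single aggregate number cannot drive the container's JVM options.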
[jira] [Assigned] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5132: Assignee: Zhijiang Wang > Introduce the ResourceSpec for grouping different resource factors in API > - > > Key: FLINK-5132 > URL: https://issues.apache.org/jira/browse/FLINK-5132 > Project: Flink > Issue Type: Sub-task > Components: Core > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > This is part of the fine-grained resource configuration. > The current resource factors include cpu cores, heap memory, direct memory, native memory and state size. > The *ResourceSpec* will provide some basic constructors for grouping different resource factors as needed, and these can be extended easily for further requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
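One way the grouping and extensibility could look is a small fluent builder over the listed factors. This is a hypothetical sketch; the real ResourceSpec construction methods may differ.

```java
// Illustrative ResourceSpec with fluent setters, grouping the factors
// named in the issue (cpu cores, heap/direct/native memory, state size).
public class ResourceSpecSketch {
    double cpuCores;
    long heapMemoryBytes, directMemoryBytes, nativeMemoryBytes, stateSizeBytes;

    ResourceSpecSketch cpu(double cores)       { this.cpuCores = cores;        return this; }
    ResourceSpecSketch heap(long bytes)        { this.heapMemoryBytes = bytes; return this; }
    ResourceSpecSketch stateSize(long bytes)   { this.stateSizeBytes = bytes;  return this; }
    // further factors (direct/native memory, GPUs, ...) extend the same way
}
```

Usage: `new ResourceSpecSketch().cpu(2.0).heap(1 << 20).stateSize(1 << 22)` groups only the factors the user cares about, leaving the rest at their defaults.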
[jira] [Assigned] (FLINK-5137) Config JVM options and set onto the TaskManager container
[ https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5137: Assignee: Zhijiang Wang > Config JVM options and set onto the TaskManager container > - > > Key: FLINK-5137 > URL: https://issues.apache.org/jira/browse/FLINK-5137 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager, TaskManager > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > Users can configure JVM options at the job level, and the *ResourceManager* will set the related JVM options for launching the *TaskManager* container based on the framework memory and the slot resource profile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
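Deriving the container's JVM options from the aggregated memory could look like the sketch below. The exact flags Flink would emit are an assumption; `-Xmx` and `-XX:MaxDirectMemorySize` are standard HotSpot options used here only to illustrate the mapping.

```java
// Sketch: map the heap/direct memory from the aggregated profile
// (framework memory + slot profiles) onto TaskManager JVM options.
public class JvmOptionsSketch {
    static String jvmOptions(long heapBytes, long directBytes) {
        long heapMb = heapBytes / (1024 * 1024);
        long directMb = directBytes / (1024 * 1024);
        return "-Xmx" + heapMb + "m -XX:MaxDirectMemorySize=" + directMb + "m";
    }
}
```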
[jira] [Updated] (FLINK-5137) Config JVM options and set onto the TaskManager container
[ https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5137: - Description: Users can configure JVM options at the job level, and the *ResourceManager* will set the related JVM options for launching the *TaskManager* container based on the framework memory and the slot resource profile. > Config JVM options and set onto the TaskManager container > - > > Key: FLINK-5137 > URL: https://issues.apache.org/jira/browse/FLINK-5137 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager, TaskManager > Reporter: Zhijiang Wang > > Users can configure JVM options at the job level, and the *ResourceManager* will set the related JVM options for launching the *TaskManager* container based on the framework memory and the slot resource profile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5137) Config JVM options and set onto the TaskManager container
Zhijiang Wang created FLINK-5137: Summary: Config JVM options and set onto the TaskManager container Key: FLINK-5137 URL: https://issues.apache.org/jira/browse/FLINK-5137 Project: Flink Issue Type: Sub-task Components: ResourceManager, TaskManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5136: - Description: When the *ResourceManager* requests container resources from the cluster, the framework memory should be considered together with the slot resource. Currently the framework memory is mainly used by the *MemoryManager* and the *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from the configuration. > Aggregation of slot ResourceProfile and framework memory by ResourceManager > --- > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager > Reporter: Zhijiang Wang > > When the *ResourceManager* requests container resources from the cluster, the framework memory should be considered together with the slot resource. > Currently the framework memory is mainly used by the *MemoryManager* and the *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5136: - Summary: Aggregation of slot ResourceProfile and framework memory by ResourceManager (was: Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager) > Aggregation of slot ResourceProfile and framework memory by ResourceManager > --- > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5136: - Summary: Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager (was: ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster) > Aggregation of slot ResourceProfile and framework memory for requesting > resource from cluster by ResourceManager > > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5136) ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster
Zhijiang Wang created FLINK-5136: Summary: ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster Key: FLINK-5136 URL: https://issues.apache.org/jira/browse/FLINK-5136 Project: Flink Issue Type: Sub-task Components: ResourceManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5135: - Description: The *JobManager* requests slots by *ResourceProfile* from the *ResourceManager* before submitting tasks. Currently the *ResourceProfile* only contains cpu cores and memory properties. The memory should be expanded into different types, such as heap memory, direct memory and native memory, corresponding to the memory types in *ResourceSpec*. > ResourceProfile for slot request should be expanded to correspond with > ResourceSpec > --- > > Key: FLINK-5135 > URL: https://issues.apache.org/jira/browse/FLINK-5135 > Project: Flink > Issue Type: Sub-task > Components: JobManager, ResourceManager > Reporter: Zhijiang Wang > > The *JobManager* requests slots by *ResourceProfile* from the *ResourceManager* before submitting tasks. Currently the *ResourceProfile* only contains cpu cores and memory properties. The memory should be expanded into different types, such as heap memory, direct memory and native memory, corresponding to the memory types in *ResourceSpec*. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5135: - Summary: ResourceProfile for slot request should be expanded to correspond with ResourceSpec (was: ResourceProfile should be expanded to correspond with ResourceSpec) > ResourceProfile for slot request should be expanded to correspond with > ResourceSpec > --- > > Key: FLINK-5135 > URL: https://issues.apache.org/jira/browse/FLINK-5135 > Project: Flink > Issue Type: Sub-task > Components: JobManager, ResourceManager >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally the UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for professional users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet separately. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly on the FLIP-6 branch. was: Normally the UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. 
But in some special cases the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for professional users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet separately. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspondence with *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly on the FLIP-6 branch. > Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager > Reporter: Zhijiang Wang > > Normally the UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for professional users to define the resource usage if needed. 
> The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource factors (cpu cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet separately. > - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. > - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5135) ResourceProfile should be expanded to correspond with ResourceSpec
Zhijiang Wang created FLINK-5135: Summary: ResourceProfile should be expanded to correspond with ResourceSpec Key: FLINK-5135 URL: https://issues.apache.org/jira/browse/FLINK-5135 Project: Flink Issue Type: Sub-task Components: JobManager, ResourceManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Description: In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. And for the cpu cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. was: In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. And for the cpu cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API > Reporter: Zhijiang Wang > > In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. > The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. > For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. > And for the cpu cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Description: In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. And for the cpu cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. was: In DataStream API, the *ResourceSpec* is set on the internal transformation > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API > Reporter: Zhijiang Wang > > In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. > The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. > For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. And for the cpu cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally a UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But for some special cases, the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set onto the internal transformation in DataStream and the base operator in DataSet separately. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. was: Normally a UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. 
But for some special cases, the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set onto the internal transformation in DataStream and the base operator in DataSet separately. - In stream graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. > Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager >Reporter: Zhijiang Wang > > Normally a UDF just creates short-lived small objects that can be > recycled quickly by the JVM, so most of the memory resource is controlled and > managed by the *TaskManager* framework. But for some special cases, the UDF may > consume considerable resources to create long-lived big objects, so it is necessary to > provide options for advanced users to define the resource usage if > needed. 
> The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource > factors (CPU cores, heap memory, direct memory, native memory, etc.) and > provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set onto the internal transformation in > DataStream and the base operator in DataSet separately. > - In job graph generation, the *ResourceSpec* will be aggregated for > chained operators. > - When the *JobManager* requests a slot from the > *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond > with the *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it > should consider extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by the *NetworkBufferPool* and the > *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level.
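The container sizing described in the bullets above (slot resource profiles plus extra framework memory for the *NetworkBufferPool* and *MemoryManager*, so the container limit is not exceeded) can be sketched roughly as follows; all names are hypothetical, not Flink's actual classes:

```java
// Rough sketch of the container memory accounting described in FLINK-5131:
// the container request must cover all slot resource profiles plus the
// framework memory (network buffer pool + memory manager). The class and
// method names here are illustrative assumptions only.
public class ContainerMemorySketch {
    /** Per-slot memory requirement derived from a resource spec, in MB. */
    static int slotProfileMemoryMB(int heapMB, int directMB, int nativeMB) {
        return heapMB + directMB + nativeMB;
    }

    /** Total container memory: sum of slot profiles plus framework memory. */
    static int containerMemoryMB(int[] slotProfilesMB,
                                 int networkBufferPoolMB,
                                 int memoryManagerMB) {
        int total = networkBufferPoolMB + memoryManagerMB; // framework part
        for (int slot : slotProfilesMB) {
            total += slot; // each slot's ResourceProfile-derived memory
        }
        return total;
    }
}
```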
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Summary: Aggregate ResourceSpec for chained operators when generating job graph (was: Aggregate ResourceSpec for chained operators when generating stream graph) > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang > > In DataStream API, the *ResourceSpec* is set onto the internal > transformation -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Description: In DataStream API, the *ResourceSpec* is set onto the internal transformation (was: In DataStream API,) > Aggregate ResourceSpec for chained operators when generating stream graph > > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang > > In DataStream API, the *ResourceSpec* is set onto the internal > transformation -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Description: In DataStream API, > Aggregate ResourceSpec for chained operators when generating stream graph > > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang > > In DataStream API, -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
Zhijiang Wang created FLINK-5134: Summary: Aggregate ResourceSpec for chained operators when generating stream graph Key: FLINK-5134 URL: https://issues.apache.org/jira/browse/FLINK-5134 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2792: [FLINK-4937] [Table] Add incremental group window aggrega...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/2792 @fhueske thanks a lot for the review. I have refactored AggregateUtil and updated the PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688860#comment-15688860 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/2792 @fhueske thanks a lot for the review. I have refactored AggregateUtil and updated the PR . > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
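The difference described in the issue above — buffering all records until the window fires versus eagerly folding each record into a partial aggregate, as a `ReduceFunction` would — can be illustrated with a minimal sketch that is independent of Flink's actual window classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Minimal illustration of non-incremental vs. incremental window aggregation.
// The buffering variant stores every record until the window closes; the
// incremental variant keeps only one partial aggregate per window. These are
// toy classes for illustration, not Flink's window operator implementation.
public class IncrementalAggSketch {
    /** Buffers all records; aggregates only when the window fires. */
    static class BufferingWindow {
        final List<Long> buffer = new ArrayList<>(); // O(n) storage per window
        void add(long value) { buffer.add(value); }
        long fire(BinaryOperator<Long> reduce) {
            return buffer.stream().reduce(0L, reduce);
        }
    }

    /** Eagerly folds each record into a single partial aggregate. */
    static class IncrementalWindow {
        long partial = 0L;                           // O(1) storage per window
        void add(long value, BinaryOperator<Long> reduce) {
            partial = reduce.apply(partial, value);
        }
        long fire() { return partial; }
    }
}
```

Both variants produce the same result for an associative reduce function such as a sum, but the incremental one stores a single value per window instead of all buffered records, which is the storage-space argument made in the issue.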
[jira] [Updated] (FLINK-5133) Add new setResource API for DataStream and DataSet
[ https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5133: - Description: This is part of the fine-grained resource configuration. For *DataStream*, the *setResource* API will be set onto *SingleOutputStreamOperator*, similar to other existing properties like parallelism, name, etc. For *DataSet*, the *setResource* API will be set onto *Operator* in a similar way. The API takes two parameters, a minimum *ResourceSpec* and a maximum *ResourceSpec*, to allow for resource resizing in future improvements. > Add new setResource API for DataStream and DataSet > -- > > Key: FLINK-5133 > URL: https://issues.apache.org/jira/browse/FLINK-5133 > Project: Flink > Issue Type: Sub-task > Components: DataSet API, DataStream API >Reporter: Zhijiang Wang > > This is part of the fine-grained resource configuration. > For *DataStream*, the *setResource* API will be set onto > *SingleOutputStreamOperator*, similar to other existing properties like > parallelism, name, etc. > For *DataSet*, the *setResource* API will be set onto *Operator* in a > similar way. > The API takes two parameters, a minimum *ResourceSpec* and a maximum > *ResourceSpec*, to allow for resource resizing in future improvements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
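A rough sketch of what a two-parameter *setResource* API (minimum and maximum spec, to allow later resizing) might look like. The class names, signature, and validation below are assumptions for illustration, not the actual Flink API proposed in the issue:

```java
// Hypothetical sketch of a setResources(min, max) style API as described in
// FLINK-5133: the operator keeps a minimum and a maximum resource spec so a
// scheduler could later resize within that range. Not Flink's real API.
public class OperatorResourceSketch {
    static class Spec {
        final double cpuCores;
        final int heapMemoryMB;
        Spec(double cpuCores, int heapMemoryMB) {
            this.cpuCores = cpuCores;
            this.heapMemoryMB = heapMemoryMB;
        }
    }

    private Spec minResources;
    private Spec maxResources;

    /** Fluent setter, mirroring existing properties like parallelism or name. */
    OperatorResourceSketch setResources(Spec min, Spec max) {
        if (min.cpuCores > max.cpuCores || min.heapMemoryMB > max.heapMemoryMB) {
            throw new IllegalArgumentException("min spec must not exceed max spec");
        }
        this.minResources = min;
        this.maxResources = max;
        return this;
    }

    Spec getMinResources() { return minResources; }
    Spec getMaxResources() { return maxResources; }
}
```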
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688809#comment-15688809 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 @aljoscha Thanks for your review. I have updated the PR according to your suggestion. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 @aljoscha Thanks for your review. I have updated the PR according to your suggestion. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-5133) Add new setResource API for DataStream and DataSet
Zhijiang Wang created FLINK-5133: Summary: Add new setResource API for DataStream and DataSet Key: FLINK-5133 URL: https://issues.apache.org/jira/browse/FLINK-5133 Project: Flink Issue Type: Sub-task Components: DataSet API, DataStream API Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally a UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But for some special cases, the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set onto the internal transformation in DataStream and the base operator in DataSet separately. - In stream graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. was: Normally a UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. 
But for some special cases, the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set onto the internal transformation in DataStream and the base operator in DataSet separately. - In stream graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. > Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager >Reporter: Zhijiang Wang > > Normally a UDF just creates short-lived small objects that can be > recycled quickly by the JVM, so most of the memory resource is controlled and > managed by the *TaskManager* framework. But for some special cases, the UDF may > consume considerable resources to create long-lived big objects, so it is necessary to > provide options for advanced users to define the resource usage if > needed. 
> The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource > factors (CPU cores, heap memory, direct memory, native memory, etc.) and > provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set onto the internal transformation in > DataStream and the base operator in DataSet separately. > - In stream graph generation, the *ResourceSpec* will be aggregated for > chained operators. > - When the *JobManager* requests a slot from the > *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond > with the *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it > should consider extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by the *NetworkBufferPool* and the > *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, the JVM options attached to the container should be > supported and could also be configured at the job level.
[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5132: - Description: This is part of the fine-grained resource configuration. The current resource factors include CPU cores, heap memory, direct memory, native memory and state size. The *ResourceSpec* will provide some basic constructors for grouping different resource factors as needed, and the construction can also be extended easily for further requirements. > Introduce the ResourceSpec for grouping different resource factors in API > - > > Key: FLINK-5132 > URL: https://issues.apache.org/jira/browse/FLINK-5132 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Zhijiang Wang > > This is part of the fine-grained resource configuration. > The current resource factors include CPU cores, heap memory, direct memory, > native memory and state size. > The *ResourceSpec* will provide some basic constructors for grouping > different resource factors as needed, and the construction can also be > extended easily for further requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
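The "basic constructions for grouping resource factors" idea can be sketched as overloaded factory methods on a small spec class; the names and factor set below are illustrative assumptions, not Flink's actual *ResourceSpec*:

```java
// Hypothetical sketch of grouping the resource factors named in FLINK-5132
// (CPU cores, heap / direct / native memory, state size) behind one spec
// object with basic constructions that are easy to extend later.
public class ResourceFactors {
    final double cpuCores;
    final int heapMemoryMB;
    final int directMemoryMB;
    final int nativeMemoryMB;
    final int stateSizeMB;

    private ResourceFactors(double cpu, int heap, int direct, int nativeMem, int state) {
        this.cpuCores = cpu;
        this.heapMemoryMB = heap;
        this.directMemoryMB = direct;
        this.nativeMemoryMB = nativeMem;
        this.stateSizeMB = state;
    }

    /** Basic construction: only the factors most users need to specify. */
    static ResourceFactors of(double cpuCores, int heapMemoryMB) {
        return new ResourceFactors(cpuCores, heapMemoryMB, 0, 0, 0);
    }

    /** Extended construction; further factors could be added the same way. */
    static ResourceFactors of(double cpuCores, int heapMemoryMB, int directMemoryMB,
                              int nativeMemoryMB, int stateSizeMB) {
        return new ResourceFactors(cpuCores, heapMemoryMB, directMemoryMB,
                                   nativeMemoryMB, stateSizeMB);
    }
}
```

Keeping the constructor private and exposing only factory overloads is one way to let the factor set grow without breaking callers, which matches the "expanded easily for further requirements" goal in the issue.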
[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5132: - Description: This is part of the fine-grained resource configuration. The current resource factors include CPU cores, heap memory, direct memory, native memory and state size. The *ResourceSpec* will provide some basic constructors for grouping different resource factors as needed, and the construction can also be extended easily for further requirements. was: This is part of the fine-grained resource configuration. The current resource factors include CPU cores, heap memory, direct memory, native memory and state size. The *ResourceSpec* will provide some basic constructors for grouping different resource factors as needed, and the construction can also be extended easily for further requirements. > Introduce the ResourceSpec for grouping different resource factors in API > - > > Key: FLINK-5132 > URL: https://issues.apache.org/jira/browse/FLINK-5132 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Zhijiang Wang > > This is part of the fine-grained resource configuration. > The current resource factors include CPU cores, heap memory, direct memory, > native memory and state size. > The *ResourceSpec* will provide some basic constructors for grouping > different resource factors as needed, and the construction can also be > extended easily for further requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r89252908 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -135,6 +138,32 @@ object UserDefinedFunctionUtils { } /** +* Returns eval method matching the given signature of [[TypeInformation]]. +*/ + def getEvalMethod( +function: EvaluableFunction, --- End diff -- hi @fhueske When we declare the ScalarFunction and TableFunction as follows: trait ScalarFunction, trait TableFunction [T], it will become very useful. The current version is optional. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688781#comment-15688781 ] ASF GitHub Bot commented on FLINK-4469: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r89252908 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -135,6 +138,32 @@ object UserDefinedFunctionUtils { } /** +* Returns eval method matching the given signature of [[TypeInformation]]. +*/ + def getEvalMethod( +function: EvaluableFunction, --- End diff -- hi @fhueske When we declare the ScalarFunction and TableFunction as follows: trait ScalarFunction, trait TableFunction [T], it will become very useful. The current version is optional. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row into multiple output rows. This is very useful in > some cases, such as looking up HBase by rowkey and returning one or more rows. > Adding a user defined table function should: > 1. inherit from the UDTF class with a specific generic type T > 2. define one or more eval functions. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by the table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit a table row > 4. eval methods can be overloaded. Blink will choose the best matching eval method > to call according to the parameter types and count. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF<Word> { > public Iterable<Word> eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
Zhijiang Wang created FLINK-5132: Summary: Introduce the ResourceSpec for grouping different resource factors in API Key: FLINK-5132 URL: https://issues.apache.org/jira/browse/FLINK-5132 Project: Flink Issue Type: Sub-task Components: Core Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally a UDF just creates short-lived small objects that can be recycled quickly by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But for some special cases, the UDF may consume considerable resources to create long-lived big objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set onto the internal transformation in DataStream and the base operator in DataSet separately. - In stream graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. 
> Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager >Reporter: Zhijiang Wang > > Normally a UDF just creates short-lived small objects that can be > recycled quickly by the JVM, so most of the memory resource is controlled and > managed by the *TaskManager* framework. But for some special cases, the UDF may > consume considerable resources to create long-lived big objects, so it is necessary to > provide options for advanced users to define the resource usage if > needed. > The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource > factors (CPU cores, heap memory, direct memory, native memory, etc.) and > provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set onto the internal transformation in > DataStream and the base operator in DataSet separately. > - In stream graph generation, the *ResourceSpec* will be aggregated for > chained operators. > - When the *JobManager* requests a slot from the > *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond > with the *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it > should consider extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by the *NetworkBufferPool* and the > *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, the JVM options attached to the container should be > supported and could also be configured at the job level. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89249046 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. 
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) --- End diff -- yes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688661#comment-15688661 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89249046

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -115,14 +124,210 @@ object AggregateUtil {
             intermediateRowArity,
             outputType.getFieldCount)
         }
    +    groupReduceFunction
    +  }
    +
    +  /**
    +    * Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement
    +    * [[org.apache.flink.api.common.functions.ReduceFunction]]
    +    *
    +    */
    +  private[flink] def createIncrementalAggregateReduceFunction(
    +      aggregates: Array[Aggregate[_ <: Any]],
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      inputType: RelDataType,
    +      outputType: RelDataType,
    +      groupings: Array[Int]): IncrementalAggregateReduceFunction = {
    +    val groupingOffsetMapping =
    +      getGroupingOffsetAndaggOffsetMapping(
    +        namedAggregates,
    +        inputType,
    +        outputType,
    +        groupings)._1
    +    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
    +    val reduceFunction = new IncrementalAggregateReduceFunction(
    +      aggregates,
    +      groupingOffsetMapping,
    +      intermediateRowArity)
    +    reduceFunction
    +  }
    +
    +  /**
    +    * @return groupingOffsetMapping (mapping relation between field index of intermediate
    +    *         aggregate Row and output Row.)
    +    *         and aggOffsetMapping (the mapping relation between aggregate function index in list
    +    *         and its corresponding field index in output Row.)
    +    */
    +  def getGroupingOffsetAndaggOffsetMapping(
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      inputType: RelDataType,
    +      outputType: RelDataType,
    +      groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
    +
    +    // the mapping relation between field index of intermediate aggregate Row and output Row.
    +    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
    +
    +    // the mapping relation between aggregate function index in list and its corresponding
    +    // field index in output Row.
    +    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
    -    (mapFunction, reduceGroupFunction)
    +    if (groupingOffsetMapping.length != groupings.length ||
    +        aggOffsetMapping.length != namedAggregates.length) {
    +      throw new TableException(
    +        "Could not find output field in input data type " +
    +          "or aggregate functions.")
    +    }
    +    (groupingOffsetMapping, aggOffsetMapping)
    +  }
    +
    +
    +  private[flink] def createAllWindowAggregationFunction(
    +      window: LogicalWindow,
    +      properties: Seq[NamedWindowProperty],
    +      aggFunction: RichGroupReduceFunction[Row, Row])
    --- End diff --
    
    yes.

> Add incremental group window aggregation for streaming Table API
> ----------------------------------------------------------------
>
>                 Key: FLINK-4937
>                 URL: https://issues.apache.org/jira/browse/FLINK-4937
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API
> features [incremental aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.


-- 
This message was sent by Atlassian JIRA
(v6.3.4#6332)
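The two mappings discussed in the diff pair an output-Row field index with its position in the intermediate aggregate Row, and the length check guards against fields that cannot be resolved. A minimal Java sketch of that idea (field names and layout are hypothetical illustrations, not the actual Flink implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetMappingSketch {
    // Pair of (index in output Row, index in intermediate Row) -- hypothetical layout
    // where group keys occupy the first positions of the intermediate Row.
    record Mapping(int outputIndex, int intermediateIndex) {}

    static List<Mapping> groupKeyMappings(
            List<String> inputFields, List<String> outputFields, int[] groupings) {
        List<Mapping> result = new ArrayList<>();
        for (int i = 0; i < groupings.length; i++) {
            String key = inputFields.get(groupings[i]);
            int outPos = outputFields.indexOf(key);
            // Only record keys that actually appear in the output row type.
            if (outPos >= 0) {
                result.add(new Mapping(outPos, i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of("user", "region", "amount");
        List<String> output = List.of("user", "region", "sumAmount");
        List<Mapping> m = groupKeyMappings(input, output, new int[]{0, 1});
        // Mirrors the validation in the diff: a length mismatch means an
        // output field could not be found in the input data type.
        if (m.size() != 2) {
            throw new IllegalStateException(
                "Could not find output field in input data type");
        }
        System.out.println(m.size());
    }
}
```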
[jira] [Commented] (FLINK-5061) Remove ContinuousEventTimeTrigger
[ https://issues.apache.org/jira/browse/FLINK-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688654#comment-15688654 ]

Manu Zhang commented on FLINK-5061:
-----------------------------------

Agreed. These are just some of my thoughts and they should not block the changes.

> Remove ContinuousEventTimeTrigger
> ---------------------------------
>
>                 Key: FLINK-5061
>                 URL: https://issues.apache.org/jira/browse/FLINK-5061
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.2.0
>
> The Trigger does not do what people think it does. The problem is that a
> watermark T signals that we won't see elements with timestamp < T in the
> future. It does not tell us whether we have already seen elements with
> timestamp > T. So it cannot really be used to trigger at different stages
> of processing an event-time window.


-- 
This message was sent by Atlassian JIRA
(v6.3.4#6332)
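The watermark contract the ticket describes is one-sided, which a few lines make concrete. This standalone Java sketch (illustrative only, no Flink dependency) shows the only thing a watermark T lets you decide:

```java
public class WatermarkContract {
    // A watermark T promises that no element with timestamp < T arrives afterwards.
    // An element violating that promise is "late". The watermark says nothing
    // about elements with timestamp > T: they may or may not have been seen yet,
    // which is why it cannot drive staged triggering of an event-time window.
    static boolean isLate(long elementTs, long watermark) {
        return elementTs < watermark;
    }

    public static void main(String[] args) {
        long watermark = 1000L;
        System.out.println(isLate(999L, watermark));   // violates the promise
        System.out.println(isLate(1500L, watermark));  // perfectly legal arrival
    }
}
```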
[jira] [Created] (FLINK-5131) Fine-grained Resource Configuration
Zhijiang Wang created FLINK-5131: Summary: Fine-grained Resource Configuration Key: FLINK-5131 URL: https://issues.apache.org/jira/browse/FLINK-5131 Project: Flink Issue Type: New Feature Components: DataSet API, DataStream API, JobManager, ResourceManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688612#comment-15688612 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89247407 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. 
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. +val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] +} else { + new AggregateWindowFunction(aggFunction) +} + + } + + private[flink] def createAllWindowIncrementalAggregationFunction( +window: LogicalWindow, +aggregates: Array[Aggregate[_ <: Any]], --- End diff -- sure. agree. 
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688613#comment-15688613 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89247416 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. 
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. +val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] +} else { + new AggregateWindowFunction(aggFunction) +} + + } + + private[flink] def createAllWindowIncrementalAggregationFunction( +window: LogicalWindow, +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int], +properties: Seq[NamedWindowProperty]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + 
+val (groupingOffsetMapping, aggOffsetMapping) = + getGroupingOffsetAndaggOffsetMapping( + namedAggregates, + inputType,
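The IncrementalAggregateReduceFunction under review merges two intermediate aggregate Rows field by field: group keys are kept, partial aggregate fields are combined. A hedged Java sketch of that shape (array-backed rows with a hypothetical [key, sum, count] layout, not the actual Flink class):

```java
public class IncrementalReduceSketch {
    // Intermediate row layout (hypothetical): index 0 = group key,
    // index 1 = partial sum, index 2 = partial count.
    static long[] reduce(long[] a, long[] b) {
        // Keep the group key, merge the partial aggregate fields.
        return new long[]{a[0], a[1] + b[1], a[2] + b[2]};
    }

    public static void main(String[] args) {
        long[] merged = reduce(new long[]{7, 10, 2}, new long[]{7, 5, 1});
        System.out.println(merged[1] + " " + merged[2]); // 15 3
    }
}
```

A final window function can then turn the merged sum/count pair into e.g. an average, which is why only the partial fields need to be kept per key.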
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688608#comment-15688608 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89247362

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -39,66 +45,69 @@ object AggregateUtil {
       type JavaList[T] = java.util.List[T]
     
       /**
    -   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
    -   * operator functions:
    -   * [[org.apache.flink.api.common.functions.MapFunction]] and
    -   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
    -   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
    -   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
    -   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    -   * format:
    -   *
    -   * {{{
    -   *   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    -   *         |                                   |
    -   *         v                                   v
    -   *   +---------+---------+--------+--------+--------+--------+
    -   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    -   *   +---------+---------+--------+--------+--------+--------+
    -   *                                ^
    -   *                                |
    -   *                       sum(y) aggOffsetInRow = 4
    -   * }}}
    -   *
    -   */
    -  def createOperatorFunctionsForAggregates(
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int])
    -    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
    -
    -    val aggregateFunctionsAndFieldIndexes =
    -      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
    -    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    -    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
    -    val aggregates = aggregateFunctionsAndFieldIndexes._2
    +   * Create prepare MapFunction for aggregates.
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *         |                                   |
    +   *         v                                   v
    +   *   +---------+---------+--------+--------+--------+--------+
    +   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +   *   +---------+---------+--------+--------+--------+--------+
    +   *                                ^
    +   *                                |
    +   *                       sum(y) aggOffsetInRow = 4
    +   * }}}
    +   *
    +   */
    +  private[flink] def createPrepareMapFunction(
    +      aggregates: Array[Aggregate[_ <: Any]],
    +      aggFieldIndexes: Array[Int],
    +      groupings: Array[Int],
    +      inputType: RelDataType): MapFunction[Any, Row] = {
     
         val mapReturnType: RowTypeInfo =
           createAggregateBufferDataType(groupings, aggregates, inputType)
     
         val mapFunction = new AggregateMapFunction[Row, Row](
    -      aggregates, aggFieldIndexes, groupings,
    -      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
    -
    -    // the mapping relation between field index of intermediate aggregate Row and output Row.
    -    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
    -
    -    // the mapping relation between aggregate function index in list and its corresponding
    -    // field index in output Row.
    -    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
    -
    -    if (groupingOffsetMapping.length != groupings.length ||
    -        aggOffsetMapping.length != namedAggregates.length) {
    -      throw new TableException("Could not find output field in input data type " +
    -        "or aggregate functions.")
    -    }
    -
    -    val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
    +      aggregates,
    +      aggFieldIndexes,
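The intermediate-Row layout in the diagram above can be computed mechanically: offsets start after the group keys and advance by each aggregate's intermediate width (avg needs sum + count = 2 fields; sum and count need 1 each). A small Java sketch of that arithmetic (illustrative, not Flink's actual code):

```java
public class AggOffsetSketch {
    // Returns each aggregate's start offset (aggOffsetInRow) in the
    // intermediate Row, given the number of group keys and the number of
    // intermediate fields each aggregate needs.
    static int[] offsets(int numGroupKeys, int[] intermediateWidths) {
        int[] result = new int[intermediateWidths.length];
        int pos = numGroupKeys; // group keys occupy the leading positions
        for (int i = 0; i < intermediateWidths.length; i++) {
            result[i] = pos;
            pos += intermediateWidths[i];
        }
        return result;
    }

    public static void main(String[] args) {
        // Two group keys; avg(x) -> width 2, sum(y) -> width 1, count(z) -> width 1.
        int[] o = offsets(2, new int[]{2, 1, 1});
        System.out.println(o[0] + " " + o[1] + " " + o[2]); // 2 4 5
    }
}
```

The printed offsets 2, 4, 5 match the diagram's aggOffsetInRow values for avg(x), sum(y), and count(z).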
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688606#comment-15688606 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89247312 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -39,66 +45,69 @@ object AggregateUtil { type JavaList[T] = java.util.List[T] /** - * Create Flink operator functions for aggregates. It includes 2 implementations of Flink - * operator functions: - * [[org.apache.flink.api.common.functions.MapFunction]] and - * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate, - * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). - * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the - * intermediate aggregate values of all aggregate function, it's stored in Row by the following - * format: - * - * {{{ - * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 - * | | - * v v - *+-+-+++++ - *|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | - *+-+-+++++ - * ^ - * | - * sum(y) aggOffsetInRow = 4 - * }}} - * - */ - def createOperatorFunctionsForAggregates( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int]) -: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { - -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. -val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 +* Create prepare MapFunction for aggregates. 
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*+-+-+++++ +*|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | +*+-+-+++++ +* ^ +* | +* sum(y) aggOffsetInRow = 4 +* }}} +* +*/ + private[flink] def createPrepareMapFunction( +aggregates: Array[Aggregate[_ <: Any]], --- End diff -- Cool,I think use Calcite's namedAggregates is a good idea. > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of
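The issue text above contrasts buffering all window records (aggregate once at window close) with eagerly folding each record into a partial aggregate, ReduceFunction-style. A hedged, Flink-free Java sketch of the difference in state held per window:

```java
import java.util.List;

public class IncrementalVsBuffered {
    // Non-incremental: every record is buffered until the window fires,
    // so state grows with the number of records in the window.
    static long buffered(List<Long> window) {
        long sum = 0;
        for (long v : window) {
            sum += v; // all n records were held in memory until now
        }
        return sum;
    }

    // Incremental: one partial aggregate per window, updated eagerly on
    // each arrival -- constant state regardless of window size.
    static long incremental(List<Long> stream) {
        long partial = 0;
        for (long v : stream) {
            partial += v; // fold the record in immediately, then discard it
        }
        return partial;
    }

    public static void main(String[] args) {
        List<Long> records = List.of(3L, 4L, 5L);
        System.out.println(buffered(records) == incremental(records)); // true
    }
}
```

Both paths produce the same result; the point of the ticket is that the incremental path needs only the partial aggregate as state, not the full record buffer.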
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89244761

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---

    @@ -115,14 +124,210 @@ object AggregateUtil {
             intermediateRowArity,
             outputType.getFieldCount)
         }
    +    groupReduceFunction
    +  }
    +
    +  /**
    +   * Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement
    +   * [[org.apache.flink.api.common.functions.ReduceFunction]]
    +   *
    +   */
    +  private[flink] def createIncrementalAggregateReduceFunction(
    +    aggregates: Array[Aggregate[_ <: Any]],
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType,
    +    groupings: Array[Int]): IncrementalAggregateReduceFunction = {
    +    val groupingOffsetMapping =
    +      getGroupingOffsetAndaggOffsetMapping(
    +        namedAggregates,
    +        inputType,
    +        outputType,
    +        groupings)._1
    +    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
    +    val reduceFunction = new IncrementalAggregateReduceFunction(
    +      aggregates,
    +      groupingOffsetMapping,
    +      intermediateRowArity)
    +    reduceFunction
    +  }
    +
    +  /**
    +   * @return groupingOffsetMapping (mapping relation between field index of intermediate
    +   *         aggregate Row and output Row.)
    +   *         and aggOffsetMapping (the mapping relation between aggregate function index in list
    +   *         and its corresponding field index in output Row.)
    +   */
    +  def getGroupingOffsetAndaggOffsetMapping(

--- End diff --

Yes, I'll do it.
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688533#comment-15688533 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89244761 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( --- End diff -- yes.i'd do it. 
> Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
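The distinction the issue draws — buffering every record until the window fires versus eagerly folding each record into a single partial aggregate — can be sketched outside Flink in plain Java. All names below are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalVsBuffered {
    // Non-incremental: buffer all records, aggregate only when the window fires.
    // Storage grows with the number of records in the window.
    static long bufferedSum(List<Long> window) {
        long sum = 0;
        for (long v : window) {
            sum += v;
        }
        return sum;
    }

    // Incremental: keep one partial aggregate, updated per record,
    // analogous to a ReduceFunction applied eagerly on each element.
    static class IncrementalSum {
        private long partial = 0;
        void add(long v) { partial += v; }   // constant state, per-record update
        long result() { return partial; }
    }

    public static void main(String[] args) {
        List<Long> records = new ArrayList<>();
        IncrementalSum inc = new IncrementalSum();
        for (long v = 1; v <= 5; v++) {
            records.add(v);   // buffered: O(n) state
            inc.add(v);       // incremental: O(1) state
        }
        System.out.println(bufferedSum(records)); // 15
        System.out.println(inc.result());         // 15
    }
}
```

Both paths produce the same result; the incremental one just never holds more than the running aggregate, which is the storage saving the issue is after.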
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89242998

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---

    @@ -61,25 +61,108 @@ object AggregateUtil {
         * }}}
         *
         */
    -  def createOperatorFunctionsForAggregates(
    +  def createOperatorFunctionsForAggregates(
          namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          inputType: RelDataType,
          outputType: RelDataType,
          groupings: Array[Int])
        : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
    -    val aggregateFunctionsAndFieldIndexes =
    -      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
    -    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    -    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
    -    val aggregates = aggregateFunctionsAndFieldIndexes._2
    +    val (aggFieldIndexes, aggregates) =
    +      transformToAggregateFunctions(namedAggregates.map(_.getKey),
    +        inputType, groupings.length)
    -    val mapReturnType: RowTypeInfo =
    -      createAggregateBufferDataType(groupings, aggregates, inputType)
    +    createOperatorFunctionsForAggregates(namedAggregates,
    +      inputType,
    +      outputType,
    +      groupings,
    +      aggregates, aggFieldIndexes)
    +  }
    -    val mapFunction = new AggregateMapFunction[Row, Row](
    -      aggregates, aggFieldIndexes, groupings,
    -      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
    +  def createOperatorFunctionsForAggregates(
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType,
    +    groupings: Array[Int],
    +    aggregates: Array[Aggregate[_ <: Any]],
    +    aggFieldIndexes: Array[Int])
    +  : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
    +
    +    val mapFunction = createAggregateMapFunction(aggregates,
    +      aggFieldIndexes, groupings, inputType)
    +
    +    // the mapping relation between field index of intermediate aggregate Row and output Row.
    +    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
    +
    +    // the mapping relation between aggregate function index in list and its corresponding
    +    // field index in output Row.
    +    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
    +
    +    if (groupingOffsetMapping.length != groupings.length ||
    +      aggOffsetMapping.length != namedAggregates.length) {
    +      throw new TableException("Could not find output field in input data type " +
    +        "or aggregate functions.")
    +    }
    +
    +    val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
    +
    +    val intermediateRowArity = groupings.length +
    +      aggregates.map(_.intermediateDataType.length).sum
    +
    +    val reduceGroupFunction =
    +      if (allPartialAggregate) {
    +        new AggregateReduceCombineFunction(
    +          aggregates,
    +          groupingOffsetMapping,
    +          aggOffsetMapping,
    +          intermediateRowArity,
    +          outputType.getFieldCount)
    +      }
    +      else {
    +        new AggregateReduceGroupFunction(
    +          aggregates,
    +          groupingOffsetMapping,
    +          aggOffsetMapping,
    +          intermediateRowArity,
    +          outputType.getFieldCount)
    +      }
    +
    +    (mapFunction, reduceGroupFunction)
    +  }
    +
    +  /**
    +   * Create Flink operator functions for Incremental aggregates.
    +   * It includes 2 implementations of Flink operator functions:
    +   * [[org.apache.flink.api.common.functions.MapFunction]] and
    +   * [[org.apache.flink.api.common.functions.ReduceFunction]]
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *     |                                   |
    +   *     v                                   v
    +   *   +---------+---------+------+--------+------+--------+
    +   *   |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 |
    +   *   +---------+---------+------+--------+------+--------+
    +   *
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688485#comment-15688485 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89242998 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -61,25 +61,108 @@ object AggregateUtil { * }}} * */ - def createOperatorFunctionsForAggregates( +def createOperatorFunctionsForAggregates( namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputType: RelDataType, outputType: RelDataType, groupings: Array[Int]) : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. 
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 + val (aggFieldIndexes, aggregates) = + transformToAggregateFunctions(namedAggregates.map(_.getKey), + inputType, groupings.length) -val mapReturnType: RowTypeInfo = - createAggregateBufferDataType(groupings, aggregates, inputType) +createOperatorFunctionsForAggregates(namedAggregates, + inputType, + outputType, + groupings, + aggregates,aggFieldIndexes) +} -val mapFunction = new AggregateMapFunction[Row, Row]( -aggregates, aggFieldIndexes, groupings, - mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] +def createOperatorFunctionsForAggregates( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int], +aggregates:Array[Aggregate[_ <: Any]], +aggFieldIndexes:Array[Int]) +: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= { + + val mapFunction = createAggregateMapFunction(aggregates, +aggFieldIndexes, groupings, inputType) + + // the mapping relation between field index of intermediate aggregate Row and output Row. + val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + + // the mapping relation between aggregate function index in list and its corresponding + // field index in output Row. 
+ val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) + + if (groupingOffsetMapping.length != groupings.length || +aggOffsetMapping.length != namedAggregates.length) { +throw new TableException("Could not find output field in input data type " + + "or aggregate functions.") + } + + val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x) + + val intermediateRowArity = groupings.length + +aggregates.map(_.intermediateDataType.length).sum + + val reduceGroupFunction = +if (allPartialAggregate) { + new AggregateReduceCombineFunction( +aggregates, +groupingOffsetMapping, +aggOffsetMapping, +intermediateRowArity, +outputType.getFieldCount) +} +else { + new AggregateReduceGroupFunction( +aggregates, +groupingOffsetMapping, +aggOffsetMapping, +intermediateRowArity, +outputType.getFieldCount) +} + + (mapFunction, reduceGroupFunction) + } + + /** +* Create Flink operator functions for Incremental aggregates. +* It includes 2 implementations of Flink operator functions: +* [[org.apache.flink.api.common.functions.MapFunction]] and +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*
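The path selection in the diff above (`allPartialAggregate`) picks the combinable reduce function only when every aggregate supports partial aggregation. A minimal standalone sketch of that decision, with illustrative stand-in types rather than the real Flink classes:

```java
public class ReducePathSelection {
    // Stand-in for the Aggregate trait's supportPartial flag in the diff.
    interface Aggregate {
        boolean supportPartial();
    }

    static Aggregate partial(boolean yes) {
        return () -> yes;
    }

    // Mirrors: aggregates.map(_.supportPartial).forall(x => x)
    static String choosePath(Aggregate[] aggregates) {
        boolean allPartial = true;
        for (Aggregate a : aggregates) {
            allPartial &= a.supportPartial();
        }
        // A combinable function is only safe when every aggregate can merge
        // partial results; otherwise fall back to a plain group-reduce.
        return allPartial ? "AggregateReduceCombineFunction"
                          : "AggregateReduceGroupFunction";
    }

    public static void main(String[] args) {
        System.out.println(choosePath(new Aggregate[]{partial(true), partial(true)}));
        System.out.println(choosePath(new Aggregate[]{partial(true), partial(false)}));
    }
}
```

A single non-combinable aggregate is enough to force the slower group-reduce path, which is why the check folds over the whole array.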
[jira] [Commented] (FLINK-4669) scala api createLocalEnvironment() function add default Configuration parameter
[ https://issues.apache.org/jira/browse/FLINK-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688370#comment-15688370 ] ASF GitHub Bot commented on FLINK-4669: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2541 @StephanEwen have any improvement needed?

> scala api createLocalEnvironment() function add default Configuration parameter
>
> Key: FLINK-4669
> URL: https://issues.apache.org/jira/browse/FLINK-4669
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Reporter: shijinkui
>
> A Scala program can't directly use createLocalEnvironment with a custom Configuration object.
> For example, to start the web server in local mode, one currently has to write:
> ```
> // set up execution environment
> val conf = new Configuration
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
>   ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
> val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(
>   org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2, conf)
> )
> ```
> so the createLocalEnvironment function needs a config parameter

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
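The proposal amounts to a local-environment factory that accepts an optional configuration. Scala would express this as a default parameter; in Java terms it is an overload pair. A sketch with stand-in types (`Config` and `LocalEnv` are placeholders, not the real Flink classes):

```java
import java.util.HashMap;
import java.util.Map;

public class LocalEnvFactory {
    // Stand-in for org.apache.flink.configuration.Configuration.
    static class Config {
        final Map<String, String> entries = new HashMap<>();
        Config set(String key, String value) { entries.put(key, value); return this; }
        String get(String key, String dflt) { return entries.getOrDefault(key, dflt); }
    }

    static class LocalEnv {
        final int parallelism;
        final Config config;
        LocalEnv(int parallelism, Config config) {
            this.parallelism = parallelism;
            this.config = config;
        }
    }

    // Status quo: callers cannot pass a Configuration at all.
    static LocalEnv createLocalEnvironment(int parallelism) {
        return createLocalEnvironment(parallelism, new Config());
    }

    // Proposed: an explicit Configuration, e.g. to enable the local web UI.
    static LocalEnv createLocalEnvironment(int parallelism, Config config) {
        return new LocalEnv(parallelism, config);
    }

    public static void main(String[] args) {
        Config conf = new Config().set("local.start-webserver", "true");
        LocalEnv env = createLocalEnvironment(2, conf);
        System.out.println(env.config.get("local.start-webserver", "false")); // true
    }
}
```

With the overload in place, the wrapping dance through `StreamExecutionEnvironment`'s Java factory in the quoted snippet becomes unnecessary.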
[GitHub] flink issue #2541: [FLINK-4669] scala api createLocalEnvironment() function ...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2541 @StephanEwen have any improvement needed?
[jira] [Commented] (FLINK-4910) Introduce safety net for closing file system streams
[ https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688284#comment-15688284 ] ASF GitHub Bot commented on FLINK-4910: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2691

> Introduce safety net for closing file system streams
>
> Key: FLINK-4910
> URL: https://issues.apache.org/jira/browse/FLINK-4910
> Project: Flink
> Issue Type: Improvement
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Streams that are opened through {{FileSystem}} must be closed at the end of their life cycle. However, we found hints that some code forgets to close such streams.
> We should introduce i) a mechanism that closes leaking unclosed streams after usage and ii) logging that helps us to track down and fix the sources of such leaks.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
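The two mechanisms the issue asks for — i) closing leaked streams after usage and ii) logging that locates the leak — can be sketched as a registry that records where each stream was opened and force-closes whatever was never closed explicitly. All names here are illustrative, not the eventual Flink implementation:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

public class ClosingSafetyNet implements Closeable {
    // Track open streams together with where they were opened, for leak logs.
    private final Map<Closeable, String> open =
            Collections.synchronizedMap(new IdentityHashMap<>());

    <C extends Closeable> C register(C stream, String origin) {
        open.put(stream, origin);
        return stream;
    }

    void unregister(Closeable stream) {
        open.remove(stream);
    }

    int leakedCount() {
        return open.size();
    }

    // ii) log the origin of each leaked stream, then i) close it.
    @Override
    public void close() throws IOException {
        synchronized (open) {
            for (Map.Entry<Closeable, String> e : open.entrySet()) {
                System.err.println("Leaked stream opened at: " + e.getValue());
                e.getKey().close();
            }
            open.clear();
        }
    }
}
```

A usage pattern would be to hand one such net to a task, register every `FileSystem` stream the task opens, and close the net when the task finishes — anything still registered at that point is a leak.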
[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts
[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688285#comment-15688285 ] ASF GitHub Bot commented on FLINK-5107: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2837

> Job Manager goes out of memory from long history of prior execution attempts
>
> Key: FLINK-5107
> URL: https://issues.apache.org/jira/browse/FLINK-5107
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during long-running jobs with many vertices. Analysis of the heap dump shows that the ever-growing history of prior execution attempts is the culprit for this problem.
> We should limit this history to the n most recent attempts.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
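Limiting the history to the n most recent attempts is essentially a bounded deque that evicts the oldest entry when full. A sketch with a placeholder attempt type (the class and method names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BoundedAttemptHistory<T> {
    private final int maxSize;
    private final Deque<T> attempts = new ArrayDeque<>();

    public BoundedAttemptHistory(int maxSize) {
        this.maxSize = maxSize;
    }

    // Record a finished attempt; drop the oldest once the cap is hit,
    // so memory no longer grows with the total number of restarts.
    public void add(T attempt) {
        if (attempts.size() == maxSize) {
            attempts.removeFirst();
        }
        attempts.addLast(attempt);
    }

    public int size() {
        return attempts.size();
    }

    public T oldest() {
        return attempts.peekFirst();
    }

    public static void main(String[] args) {
        BoundedAttemptHistory<Integer> history = new BoundedAttemptHistory<>(3);
        for (int attempt = 1; attempt <= 5; attempt++) {
            history.add(attempt);
        }
        System.out.println(history.size());   // 3
        System.out.println(history.oldest()); // 3 (attempts 1 and 2 were evicted)
    }
}
```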
[jira] [Resolved] (FLINK-4910) Introduce safety net for closing file system streams
[ https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-4910. -- Resolution: Fixed Fixed via ba8ed263695d16eacb4bdfdf195dd22c83bb53ed -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2837: [FLINK-5107] Introduced limit for prior execution ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2837
[GitHub] flink pull request #2691: [FLINK-4910] Introduce safety net for closing file...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2691
[jira] [Resolved] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts
[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-5107. -- Resolution: Fixed Fixed via f5af7f1025aac9cac3f83de5f0e3aece5730ec0f -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2837: [FLINK-5107] Introduced limit for prior execution attempt...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2837 Thanks for the contribution @StefanRRichter. Failing tests are unrelated. Merging this PR.
[jira] [Commented] (FLINK-4910) Introduce safety net for closing file system streams
[ https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688084#comment-15688084 ] ASF GitHub Bot commented on FLINK-4910: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2691 Thanks for the great contribution @StefanRRichter. Merging this PR. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts
[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688088#comment-15688088 ] ASF GitHub Bot commented on FLINK-5107: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2837 Thanks for the contribution @StefanRRichter. Failing tests are unrelated. Merging this PR. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2691: [FLINK-4910] Introduce safety net for closing file system...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2691 Thanks for the great contribution @StefanRRichter. Merging this PR.
[jira] [Closed] (FLINK-5082) Pull ExecutionService lifecycle management out of the JobManager
[ https://issues.apache.org/jira/browse/FLINK-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-5082. Resolution: Fixed Fixed in 1.2 via ae4b274a9919d01a236df4e819a0a07c5d8543ac Fixed in 1.1.4 via 7fb71c5bf1aab85250bf29bd0ea0654079cea48f > Pull ExecutionService lifecycle management out of the JobManager > > > Key: FLINK-5082 > URL: https://issues.apache.org/jira/browse/FLINK-5082 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > The {{JobManager}} receives an {{ExecutorService}} to run its futures as a > constructor parameter. Even though the {{ExecutorService}} comes from > outside, the {{JobManager}} shuts the executor service down if the > {{JobManager}} terminates. This is clearly a sub-optimal behaviour leading > also to {{RejectedExecutionExceptions}}. > I propose to move the {{ExecutorService}} lifecycle management out of the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
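The proposed fix follows the usual ownership rule: the component that creates an executor shuts it down, and a component that merely receives one must not. A minimal sketch of that contract (the `Manager` name is illustrative, not the actual JobManager code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorOwnership {
    // The component only *uses* the executor it is handed; it never shuts it
    // down, so other users of the same executor do not hit
    // RejectedExecutionException after this component terminates.
    static class Manager {
        private final ExecutorService executor;
        Manager(ExecutorService executor) { this.executor = executor; }
        void runTask(Runnable task) { executor.execute(task); }
        void terminate() { /* release own resources only; executor untouched */ }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor(); // owner: main
        Manager first = new Manager(executor);
        first.runTask(() -> System.out.println("first manager task"));
        first.terminate();

        // Because the manager did not shut the executor down, a successor
        // can keep submitting work to the very same executor.
        Manager second = new Manager(executor);
        second.runTask(() -> System.out.println("second manager task"));

        executor.shutdown(); // lifecycle managed by the creator, not the manager
    }
}
```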
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688077#comment-15688077 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Ok, so I am progressing this a bit independently from the termination stuff and then we rebase to the first PR that is merged. I just changed everything and rebased to the current master. Some notable changes: - The `StreamIterationCheckpointingITCase` is not made deterministic, it fails after the first successful checkpoint once and the jobs stops after everything has been recovered appropriately. - I am now using ListState which is supposed to work like a charm with the rocksdb file backend. Note that with the default in-memory backend there is a high chance to get issues given the low memory capacity that it is given by default. - One tricky part that can be potentially done better is the way I set the logger in the StreamIterationHead (had to change the head op field access to `protected` in the OperatorChain) Whenever you find time go ahead and check it out. It passes my super-strict test which is a good thing. :) > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. 
> 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
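Steps 1)–3) boil down to: on barrier, start logging records arriving over the back-edge; when the barrier comes back around via the sink, the log is the in-transit part of the snapshot and is replayed in FIFO order before normal processing resumes. A toy single-threaded sketch of that protocol — no real channels or checkpoint coordinator, all names illustrative:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BackEdgeSnapshotSketch {
    private final Queue<String> inTransitLog = new ArrayDeque<>();
    private boolean logging = false;

    // 1) Barrier arrives at the iteration head: start backing up records
    //    forwarded from the iteration sink over the back-edge.
    void onBarrier() {
        logging = true;
    }

    // Records flowing over the back-edge while the barrier is in flight.
    void onBackEdgeRecord(String record) {
        if (logging) {
            inTransitLog.add(record);
        }
    }

    // 2)+3) The barrier returns via the sink: the logged records form the
    //    in-transit part of the snapshot and are replayed in FIFO order.
    List<String> onBarrierReturned() {
        logging = false;
        List<String> replay = new ArrayList<>(inTransitLog); // FIFO order kept
        inTransitLog.clear();
        return replay;
    }

    public static void main(String[] args) {
        BackEdgeSnapshotSketch head = new BackEdgeSnapshotSketch();
        head.onBarrier();
        head.onBackEdgeRecord("r1");
        head.onBackEdgeRecord("r2");
        System.out.println(head.onBarrierReturned()); // [r1, r2]
    }
}
```

On recovery, the same replay list would be emitted before any new input, which is what makes the in-transit records part of a consistent snapshot.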
[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user senorcarbone commented on the issue:

    https://github.com/apache/flink/pull/1668

Ok, so I am progressing this a bit independently from the termination stuff and then we rebase to the first PR that is merged. I just changed everything and rebased to the current master. Some notable changes:

- The `StreamIterationCheckpointingITCase` is now made deterministic: it fails once after the first successful checkpoint, and the job stops after everything has been recovered appropriately.
- I am now using ListState, which is supposed to work like a charm with the RocksDB file backend. Note that with the default in-memory backend there is a high chance of issues, given the low memory capacity it is given by default.
- One tricky part that can potentially be done better is the way I set the logger in the StreamIterationHead (had to change the head op field access to `protected` in the OperatorChain).

Whenever you find time, go ahead and check it out. It passes my super-strict test, which is a good thing. :)
[GitHub] flink pull request #2820: [FLINK-5082] Pull ExecutorService lifecycle manage...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2820
[GitHub] flink pull request #2815: [FLINK-5073] Use Executor to run ZooKeeper callbac...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2815
[jira] [Closed] (FLINK-5085) Execute CheckpointCoodinator's state discard calls asynchronously
[ https://issues.apache.org/jira/browse/FLINK-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-5085. Resolution: Fixed Fixed in 1.2 via c590912c93a4059b40452dfa6cffbdd4d58cac13 Fixed in 1.1.4 via cf4b221270cff3541bea318f907f9d8207b2fa4d > Execute CheckpointCoodinator's state discard calls asynchronously > - > > Key: FLINK-5085 > URL: https://issues.apache.org/jira/browse/FLINK-5085 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > The {{CheckpointCoordinator}} discards under certain circumstances pending > checkpoints or state handles. These discard operations can involve a blocking > IO operation if the underlying state handle refers to a file which has to be > deleted. In order to not block the calling thread, we should execute these > calls in a dedicated IO executor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2825: [FLINK-5085] Execute CheckpointCoordinator's state...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2825
[jira] [Closed] (FLINK-5073) ZooKeeperCompleteCheckpointStore executes blocking delete operation in ZooKeeper client thread
[ https://issues.apache.org/jira/browse/FLINK-5073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-5073. Resolution: Fixed Fixed in 1.2 via 3fb92d8687f03c1fac8b87396b2b5a7ff29f6dd6 Fixed in 1.1.4 via f2e4c193e1fb6b0cf26861bc01c2f3d6bcd4d8f6 > ZooKeeperCompleteCheckpointStore executes blocking delete operation in > ZooKeeper client thread > -- > > Key: FLINK-5073 > URL: https://issues.apache.org/jira/browse/FLINK-5073 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > When deleting completed checkpoints from the > {{ZooKeeperCompletedCheckpointStore}}, one first tries to delete the meta > state handle from ZooKeeper and then deletes the actual checkpoint in a > callback from the delete operation. This callback is executed by the > ZooKeeper client's main thread which is problematic, because it blocks the > ZooKeeper client. If a delete operation takes longer than it takes to > complete a checkpoint, then it might even happen that delete operations of > outdated checkpoints are piling up because they are effectively executed > sequentially. > I propose to execute the delete operations by a dedicated {{Executor}} so > that we keep the client's main thread free to do ZooKeeper related work. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
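The proposed fix — hand the blocking delete to a dedicated Executor so the ZooKeeper client's main thread stays free — can be sketched as follows (names are illustrative stand-ins, not the actual Flink classes; the `Runnable` stands in for deleting checkpoint files or ZooKeeper nodes):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncDiscardSketch {
    private final ExecutorService ioExecutor;

    AsyncDiscardSketch(ExecutorService ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    // Called from the client/event thread: must return immediately, so the
    // potentially blocking delete is handed off to the dedicated I/O executor
    // instead of running in the caller's thread.
    void discardAsync(Runnable blockingDelete) {
        ioExecutor.execute(blockingDelete);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        AsyncDiscardSketch store = new AsyncDiscardSketch(ioExecutor);

        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            final int checkpointId = i;
            store.discardAsync(() -> {
                // stands in for a slow file or ZooKeeper-node deletion
                System.out.println("discarded checkpoint " + checkpointId);
                done.countDown();
            });
        }
        // The submitting thread was never blocked by the deletes themselves.
        done.await(5, TimeUnit.SECONDS);
        ioExecutor.shutdown();
    }
}
```

Because the deletes now run on a pool, slow deletions of outdated checkpoints can proceed in parallel instead of piling up sequentially behind the client's main thread.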
[jira] [Commented] (FLINK-5082) Pull ExecutorService lifecycle management out of the JobManager
[ https://issues.apache.org/jira/browse/FLINK-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688071#comment-15688071 ] ASF GitHub Bot commented on FLINK-5082: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2820 > Pull ExecutorService lifecycle management out of the JobManager > > > Key: FLINK-5082 > URL: https://issues.apache.org/jira/browse/FLINK-5082 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > The {{JobManager}} receives an {{ExecutorService}} to run its futures as a > constructor parameter. Even though the {{ExecutorService}} comes from > outside, the {{JobManager}} shuts the executor service down if the > {{JobManager}} terminates. This is clearly a sub-optimal behaviour leading > also to {{RejectedExecutionExceptions}}. > I propose to move the {{ExecutorService}} lifecycle management out of the > {{JobManager}}.
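The ownership rule proposed in FLINK-5082 — whoever creates the ExecutorService shuts it down, never a component that merely borrows it — looks roughly like this (the class names are made up for illustration; this is not Flink's JobManager code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorLifecycleSketch {

    // A component in the JobManager's role: it uses the executor it was
    // given, but its own shutdown() deliberately leaves the executor alone.
    static class BorrowingComponent {
        private final ExecutorService executor;

        BorrowingComponent(ExecutorService executor) {
            this.executor = executor;
        }

        void runAsync(Runnable task) {
            executor.submit(task);
        }

        void shutdown() {
            // release only resources this component owns; the executor's
            // owner closes it, so other users of the same executor never
            // see a RejectedExecutionException
        }
    }

    static boolean demo() {
        ExecutorService owned = Executors.newSingleThreadExecutor();
        BorrowingComponent component = new BorrowingComponent(owned);
        component.shutdown();
        boolean survivedComponent = !owned.isShutdown(); // executor outlives the component
        owned.shutdownNow();                             // the owner ends the lifecycle
        return survivedComponent && owned.isShutdown();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```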
[jira] [Commented] (FLINK-5073) ZooKeeperCompletedCheckpointStore executes blocking delete operation in ZooKeeper client thread
[ https://issues.apache.org/jira/browse/FLINK-5073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688072#comment-15688072 ] ASF GitHub Bot commented on FLINK-5073: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2815 > ZooKeeperCompletedCheckpointStore executes blocking delete operation in > ZooKeeper client thread > -- > > Key: FLINK-5073 > URL: https://issues.apache.org/jira/browse/FLINK-5073 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > When deleting completed checkpoints from the > {{ZooKeeperCompletedCheckpointStore}}, one first tries to delete the meta > state handle from ZooKeeper and then deletes the actual checkpoint in a > callback from the delete operation. This callback is executed by the > ZooKeeper client's main thread which is problematic, because it blocks the > ZooKeeper client. If a delete operation takes longer than it takes to > complete a checkpoint, then it might even happen that delete operations of > outdated checkpoints are piling up because they are effectively executed > sequentially. > I propose to execute the delete operations by a dedicated {{Executor}} so > that we keep the client's main thread free to do ZooKeeper related work.
[jira] [Commented] (FLINK-5085) Execute CheckpointCoordinator's state discard calls asynchronously
[ https://issues.apache.org/jira/browse/FLINK-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688042#comment-15688042 ] ASF GitHub Bot commented on FLINK-5085: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2825 Build passed locally https://travis-ci.org/tillrohrmann/flink/builds/178011045. Merging this PR. > Execute CheckpointCoordinator's state discard calls asynchronously > - > > Key: FLINK-5085 > URL: https://issues.apache.org/jira/browse/FLINK-5085 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > The {{CheckpointCoordinator}} discards under certain circumstances pending > checkpoints or state handles. These discard operations can involve a blocking > IO operation if the underlying state handle refers to a file which has to be > deleted. In order to not block the calling thread, we should execute these > calls in a dedicated IO executor.
[jira] [Comment Edited] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
[ https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582930#comment-15582930 ] Ted Yu edited comment on FLINK-4848 at 11/22/16 9:32 PM: - There is a similar issue with trustStoreFilePath: {code} trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); {code} was (Author: yuzhih...@gmail.com): There is a similar issue with trustStoreFilePath: {code} trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); {code} > keystoreFilePath should be checked against null in > SSLUtils#createSSLServerContext > -- > > Key: FLINK-4848 > URL: https://issues.apache.org/jira/browse/FLINK-4848 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > String keystoreFilePath = sslConfig.getString( > ConfigConstants.SECURITY_SSL_KEYSTORE, > null); > ... > try { > keyStoreFile = new FileInputStream(new File(keystoreFilePath)); > {code} > If keystoreFilePath is null, the File ctor would throw NPE.
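A minimal sketch of the suggested guard. The wrapper method is hypothetical; only the configuration lookup pattern and the failing `new File(...)` call come from the issue:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

public class SslConfigSketch {

    // Fail fast with a descriptive message instead of letting
    // `new File(null)` surface as a bare NullPointerException.
    static InputStream openKeystore(String keystoreFilePath) throws Exception {
        if (keystoreFilePath == null) {
            throw new IllegalArgumentException(
                "SSL is enabled but no keystore path is configured");
        }
        return new FileInputStream(new File(keystoreFilePath));
    }

    public static void main(String[] args) {
        try {
            openKeystore(null);
        } catch (IllegalArgumentException expected) {
            // reached for a null path: clear error instead of an NPE
            System.out.println("rejected: " + expected.getMessage());
        } catch (Exception other) {
            // not reached when the path is null
        }
    }
}
```

The same guard would apply to `trustStoreFilePath`, as Ted's edited comment points out.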
[jira] [Resolved] (FLINK-4999) Improve accuracy of SingleInputGate#getNumberOfQueuedBuffers
[ https://issues.apache.org/jira/browse/FLINK-4999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-4999. --- Resolution: Won't Fix > Improve accuracy of SingleInputGate#getNumberOfQueuedBuffers > > > Key: FLINK-4999 > URL: https://issues.apache.org/jira/browse/FLINK-4999 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > // re-try 3 times, if fails, return 0 for "unknown" > for (int retry = 0; retry < 3; retry++) { > ... > catch (Exception ignored) {} > {code} > There is no synchronization around accessing inputChannels currently. > Therefore the method expects potential exception. > Upon the 3rd try, synchronization should be taken w.r.t. inputChannels so > that the return value is accurate.
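Ted's proposal — optimistic unsynchronized retries followed by one accurate pass under the lock — can be sketched like this. The field names mimic `SingleInputGate`, but the class is purely illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class QueuedBuffersSketch {

    private final Object lock = new Object();
    private final Map<Integer, Integer> inputChannels = new HashMap<>();

    void addChannel(int channelId, int queuedBuffers) {
        synchronized (lock) {
            inputChannels.put(channelId, queuedBuffers);
        }
    }

    int getNumberOfQueuedBuffers() {
        // optimistic, lock-free attempts: a concurrent modification of
        // inputChannels may throw, in which case we simply retry
        for (int retry = 0; retry < 3; retry++) {
            try {
                return sumQueuedBuffers();
            } catch (Exception ignored) {
                // fall through and retry
            }
        }
        // final attempt under the lock, so the result is accurate
        // instead of the old "return 0 for unknown" behaviour
        synchronized (lock) {
            return sumQueuedBuffers();
        }
    }

    private int sumQueuedBuffers() {
        int total = 0;
        for (int buffers : inputChannels.values()) {
            total += buffers;
        }
        return total;
    }

    public static void main(String[] args) {
        QueuedBuffersSketch gate = new QueuedBuffersSketch();
        gate.addChannel(0, 2);
        gate.addChannel(1, 3);
        System.out.println(gate.getNumberOfQueuedBuffers()); // 5
    }
}
```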
[jira] [Commented] (FLINK-5050) JSON.org license is CatX
[ https://issues.apache.org/jira/browse/FLINK-5050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687938#comment-15687938 ] ASF GitHub Bot commented on FLINK-5050: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2824 lgtm > JSON.org license is CatX > > > Key: FLINK-5050 > URL: https://issues.apache.org/jira/browse/FLINK-5050 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Sergey Sokur > > We should exclude org.json:json dependency from hive-exec dependency. > {code} > [INFO] +- org.apache.flink:flink-java:jar:1.2-SNAPSHOT:provided > ... > [INFO] +- org.apache.hive.hcatalog:hcatalog-core:jar:0.12.0:compile > ... > [INFO] | +- org.apache.hive:hive-exec:jar:0.12.0:compile > [INFO] | | +- com.google.protobuf:protobuf-java:jar:2.4.1:compile > [INFO] | | +- org.iq80.snappy:snappy:jar:0.2:compile > [INFO] | | +- org.json:json:jar:20090211:compile > {code}
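The fix described in the issue is a plain Maven dependency exclusion; a sketch against the dependency tree quoted above (the version is the one listed there, and the exact placement in Flink's poms may differ):

```xml
<!-- Drop the Category-X org.json:json artifact that hive-exec pulls in. -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.12.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

Running `mvn dependency:tree` afterwards should show the `org.json:json:jar` line gone from the `hive-exec` subtree.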
[jira] [Commented] (FLINK-5097) The TypeExtractor is missing input type information in some Graph methods
[ https://issues.apache.org/jira/browse/FLINK-5097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687902#comment-15687902 ] ASF GitHub Bot commented on FLINK-5097: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2842 @vasia should `Graph` be using the function specific `TypeExtractor.getMapReturnTypes` (not sure why this is plural) and similar where core Functions are used? For `Gelly` functions, I think you are right that this capability is lacking, i.e. `NeighborsFunctionWithVertexValue` has three input types and `TypeExtractor` only provides `getUnaryOperatorReturnType` and `getBinaryOperatorReturnType`? It also looks like `TypeExtractor` expects a specific though natural ordering (input types before output types). @twalthr what do you recommend? > The TypeExtractor is missing input type information in some Graph methods > - > > Key: FLINK-5097 > URL: https://issues.apache.org/jira/browse/FLINK-5097 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > The TypeExtractor is called without information about the input type in > {{mapVertices}} and {{mapEdges}} although this information can be easily > retrieved.
[jira] [Commented] (FLINK-5124) Support more temporal arithmetic
[ https://issues.apache.org/jira/browse/FLINK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687674#comment-15687674 ] ASF GitHub Bot commented on FLINK-5124: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2851#discussion_r89192110 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -656,38 +656,56 @@ object ScalarOperators { right: GeneratedExpression) : GeneratedExpression = { -val operator = if (plus) "+" else "-" +val op = if (plus) "+" else "-" (left.resultType, right.resultType) match { case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r => -generateArithmeticOperator(operator, nullCheck, l, left, right) +generateArithmeticOperator(op, nullCheck, l, left, right) case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) | (TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) => generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) { if (isTimePoint(left.resultType)) { --- End diff -- Wouldn't it be easier to read if this case would be handled by the outer pattern matching with separate cases for `(SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS)` and `(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE)`? > Support more temporal arithmetic > > > Key: FLINK-5124 > URL: https://issues.apache.org/jira/browse/FLINK-5124 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Multiple TPC-H queries fail because of missing temporal arithmetic support. > Since CALCITE-308 has been fixed we can add additional operations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5124) Support more temporal arithmetic
[ https://issues.apache.org/jira/browse/FLINK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687675#comment-15687675 ] ASF GitHub Bot commented on FLINK-5124: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2851#discussion_r89192420 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -656,38 +656,56 @@ object ScalarOperators { right: GeneratedExpression) : GeneratedExpression = { -val operator = if (plus) "+" else "-" +val op = if (plus) "+" else "-" (left.resultType, right.resultType) match { case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r => -generateArithmeticOperator(operator, nullCheck, l, left, right) +generateArithmeticOperator(op, nullCheck, l, left, right) case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) | (TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) => generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) { if (isTimePoint(left.resultType)) { -(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm / ${MILLIS_PER_DAY}L))" +(leftTerm, rightTerm) => + s"$leftTerm $op ((int) ($rightTerm / ${MILLIS_PER_DAY}L))" } else { -(leftTerm, rightTerm) => s"((int) ($leftTerm / ${MILLIS_PER_DAY}L)) + $rightTerm" +(leftTerm, rightTerm) => + s"((int) ($leftTerm / ${MILLIS_PER_DAY}L)) $op $rightTerm" + } +} + + case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) | + (TimeIntervalTypeInfo.INTERVAL_MONTHS, SqlTimeTypeInfo.DATE) => +generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) { + if (isTimePoint(left.resultType)) { +(leftTerm, rightTerm) => + s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($leftTerm, $op($rightTerm))" + } else { +(leftTerm, rightTerm) => + s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($rightTerm, $op($leftTerm))" } } case (SqlTimeTypeInfo.TIME, 
TimeIntervalTypeInfo.INTERVAL_MILLIS) | (TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) => generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, right) { if (isTimePoint(left.resultType)) { -(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm))" +(leftTerm, rightTerm) => s"$leftTerm $op ((int) ($rightTerm))" } else { -(leftTerm, rightTerm) => s"((int) ($leftTerm)) + $rightTerm" +(leftTerm, rightTerm) => s"((int) ($leftTerm)) $op $rightTerm" } } case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MILLIS) => generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right) { - (leftTerm, rightTerm) => s"$leftTerm + $rightTerm" + (leftTerm, rightTerm) => s"$leftTerm $op $rightTerm" } - // TODO more operations when CALCITE-308 is fixed + case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MONTHS) => --- End diff -- Do we need the inverse case? > Support more temporal arithmetic > > > Key: FLINK-5124 > URL: https://issues.apache.org/jira/browse/FLINK-5124 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Multiple TPC-H queries fail because of missing temporal arithmetic support. > Since CALCITE-308 has been fixed we can add additional operations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5124) Support more temporal arithmetic
[ https://issues.apache.org/jira/browse/FLINK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687673#comment-15687673 ] ASF GitHub Bot commented on FLINK-5124: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2851#discussion_r89192379 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -656,38 +656,56 @@ object ScalarOperators { right: GeneratedExpression) : GeneratedExpression = { -val operator = if (plus) "+" else "-" +val op = if (plus) "+" else "-" (left.resultType, right.resultType) match { case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r => -generateArithmeticOperator(operator, nullCheck, l, left, right) +generateArithmeticOperator(op, nullCheck, l, left, right) case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) | (TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) => generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) { if (isTimePoint(left.resultType)) { -(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm / ${MILLIS_PER_DAY}L))" +(leftTerm, rightTerm) => + s"$leftTerm $op ((int) ($rightTerm / ${MILLIS_PER_DAY}L))" } else { -(leftTerm, rightTerm) => s"((int) ($leftTerm / ${MILLIS_PER_DAY}L)) + $rightTerm" +(leftTerm, rightTerm) => + s"((int) ($leftTerm / ${MILLIS_PER_DAY}L)) $op $rightTerm" + } +} + + case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) | + (TimeIntervalTypeInfo.INTERVAL_MONTHS, SqlTimeTypeInfo.DATE) => +generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) { + if (isTimePoint(left.resultType)) { +(leftTerm, rightTerm) => + s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($leftTerm, $op($rightTerm))" + } else { +(leftTerm, rightTerm) => + s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($rightTerm, $op($leftTerm))" } } case (SqlTimeTypeInfo.TIME, 
TimeIntervalTypeInfo.INTERVAL_MILLIS) | (TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) => generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, right) { if (isTimePoint(left.resultType)) { -(leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm))" +(leftTerm, rightTerm) => s"$leftTerm $op ((int) ($rightTerm))" } else { -(leftTerm, rightTerm) => s"((int) ($leftTerm)) + $rightTerm" +(leftTerm, rightTerm) => s"((int) ($leftTerm)) $op $rightTerm" } } case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MILLIS) => --- End diff -- Don't we need the inverse case `(TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIMESTAMP)` as well? > Support more temporal arithmetic > > > Key: FLINK-5124 > URL: https://issues.apache.org/jira/browse/FLINK-5124 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Multiple TPC-H queries fail because of missing temporal arithmetic support. > Since CALCITE-308 has been fixed we can add additional operations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4895) Drop support for Hadoop 1
[ https://issues.apache.org/jira/browse/FLINK-4895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687642#comment-15687642 ] ASF GitHub Bot commented on FLINK-4895: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2850 I haven't tried it yet. > Drop support for Hadoop 1 > - > > Key: FLINK-4895 > URL: https://issues.apache.org/jira/browse/FLINK-4895 > Project: Flink > Issue Type: Task > Components: Build System >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > > As per this mailing list discussion: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-Hadoop-1-support-with-Flink-1-2-td9530.html > the community agreed to drop support for Hadoop 1. > The task includes > - removing the hadoop-1 / hadoop-2 build profiles, > - removing the scripts for generating hadoop-x poms > - updating the release script > - updating the nightly build script > - updating the travis configuration file > - updating the documentation > - updating the website
[jira] [Commented] (FLINK-4861) Package optional project artifacts
[ https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687630#comment-15687630 ] ASF GitHub Bot commented on FLINK-4861: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2664 Thank you for fixing the issue so quickly. I'm wondering whether the current approach is a good idea, because it requires manual checking of all transitive dependencies. We have something similar (=manual as well) in the quickstart, where we exclude some artifact, and it doesn't really happen that people update that list of excludes. So I fear that if somebody changes a connector for example, it will not be added here. I guess adding an assembly descriptor for each connector / module would solve the problem with the transitive dependencies. I don't know if there's a more efficient approach. What do you think? > Package optional project artifacts > -- > > Key: FLINK-4861 > URL: https://issues.apache.org/jira/browse/FLINK-4861 > Project: Flink > Issue Type: New Feature > Components: Build System >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > Per the mailing list > [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html], > package the Flink libraries and connectors into subdirectories of a new > {{opt}} directory in the release/snapshot tarballs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
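A per-connector assembly descriptor along the lines rmetzger suggests might look like the sketch below. It is purely illustrative — the output path follows the `opt/connectors` layout discussed in this thread, and each connector module would carry its own copy — but the point is that the assembly plugin resolves the module's transitive runtime dependencies itself, so nobody has to maintain a hand-written jar list:

```xml
<!-- src/assembly/opt.xml in a connector module (hypothetical location) -->
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0">
  <id>opt-connector</id>
  <formats>
    <format>dir</format>
  </formats>
  <includeBaseDirectory>false</includeBaseDirectory>
  <dependencySets>
    <dependencySet>
      <!-- connector jar plus everything it needs at runtime -->
      <outputDirectory>opt/connectors/streaming/${project.artifactId}</outputDirectory>
      <useTransitiveDependencies>true</useTransitiveDependencies>
      <scope>runtime</scope>
    </dependencySet>
  </dependencySets>
</assembly>
```

With this in place, adding or changing a connector's dependencies is picked up automatically at package time instead of requiring an update to a central exclude/include list.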
[GitHub] flink issue #2852: Fix misprint in condition
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2852 Thanks for the fix @BorisOsipov! +1 to merge
[jira] [Commented] (FLINK-4861) Package optional project artifacts
[ https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687577#comment-15687577 ] ASF GitHub Bot commented on FLINK-4861: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2664 @rmetzger it appears that project artifacts are not included as transitive dependencies and I had overlooked 0.9 as a dependency for the 0.10 connector. After correcting this the provided hierarchy is as follows: ``` opt/connectors/streaming/kafka-0.10 opt/connectors/streaming/kafka-0.10/flink-connector-kafka-0.10_2.10-1.2-SNAPSHOT.jar opt/connectors/streaming/kafka-0.10/flink-connector-kafka-0.9_2.10-1.2-SNAPSHOT.jar opt/connectors/streaming/kafka-0.10/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar opt/connectors/streaming/kafka-0.10/kafka-clients-0.9.0.1.jar opt/connectors/streaming/kafka-0.10/lz4-1.2.0.jar opt/connectors/streaming/kafka-0.8 opt/connectors/streaming/kafka-0.8/flink-connector-kafka-0.8_2.10-1.2-SNAPSHOT.jar opt/connectors/streaming/kafka-0.8/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar opt/connectors/streaming/kafka-0.8/kafka_2.10-0.8.2.2.jar opt/connectors/streaming/kafka-0.8/scala-library-2.10.4.jar opt/connectors/streaming/kafka-0.8/zkclient-0.3.jar opt/connectors/streaming/kafka-0.9 opt/connectors/streaming/kafka-0.9/flink-connector-kafka-0.9_2.10-1.2-SNAPSHOT.jar opt/connectors/streaming/kafka-0.9/flink-connector-kafka-base_2.10-1.2-SNAPSHOT.jar opt/connectors/streaming/kafka-0.9/kafka-clients-0.9.0.1.jar opt/connectors/streaming/kafka-0.9/lz4-1.2.0.jar ``` > Package optional project artifacts > -- > > Key: FLINK-4861 > URL: https://issues.apache.org/jira/browse/FLINK-4861 > Project: Flink > Issue Type: New Feature > Components: Build System >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > Per the mailing list > 
[discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html], > package the Flink libraries and connectors into subdirectories of a new > {{opt}} directory in the release/snapshot tarballs.