[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5132:
---------------------------------
    Description:
        This is part of the fine-grained resource configuration. The current resource
        factors include CPU cores, heap memory, direct memory, native memory, and
        state size. The *ResourceSpec* will provide some basic constructors for
        grouping different resource factors as needed, and the structure can easily
        be extended for further requirements.

> Introduce the ResourceSpec for grouping different resource factors in API
> --------------------------------------------------------------------------
>
>                 Key: FLINK-5132
>                 URL: https://issues.apache.org/jira/browse/FLINK-5132
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Core
>            Reporter: Zhijiang Wang
[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5132:
---------------------------------
    Description:
        This is part of the fine-grained resource configuration. The current resource
        factors include CPU cores, heap memory, direct memory, native memory, and
        state size. The *ResourceSpec* will provide some basic constructors for
        grouping different resource factors as needed, and the structure can easily
        be extended for further requirements.

    (was: the same text)

> Introduce the ResourceSpec for grouping different resource factors in API
> --------------------------------------------------------------------------
>
>                 Key: FLINK-5132
>                 URL: https://issues.apache.org/jira/browse/FLINK-5132
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Core
>            Reporter: Zhijiang Wang
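As a rough sketch of the grouping structure described above (the field set, class shape, and factory method are assumptions drawn from the issue description, not the committed Flink API):

{code}
// Illustrative sketch only -- names and fields follow the issue description,
// not the API that was eventually committed.
public final class ResourceSpec {
    private final double cpuCores;
    private final int heapMemoryInMB;
    private final int directMemoryInMB;
    private final int nativeMemoryInMB;
    private final int stateSizeInMB;

    public ResourceSpec(
            double cpuCores,
            int heapMemoryInMB,
            int directMemoryInMB,
            int nativeMemoryInMB,
            int stateSizeInMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
        this.directMemoryInMB = directMemoryInMB;
        this.nativeMemoryInMB = nativeMemoryInMB;
        this.stateSizeInMB = stateSizeInMB;
    }

    // A basic construction helper for the common case of cpu + heap only;
    // further factory methods could be added for other factor combinations.
    public static ResourceSpec of(double cpuCores, int heapMemoryInMB) {
        return new ResourceSpec(cpuCores, heapMemoryInMB, 0, 0, 0);
    }

    public double getCpuCores() { return cpuCores; }
    public int getHeapMemoryInMB() { return heapMemoryInMB; }
    public int getDirectMemoryInMB() { return directMemoryInMB; }
    public int getNativeMemoryInMB() { return nativeMemoryInMB; }
    public int getStateSizeInMB() { return stateSizeInMB; }
}
{code}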
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688809#comment-15688809 ]

ASF GitHub Bot commented on FLINK-5023:
---------------------------------------

Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/2768

    @aljoscha Thanks for your review. I have updated the PR according to your suggestion.

> Add get() method in State interface
> ------------------------------------
>
>                 Key: FLINK-5023
>                 URL: https://issues.apache.org/jira/browse/FLINK-5023
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I
> think we should provide another method called `get()` to return the
> structured value (e.g., value, list, or map) under the current key.
> In fact, the functionality of `get()` has already been implemented in all
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The
> modification to the interface can better abstract these states.
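A minimal sketch of the proposed interface change (the type parameter and the checked exception are illustrative assumptions; the exact signature is settled in the PR):

{code}
// Sketch of the proposal: a common get() beside the existing clear().
public interface State<T> {

    /** Returns the structured value (value, list, map, ...) under the current key. */
    T get() throws Exception;

    /** Removes the value mapped under the current key. */
    void clear();
}

// Existing state kinds would then just fix the type parameter, e.g.:
//   ValueState<V> extends State<V>
//   ListState<V>  extends State<Iterable<V>>
{code}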
[GitHub] flink issue #2768: [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to cla...
Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/2768

    @aljoscha Thanks for your review. I have updated the PR according to your suggestion.
[jira] [Updated] (FLINK-5133) Add new setResource API for DataStream and DataSet
[ https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5133:
---------------------------------
    Description:
        This is part of the fine-grained resource configuration.
        For *DataStream*, the *setResource* API will be set on
        *SingleOutputStreamOperator*, similar to other existing properties like
        parallelism, name, etc. For *DataSet*, the *setResource* API will be set on
        *Operator* in the same way.
        The API takes two parameters, a minimum *ResourceSpec* and a maximum
        *ResourceSpec*, to allow for resource resizing in future improvements.

> Add new setResource API for DataStream and DataSet
> ---------------------------------------------------
>
>                 Key: FLINK-5133
>                 URL: https://issues.apache.org/jira/browse/FLINK-5133
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataSet API, DataStream API
>            Reporter: Zhijiang Wang
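A hypothetical usage example of the described API; the setResource call and the ResourceSpec.of factory are assumptions based on this description, not a released API:

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SetResourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> lengths = env
            .socketTextStream("localhost", 9999)
            .map(new MapFunction<String, Integer>() {
                @Override
                public Integer map(String line) {
                    return line.length();
                }
            })
            .name("lengths");

        // Hypothetical call, as proposed above: a minimum and a maximum
        // ResourceSpec, leaving room for resource resizing in the future.
        lengths.setResource(
            ResourceSpec.of(0.5, 128),    // minimum: 0.5 CPU cores, 128 MB heap
            ResourceSpec.of(2.0, 512));   // maximum: 2 CPU cores, 512 MB heap

        env.execute("setResource sketch");
    }
}
{code}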
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5134:
---------------------------------
    Description:
        In *JobGraph* generation, each *JobVertex* corresponds to a series of
        chained operators, so the resource of a *JobVertex* should be the
        aggregation of the individual resources of its chained operators.
        For the memory resource of a *JobVertex*, the aggregation is the sum over
        the chained operators; for the CPU cores resource, the aggregation is the
        maximum over the chained operators.

    (was: the same text)

> Aggregate ResourceSpec for chained operators when generating job graph
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5134
>                 URL: https://issues.apache.org/jira/browse/FLINK-5134
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Zhijiang Wang
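The aggregation rule above (memory summed, CPU cores maxed across the chain) could be sketched like this; the ResourceSpec accessors are the illustrative ones assumed earlier in this thread:

{code}
// Sketch of the chaining aggregation: sum the memory factors and take the
// maximum of the CPU cores over all operators chained into one JobVertex.
static ResourceSpec aggregateChainedResources(Iterable<ResourceSpec> chainedSpecs) {
    double cpuCores = 0.0;
    int heapMB = 0;
    int directMB = 0;
    int nativeMB = 0;
    for (ResourceSpec spec : chainedSpecs) {
        cpuCores = Math.max(cpuCores, spec.getCpuCores());  // maximum formula
        heapMB += spec.getHeapMemoryInMB();                 // sum formula
        directMB += spec.getDirectMemoryInMB();
        nativeMB += spec.getNativeMemoryInMB();
    }
    return new ResourceSpec(cpuCores, heapMB, directMB, nativeMB, 0);
}
{code}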
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5136:
---------------------------------
    Summary: Aggregation of slot ResourceProfile and framework memory for
    requesting resource from cluster by ResourceManager
    (was: ResourceManager should consider both slot resource profile and
    framework memory for requesting resource from cluster)

> Aggregation of slot ResourceProfile and framework memory for requesting
> resource from cluster by ResourceManager
> ------------------------------------------------------------------------
>
>                 Key: FLINK-5136
>                 URL: https://issues.apache.org/jira/browse/FLINK-5136
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Zhijiang Wang
[jira] [Created] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
Zhijiang Wang created FLINK-5132:
------------------------------------

             Summary: Introduce the ResourceSpec for grouping different resource factors in API
                 Key: FLINK-5132
                 URL: https://issues.apache.org/jira/browse/FLINK-5132
             Project: Flink
          Issue Type: Sub-task
          Components: Core
            Reporter: Zhijiang Wang
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5131:
---------------------------------
    Description:
        Normally a UDF just creates short-lived small objects that the JVM can
        reclaim quickly, so most of the memory resource is controlled and managed
        by the *TaskManager* framework. But in some special cases a UDF may consume
        substantial resources to create long-lived large objects, so it is necessary
        to give expert users the option to define resource usage when needed.

        The basic approach is the following:
        - Introduce the *ResourceSpec* structure to describe the different resource
          factors (CPU cores, heap memory, direct memory, native memory, etc.) and
          provide some basic construction methods for a resource group.
        - The *ResourceSpec* can be set on the internal transformation in DataStream
          and on the base operator in DataSet, respectively.
        - In stream graph generation, the *ResourceSpec* will be aggregated for
          chained operators.
        - When the *JobManager* requests a slot from the *ResourceManager* for
          submitting a task, the *ResourceProfile* will be expanded to correspond
          with the *ResourceSpec*.
        - When the *ResourceManager* requests a container resource from the cluster,
          it should account for extra framework memory in addition to the slot
          *ResourceProfile*.
        - The framework memory is mainly used by the *NetworkBufferPool* and the
          *MemoryManager* in the *TaskManager*, and it can be configured at the
          job level.
        - Apart from resources, the JVM options attached to the container should be
          supported and could also be configured at the job level.

        This feature will be implemented directly in the flip-6 branch.

    (was: the same text without the final note about the flip-6 branch)

> Fine-grained Resource Configuration
> -----------------------------------
>
>                 Key: FLINK-5131
>                 URL: https://issues.apache.org/jira/browse/FLINK-5131
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API, DataStream API, JobManager, ResourceManager
>            Reporter: Zhijiang Wang
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5131:
---------------------------------
    Description:
        Normally a UDF just creates short-lived small objects that the JVM can
        reclaim quickly, so most of the memory resource is controlled and managed
        by the *TaskManager* framework. But in some special cases a UDF may consume
        substantial resources to create long-lived large objects, so it is necessary
        to give expert users the option to define resource usage when needed.

        The basic approach is the following:
        - Introduce the *ResourceSpec* structure to describe the different resource
          factors (CPU cores, heap memory, direct memory, native memory, etc.) and
          provide some basic construction methods for a resource group.
        - The *ResourceSpec* can be set on the internal transformation in DataStream
          and on the base operator in DataSet, respectively.
        - In job graph generation, the *ResourceSpec* will be aggregated for
          chained operators.
        - When the *JobManager* requests a slot from the *ResourceManager* for
          submitting a task, the *ResourceProfile* will be expanded to correspond
          with the *ResourceSpec*.
        - When the *ResourceManager* requests a container resource from the cluster,
          it should account for extra framework memory in addition to the slot
          *ResourceProfile*.
        - The framework memory is mainly used by the *NetworkBufferPool* and the
          *MemoryManager* in the *TaskManager*, and it can be configured at the
          job level.
        - Apart from resources, the JVM options attached to the container should be
          supported and could also be configured at the job level.

        This feature will be implemented directly in the flip-6 branch.

    (was: the same text with "expanded to correspondence with" instead of
    "expanded to correspond with")

> Fine-grained Resource Configuration
> -----------------------------------
>
>                 Key: FLINK-5131
>                 URL: https://issues.apache.org/jira/browse/FLINK-5131
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API, DataStream API, JobManager, ResourceManager
>            Reporter: Zhijiang Wang
[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5135:
---------------------------------
    Summary: ResourceProfile for slot request should be expanded to correspond
    with ResourceSpec
    (was: ResourceProfile should be expanded to correspond with ResourceSpec)

> ResourceProfile for slot request should be expanded to correspond with
> ResourceSpec
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5135
>                 URL: https://issues.apache.org/jira/browse/FLINK-5135
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, ResourceManager
>            Reporter: Zhijiang Wang
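Expanding the slot-request profile to mirror the API-level spec might look like the following sketch; the field set and the conversion method are assumptions, not the actual class:

{code}
// Illustrative sketch: the slot-request ResourceProfile carries the same
// factors as the aggregated ResourceSpec of the task's chained operators.
public final class ResourceProfile {
    private final double cpuCores;
    private final int heapMemoryInMB;
    private final int directMemoryInMB;
    private final int nativeMemoryInMB;

    public ResourceProfile(double cpuCores, int heapMemoryInMB,
                           int directMemoryInMB, int nativeMemoryInMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
        this.directMemoryInMB = directMemoryInMB;
        this.nativeMemoryInMB = nativeMemoryInMB;
    }

    // Conversion from the API-level spec (illustrative ResourceSpec from above).
    public static ResourceProfile fromResourceSpec(ResourceSpec spec) {
        return new ResourceProfile(
            spec.getCpuCores(),
            spec.getHeapMemoryInMB(),
            spec.getDirectMemoryInMB(),
            spec.getNativeMemoryInMB());
    }

    public double getCpuCores() { return cpuCores; }
    public int getHeapMemoryInMB() { return heapMemoryInMB; }
    public int getDirectMemoryInMB() { return directMemoryInMB; }
    public int getNativeMemoryInMB() { return nativeMemoryInMB; }
}
{code}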
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5136:
---------------------------------
    Summary: Aggregation of slot ResourceProfile and framework memory by
    ResourceManager
    (was: Aggregation of slot ResourceProfile and framework memory for
    requesting resource from cluster by ResourceManager)

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-5136
>                 URL: https://issues.apache.org/jira/browse/FLINK-5136
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Zhijiang Wang
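The aggregation named in the summary (slot profile plus framework memory) reduces to a sum along these lines; the method name and the framework-memory split are assumptions for illustration:

{code}
// Sketch: the total memory to request for a container is the slot profile's
// memory plus the job-level framework memory (network buffers + managed
// memory, as used by NetworkBufferPool and MemoryManager).
static int containerMemoryInMB(
        ResourceProfile slotProfile,
        int networkBuffersMB,
        int managedMemoryMB) {
    int frameworkMB = networkBuffersMB + managedMemoryMB;
    return slotProfile.getHeapMemoryInMB()
        + slotProfile.getDirectMemoryInMB()
        + slotProfile.getNativeMemoryInMB()
        + frameworkMB;
}
{code}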
[jira] [Created] (FLINK-5139) Kafka consumer should support setting a specific start offset
jing lining created FLINK-5139:
----------------------------------

             Summary: Kafka consumer should support setting a specific start offset
                 Key: FLINK-5139
                 URL: https://issues.apache.org/jira/browse/FLINK-5139
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
    Affects Versions: 1.1.3
            Reporter: jing lining

Currently the start position is derived only from the configured invalid-offset
behavior, see
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08#getInvalidOffsetBehavior.
Could the consumer also accept a concrete numeric offset to start reading from?
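For context, a sketch of what exists in 1.1.x versus what the ticket asks for; the setStartOffsets call in the comment is a hypothetical shape, not an existing API:

{code}
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaOffsetSketch {
    static FlinkKafkaConsumer08<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "demo");
        // Today the start position can only be steered via this Kafka 0.8
        // property, which getInvalidOffsetBehavior() reads:
        // "smallest" (earliest) or "largest" (latest).
        props.setProperty("auto.offset.reset", "smallest");

        // Hypothetical shape of what this ticket asks for (NOT an existing API):
        // consumer.setStartOffsets(Map<partition, offset>);
        return new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), props);
    }
}
{code}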
[jira] [Created] (FLINK-5133) Add new setResource API for DataStream and DataSet
Zhijiang Wang created FLINK-5133:
------------------------------------

             Summary: Add new setResource API for DataStream and DataSet
                 Key: FLINK-5133
                 URL: https://issues.apache.org/jira/browse/FLINK-5133
             Project: Flink
          Issue Type: Sub-task
          Components: DataSet API, DataStream API
            Reporter: Zhijiang Wang
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5134:
---------------------------------
    Description: In DataStream API,

> Aggregate ResourceSpec for chained operators when generating stream graph
> --------------------------------------------------------------------------
>
>                 Key: FLINK-5134
>                 URL: https://issues.apache.org/jira/browse/FLINK-5134
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Zhijiang Wang
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5134:
---------------------------------
    Description:
        In *JobGraph* generation, each *JobVertex* corresponds to a series of
        chained operators, so the resource of a *JobVertex* should be the
        aggregation of the individual resources of its chained operators.
        For the memory resource of a *JobVertex*, the aggregation is the sum over
        the chained operators; for the CPU cores resource, the aggregation is the
        maximum over the chained operators.

    (was: In DataStream API, the *ResourceSpec* is set on the internal
    transformation)

> Aggregate ResourceSpec for chained operators when generating job graph
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5134
>                 URL: https://issues.apache.org/jira/browse/FLINK-5134
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Zhijiang Wang
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89247312

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -39,66 +45,69 @@ object AggregateUtil {
       type JavaList[T] = java.util.List[T]

       /**
    -    * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
    -    * operator functions:
    -    * [[org.apache.flink.api.common.functions.MapFunction]] and
    -    * [[org.apache.flink.api.common.functions.GroupReduceFunction]] (if it's a partial
    -    * aggregate, it should also implement
    -    * [[org.apache.flink.api.common.functions.CombineFunction]] as well).
    +    * Create prepare MapFunction for aggregates.
         * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
         * intermediate aggregate values of all aggregate functions, stored in a Row in the
         * following format:
         *
         * {{{
         *            avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
         *                 |                              |
         *                 v                              v
         *   +---------+---------+------+--------+------+--------+
         *   |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 |
         *   +---------+---------+------+--------+------+--------+
         *                                  ^
         *                                  |
         *                    sum(y) aggOffsetInRow = 4
         * }}}
         */
    -  def createOperatorFunctionsForAggregates(
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int])
    -    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
    -
    -    val aggregateFunctionsAndFieldIndexes =
    -      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
    -    // store the aggregate fields of each aggregate function, in the same order as the aggregates.
    -    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
    -    val aggregates = aggregateFunctionsAndFieldIndexes._2
    +  private[flink] def createPrepareMapFunction(
    +      aggregates: Array[Aggregate[_ <: Any]],
    --- End diff --

    Cool, I think using Calcite's namedAggregates is a good idea.
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688606#comment-15688606 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89247312

    (same review comment as in the GitHub message above: the AggregateUtil.scala
    hunk replacing createOperatorFunctionsForAggregates with
    createPrepareMapFunction, with the remark "Cool, I think using Calcite's
    namedAggregates is a good idea.")

> Add incremental group window aggregation for streaming Table API
> -----------------------------------------------------------------
>
>                 Key: FLINK-4937
>                 URL: https://issues.apache.org/jira/browse/FLINK-4937
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed, instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API,
> features [incremental
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
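The DataStream-API mechanism referenced in the description can be sketched as follows; the types and field layout are illustrative. A ReduceFunction keeps one partial aggregate per key and window instead of buffering all records:

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IncrementalWindowAgg {
    // Per-key, per-window count maintained incrementally: each arriving record
    // is folded into a single partial aggregate instead of being buffered.
    static DataStream<Tuple2<String, Long>> windowCounts(
            DataStream<Tuple2<String, Long>> input) {
        return input
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(
                        Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    return new Tuple2<>(a.f0, a.f1 + b.f1);
                }
            });
    }
}
{code}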
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89247362

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -39,66 +45,69 @@ object AggregateUtil {

    (the hunk opens with the same scaladoc and signature change as in
    discussion_r89247312 above, then continues:)

    +  private[flink] def createPrepareMapFunction(
    +      aggregates: Array[Aggregate[_ <: Any]],
    +      aggFieldIndexes: Array[Int],
    +      groupings: Array[Int],
    +      inputType: RelDataType): MapFunction[Any, Row] = {

         val mapReturnType: RowTypeInfo =
           createAggregateBufferDataType(groupings, aggregates, inputType)

         val mapFunction = new AggregateMapFunction[Row, Row](
    -      aggregates, aggFieldIndexes, groupings,
    -      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
    -
    -    // the mapping relation between field index of intermediate aggregate Row and output Row.
    -    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
    -
    -    // the mapping relation between aggregate function index in list and its corresponding
    -    // field index in output Row.
    -    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
    -
    -    if (groupingOffsetMapping.length != groupings.length ||
    -        aggOffsetMapping.length != namedAggregates.length) {
    -      throw new TableException("Could not find output field in input data type " +
    -        "or aggregate functions.")
    -    }
    -
    -    val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
    +      aggregates,
    +      aggFieldIndexes,
    +      groupings,
    +      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]

    -    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688608#comment-15688608 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89247362

    (same AggregateUtil.scala diff as in the GitHub message above)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5131:
---------------------------------
    Description:
        Normally a UDF just creates short-lived small objects that the JVM can
        reclaim quickly, so most of the memory resource is controlled and managed
        by the *TaskManager* framework. But in some special cases a UDF may consume
        substantial resources to create long-lived large objects, so it is necessary
        to give expert users the option to define resource usage when needed.

        The basic approach is the following:
        - Introduce the *ResourceSpec* structure to describe the different resource
          factors (CPU cores, heap memory, direct memory, native memory, etc.) and
          provide some basic construction methods for a resource group.
        - The *ResourceSpec* can be set on the internal transformation in DataStream
          and on the base operator in DataSet, respectively.
        - In stream graph generation, the *ResourceSpec* will be aggregated for
          chained operators.
        - When the *JobManager* requests a slot from the *ResourceManager* for
          submitting a task, the *ResourceProfile* will be expanded to correspond
          with the *ResourceSpec*.
        - When the *ResourceManager* requests a container resource from the cluster,
          it should account for extra framework memory in addition to the slot
          *ResourceProfile*.
        - The framework memory is mainly used by the *NetworkBufferPool* and the
          *MemoryManager* in the *TaskManager*, and it can be configured at the
          job level.
        - Apart from resources, the JVM options attached to the container should be
          supported and could also be configured at the job level.

> Fine-grained Resource Configuration
> -----------------------------------
>
>                 Key: FLINK-5131
>                 URL: https://issues.apache.org/jira/browse/FLINK-5131
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API, DataStream API, JobManager, ResourceManager
>            Reporter: Zhijiang Wang
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688781#comment-15688781 ]

ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2653#discussion_r89252908

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
       }

       /**
    +    * Returns the eval method matching the given signature of [[TypeInformation]].
    +    */
    +  def getEvalMethod(
    +      function: EvaluableFunction,
    --- End diff --

    hi @fhueske When we declare the ScalarFunction and TableFunction as follows:
    trait ScalarFunction, trait TableFunction[T], it will become very useful.
    The current version is optional.

> Add support for user defined table function in Table API & SQL
> ---------------------------------------------------------------
>
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row
> and output a single output row. In contrast, table-generating functions
> transform a single input row into multiple output rows. This is very useful
> in some cases, such as looking up in HBase by rowkey and returning one or
> more rows.
> Adding a user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods
> NOTE:
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because
> of Java type erasure, we can't extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Flink will choose the best-matching
> eval method to call according to the parameter types and count.
> {code}
> public class Word {
>     public String word;
>     public Integer length;
>
>     public Word(String word, Integer length) {
>         this.word = word;
>         this.length = length;
>     }
> }
>
> public class SplitStringUDTF extends UDTF<Word> {
>     public Iterable<Word> eval(String str) {
>         if (str != null) {
>             for (String s : str.split(",")) {
>                 collect(new Word(s, s.length()));
>             }
>         }
>         return null; // rows are emitted via collect()
>     }
> }
>
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w, l)")
>
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to "w" and "l"
> table.crossApply("split(c) as (w, l)")
>      .select("a, b, w, l")
> // without renaming, we will use the original field names of the POJO/case class
> table.crossApply("split(c)")
>      .select("a, b, word, length")
>
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>      .select('a, 'b, 'w, 'l)
> // outerApply for an outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about the UDTF design.
> [1]
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2653#discussion_r89252908

    (same UserDefinedFunctionUtils.scala diff and comment as mirrored in the
    FLINK-4469 message above)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5134:
---------------------------------
    Summary: Aggregate ResourceSpec for chained operators when generating job graph
    (was: Aggregate ResourceSpec for chained operators when generating stream graph)

> Aggregate ResourceSpec for chained operators when generating job graph
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5134
>                 URL: https://issues.apache.org/jira/browse/FLINK-5134
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Zhijiang Wang
>
> In DataStream API, the *ResourceSpec* is set on the internal transformation
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5134:
---------------------------------
    Description: In DataStream API, the *ResourceSpec* is set on the internal
    transformation
    (was: In DataStream API,)

> Aggregate ResourceSpec for chained operators when generating stream graph
> --------------------------------------------------------------------------
>
>                 Key: FLINK-5134
>                 URL: https://issues.apache.org/jira/browse/FLINK-5134
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Zhijiang Wang
[jira] [Created] (FLINK-5136) ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster
Zhijiang Wang created FLINK-5136:
------------------------------------

             Summary: ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster
                 Key: FLINK-5136
                 URL: https://issues.apache.org/jira/browse/FLINK-5136
             Project: Flink
          Issue Type: Sub-task
          Components: ResourceManager
            Reporter: Zhijiang Wang
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5131:
---------------------------------
    Description:
        Normally a UDF just creates short-lived small objects that the JVM can
        reclaim quickly, so most of the memory resource is controlled and managed
        by the *TaskManager* framework. But in some special cases a UDF may consume
        substantial resources to create long-lived large objects, so it is necessary
        to give expert users the option to define resource usage when needed.

        The basic approach is the following:
        - Introduce the *ResourceSpec* structure to describe the different resource
          factors (CPU cores, heap memory, direct memory, native memory, etc.) and
          provide some basic construction methods for a resource group.
        - The *ResourceSpec* can be set on the internal transformation in DataStream
          and on the base operator in DataSet, respectively.
        - In job graph generation, the *ResourceSpec* will be aggregated for
          chained operators.
        - When the *JobManager* requests a slot from the *ResourceManager* for
          submitting a task, the *ResourceProfile* will be expanded to correspond
          with the *ResourceSpec*.
        - When the *ResourceManager* requests a container resource from the cluster,
          it should account for extra framework memory in addition to the slot
          *ResourceProfile*.
        - The framework memory is mainly used by the *NetworkBufferPool* and the
          *MemoryManager* in the *TaskManager*, and it can be configured at the
          job level.
        - Apart from resources, the JVM options attached to the container should be
          supported and could also be configured at the job level.

        This feature will be implemented directly in the flip-6 branch.

    (was: the same text with "stream graph generation" instead of
    "job graph generation")

> Fine-grained Resource Configuration
> -----------------------------------
>
>                 Key: FLINK-5131
>                 URL: https://issues.apache.org/jira/browse/FLINK-5131
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API, DataStream API, JobManager, ResourceManager
>            Reporter: Zhijiang Wang
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5131:
---------------------------------
    Description:
        Normally a UDF just creates short-lived small objects that the JVM can
        reclaim quickly, so most of the memory resource is controlled and managed
        by the *TaskManager* framework. But in some special cases a UDF may consume
        substantial resources to create long-lived large objects, so it is necessary
        to give expert users the option to define resource usage when needed.

        The basic approach is the following:
        - Introduce the *ResourceSpec* structure to describe the different resource
          factors (CPU cores, heap memory, direct memory, native memory, etc.) and
          provide some basic construction methods for a resource group.
        - The *ResourceSpec* can be set on the internal transformation in DataStream
          and on the base operator in DataSet, respectively.
        - The *StateBackend* should provide a method to estimate the memory usage
          based on the state-size hint in the *ResourceSpec*.
        - In job graph generation, the *ResourceSpec* will be aggregated for
          chained operators.
        - When the *JobManager* requests a slot from the *ResourceManager* for
          submitting a task, the *ResourceProfile* will be expanded to correspond
          with the *ResourceSpec*.
        - When the *ResourceManager* requests a container resource from the cluster,
          it should account for extra framework memory in addition to the slot
          *ResourceProfile*.
        - The framework memory is mainly used by the *NetworkBufferPool* and the
          *MemoryManager* in the *TaskManager*, and it can be configured at the
          job level.
        - Apart from resources, the JVM options attached to the container should be
          supported and could also be configured at the job level.

        This feature will be implemented directly in the flip-6 branch.

    (was: the same text without the *StateBackend* estimation item)

> Fine-grained Resource Configuration
> -----------------------------------
>
>                 Key: FLINK-5131
>                 URL: https://issues.apache.org/jira/browse/FLINK-5131
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API, DataStream API, JobManager, ResourceManager
>            Reporter: Zhijiang Wang
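The new StateBackend bullet could take a shape like this sketch; the interface name, method, and signature are assumptions, not an existing StateBackend API:

{code}
// Illustrative hook: lets each backend translate the state-size hint from
// ResourceSpec into the memory it would actually need.
public interface StateBackendMemoryEstimator {

    /**
     * Estimates the memory in MB this backend needs for the given state-size
     * hint, e.g. a heap-based backend may need roughly the state size itself,
     * while RocksDB mostly needs native memory for block cache and memtables.
     */
    int estimateMemoryUsageInMB(int stateSizeHintInMB);
}
{code}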
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688661#comment-15688661 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89249046 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. +val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) --- End diff -- yes. > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. 
> The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
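For reference, the incremental pattern mentioned above keeps only a single partial aggregate per window instead of buffering every record. A minimal sketch against the Flink 1.2-era DataStream Scala API (element type simplified to a pair; processing-time windows assumed):

{code:scala}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[(String, Int)] = env.fromElements(("a", 1), ("a", 2), ("b", 3))

// The ReduceFunction folds each arriving record into the running partial
// aggregate, so window state stays constant-size per key and window.
val sums = input
  .keyBy(0)
  .timeWindow(Time.minutes(1))
  .reduce((a, b) => (a._1, a._2 + b._2))
{code}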
[jira] [Commented] (FLINK-5061) Remove ContinuousEventTimeTrigger
[ https://issues.apache.org/jira/browse/FLINK-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688654#comment-15688654 ] Manu Zhang commented on FLINK-5061: --- Agreed. These are just some of my thoughts and they should not block the changes. > Remove ContinuousEventTimeTrigger > - > > Key: FLINK-5061 > URL: https://issues.apache.org/jira/browse/FLINK-5061 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.2.0 > > > The Trigger does not do what people think it does. The problem is that a > watermark T signals that we won't see elements with timestamp < T in the > future. It does not signal that we have not yet seen elements with timestamp > T. > So it cannot really be used to trigger at different stages of processing > an event-time window. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
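To make the pitfall concrete, a small event-time timeline (values are illustrative):

{code:scala}
// Window [0, 60), ContinuousEventTimeTrigger with a 20 ms interval.
// Arrival order: ts=55, ts=5, watermark(20), ts=12, watermark(40), ...
//
// The firing at watermark(20) is often read as "the result for progress up
// to 20", but the pane already contains ts=55. A watermark T only promises
// that no element with timestamp < T arrives later; it says nothing about
// which elements with timestamp > T have already arrived.
{code}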
[jira] [Created] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
Zhijiang Wang created FLINK-5134: Summary: Aggregate ResourceSpec for chained operators when generating stream graph Key: FLINK-5134 URL: https://issues.apache.org/jira/browse/FLINK-5134 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5135) ResourceProfile should be expanded to correspond with ResourceSpec
Zhijiang Wang created FLINK-5135: Summary: ResourceProfile should be expanded to correspond with ResourceSpec Key: FLINK-5135 URL: https://issues.apache.org/jira/browse/FLINK-5135 Project: Flink Issue Type: Sub-task Components: JobManager, ResourceManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5137) Config JVM options and set onto the TaskManager container
[ https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5137: - Description: Users can configure JVM options at the job level, and the *ResourceManager* will set the related JVM options when launching the *TaskManager* container, based on the framework memory and the slot resource profile (see the sketch below). > Config JVM options and set onto the TaskManager container > - > > Key: FLINK-5137 > URL: https://issues.apache.org/jira/browse/FLINK-5137 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager, TaskManager >Reporter: Zhijiang Wang > > Users can configure JVM options at the job level, and the *ResourceManager* will > set the related JVM options when launching the *TaskManager* container, based on > the framework memory and the slot resource profile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
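As a rough illustration, the *ResourceManager* side could compose the container JVM flags from the framework memory plus the slot specs, along these lines (field names reuse the hypothetical {{ResourceSpec}} sketch above; the formula itself is an assumption):

{code:scala}
// Illustrative only: derive JVM options for a TaskManager container.
def jvmOptions(frameworkHeapMB: Int,
               slotSpecs: Seq[ResourceSpec],
               userJvmOpts: String): String = {
  val heapMB   = frameworkHeapMB + slotSpecs.map(_.heapMemoryMB).sum
  val directMB = slotSpecs.map(_.directMemoryMB).sum
  s"-Xms${heapMB}m -Xmx${heapMB}m -XX:MaxDirectMemorySize=${directMB}m $userJvmOpts"
}
{code}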
[jira] [Assigned] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5136: Assignee: Zhijiang Wang > Aggregation of slot ResourceProfile and framework memory by ResourceManager > --- > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > When the *ResourceManager* requests container resources from the cluster, the > framework memory should be considered together with the slot resources. > Currently the framework memory is mainly used by the *MemoryManager* and > *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from > the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5135: Assignee: Zhijiang Wang > ResourceProfile for slot request should be expanded to correspond with > ResourceSpec > --- > > Key: FLINK-5135 > URL: https://issues.apache.org/jira/browse/FLINK-5135 > Project: Flink > Issue Type: Sub-task > Components: JobManager, ResourceManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The *JobManager* requests slots by *ResourceProfile* from the *ResourceManager* > before submitting tasks. Currently the *ResourceProfile* only contains cpu > cores and memory properties. The memory should be expanded to different types, > such as heap memory, direct memory and native memory, which correspond to the > memory types in *ResourceSpec* (see the sketch below). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
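A sketch of what the expanded *ResourceProfile* could look like, mirroring the memory types of *ResourceSpec* (names and the matching rule are illustrative, not the actual class):

{code:scala}
// Illustrative only: ResourceProfile expanded beyond (cpu, memory) to the
// distinct memory types carried by ResourceSpec.
case class ResourceProfile(
    cpuCores: Double,
    heapMemoryMB: Int,
    directMemoryMB: Int,
    nativeMemoryMB: Int) {

  // An offered profile can satisfy a request if every dimension suffices.
  def canFulfill(required: ResourceProfile): Boolean =
    cpuCores >= required.cpuCores &&
      heapMemoryMB >= required.heapMemoryMB &&
      directMemoryMB >= required.directMemoryMB &&
      nativeMemoryMB >= required.nativeMemoryMB
}
{code}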
[jira] [Assigned] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5134: Assignee: Zhijiang Wang > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > In *JobGraph* generation, each *JobVertex* corresponds to a series of chained > operators. > The resources of a *JobVertex* should be the aggregation of the individual > resources of its chained operators. > For memory resources, the aggregation is the sum over the chained operators. > For cpu cores, the aggregation is the maximum over the chained operators > (see the sketch below). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
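A sketch of the aggregation rule described above, reusing the hypothetical {{ResourceSpec}} case class from the FLINK-5131 sketch (an illustration of the formula, not the actual implementation):

{code:scala}
// Illustrative only: aggregate the ResourceSpecs of chained operators
// into the ResourceSpec of the enclosing JobVertex (chain assumed non-empty).
def aggregateForChain(specs: Seq[ResourceSpec]): ResourceSpec =
  specs.reduce { (a, b) =>
    ResourceSpec(
      cpuCores       = math.max(a.cpuCores, b.cpuCores),   // maximum formula
      heapMemoryMB   = a.heapMemoryMB + b.heapMemoryMB,    // sum formula
      directMemoryMB = a.directMemoryMB + b.directMemoryMB,
      nativeMemoryMB = a.nativeMemoryMB + b.nativeMemoryMB,
      stateSizeMB    = a.stateSizeMB + b.stateSizeMB)
  }
{code}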
[jira] [Assigned] (FLINK-5133) Add new setResource API for DataStream and DataSet
[ https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5133: Assignee: Zhijiang Wang > Add new setResource API for DataStream and DataSet > -- > > Key: FLINK-5133 > URL: https://issues.apache.org/jira/browse/FLINK-5133 > Project: Flink > Issue Type: Sub-task > Components: DataSet API, DataStream API >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > This is part of the fine-grained resource configuration. > For *DataStream*, the *setResource* API will be set on > *SingleOutputStreamOperator*, similar to other existing properties like > parallelism, name, etc. > For *DataSet*, the *setResource* API will be set on *Operator* in a > similar way. > The API takes two parameters, a minimum *ResourceSpec* and a maximum > *ResourceSpec*, to allow for resource resizing in future improvements > (see the sketch below). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
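A hypothetical usage sketch of the proposed API ({{setResource}} does not exist yet; {{minSpec}}, {{maxSpec}} and the input stream reuse the earlier illustrative sketch):

{code:scala}
// Illustrative only: per-operator resource declaration on a DataStream.
val minSpec = ResourceSpec(0.5, 256, 0, 0, 0)   // lower bound
val maxSpec = ResourceSpec(2.0, 1024, 0, 0, 0)  // upper bound, for future resizing

val counts = lines                // assume: a DataStream[String] from some source
  .map(line => (line, 1))         // returns a SingleOutputStreamOperator
  .setResource(minSpec, maxSpec)  // hypothetical API proposed by this issue
  .name("tokenizer")
{code}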
[jira] [Assigned] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5132: Assignee: Zhijiang Wang > Introduce the ResourceSpec for grouping different resource factors in API > - > > Key: FLINK-5132 > URL: https://issues.apache.org/jira/browse/FLINK-5132 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > This is part of the fine-grained resource configuration. > The current resource factors include cpu cores, heap memory, direct memory, > native memory and state size. > The *ResourceSpec* will provide some basic constructors for grouping > different resource factors as needed, and the constructors can easily be > extended for further requirements (see the sketch below). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
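For illustration, the "basic constructors" could be simple factory helpers over the hypothetical {{ResourceSpec}} sketched earlier (all names are assumptions):

{code:scala}
// Illustrative only: convenience constructors for common groupings.
object ResourceSpecs {
  def cpuAndHeap(cores: Double, heapMB: Int): ResourceSpec =
    ResourceSpec(cores, heapMB, 0, 0, 0)

  // Extend an existing spec with native memory, e.g. for a RocksDB state backend.
  def withNativeMemory(base: ResourceSpec, nativeMB: Int): ResourceSpec =
    base.copy(nativeMemoryMB = nativeMB)
}
{code}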
[jira] [Assigned] (FLINK-5137) Config JVM options and set onto the TaskManager container
[ https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5137: Assignee: Zhijiang Wang > Config JVM options and set onto the TaskManager container > - > > Key: FLINK-5137 > URL: https://issues.apache.org/jira/browse/FLINK-5137 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager, TaskManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Users can configure JVM options at the job level, and the *ResourceManager* will > set the related JVM options when launching the *TaskManager* container, based on > the framework memory and the slot resource profile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688612#comment-15688612 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89247407 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. 
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] +} else { + new AggregateWindowFunction(aggFunction) +} + + } + + private[flink] def createAllWindowIncrementalAggregationFunction( +window: LogicalWindow, +aggregates: Array[Aggregate[_ <: Any]], --- End diff -- sure. agree. > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688613#comment-15688613 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89247416 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. 
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] +} else { + new AggregateWindowFunction(aggFunction) +} + + } + + private[flink] def createAllWindowIncrementalAggregationFunction( +window: LogicalWindow, +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int], +properties: Seq[NamedWindowProperty]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +val (groupingOffsetMapping, aggOffsetMapping) = + getGroupingOffsetAndaggOffsetMapping( + namedAggregates, + inputType,
[jira] [Created] (FLINK-5131) Fine-grained Resource Configuration
Zhijiang Wang created FLINK-5131: Summary: Fine-grained Resource Configuration Key: FLINK-5131 URL: https://issues.apache.org/jira/browse/FLINK-5131 Project: Flink Issue Type: New Feature Components: DataSet API, DataStream API, JobManager, ResourceManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5137) Config JVM options and set onto the TaskManager container
Zhijiang Wang created FLINK-5137: Summary: Config JVM options and set onto the TaskManager container Key: FLINK-5137 URL: https://issues.apache.org/jira/browse/FLINK-5137 Project: Flink Issue Type: Sub-task Components: ResourceManager, TaskManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
Zhijiang Wang created FLINK-5138: Summary: The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec Key: FLINK-5138 URL: https://issues.apache.org/jira/browse/FLINK-5138 Project: Flink Issue Type: Sub-task Components: DataSet API, DataStream API Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5138: - Component/s: Core > The StateBackend provides the method to estimate memory usage based on hint > of state size in ResourceSpec > - > > Key: FLINK-5138 > URL: https://issues.apache.org/jira/browse/FLINK-5138 > Project: Flink > Issue Type: Sub-task > Components: Core, DataSet API, DataStream API >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5138: - Description: Users may specify the state size via the *setResource* API; each *StateBackend* implementation should then roughly estimate the different kinds of memory usage based on that state size. The estimated memory will be aggregated with the other memory values in *ResourceSpec*. There are two advantages to doing this: - For the RocksDB backend, a proper memory setting for RocksDB yields better read and write performance. - The estimated memory will be considered when requesting container resources, so the total memory usage will not exceed the container limit (see the sketch below). > The StateBackend provides the method to estimate memory usage based on hint > of state size in ResourceSpec > - > > Key: FLINK-5138 > URL: https://issues.apache.org/jira/browse/FLINK-5138 > Project: Flink > Issue Type: Sub-task > Components: Core, DataSet API, DataStream API >Reporter: Zhijiang Wang > > Users may specify the state size via the *setResource* API; each *StateBackend* > implementation should then roughly estimate the different kinds of memory usage > based on that state size. The estimated memory will be aggregated with the other > memory values in *ResourceSpec*. There are two advantages to doing this: > - For the RocksDB backend, a proper memory setting for RocksDB yields better > read and write performance. > - The estimated memory will be considered when requesting container resources, > so the total memory usage will not exceed the container limit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
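A sketch of the estimation hook described above (trait, method names and the concrete ratios are assumptions): a RocksDB-style backend would map the state-size hint mostly to native memory, while a heap-based backend would map it to heap memory.

{code:scala}
// Illustrative only: state-size hint -> per-kind memory estimate.
case class MemoryEstimate(heapMB: Int, nativeMB: Int)

trait StateBackendMemoryHint {
  def estimateMemoryUsage(stateSizeHintMB: Int): MemoryEstimate
}

object RocksDBHint extends StateBackendMemoryHint {
  // RocksDB keeps state off-heap; reserve a fraction for block cache and memtables.
  def estimateMemoryUsage(stateSizeHintMB: Int): MemoryEstimate =
    MemoryEstimate(heapMB = 0, nativeMB = math.max(64, stateSizeHintMB / 4))
}

object HeapBackendHint extends StateBackendMemoryHint {
  // A heap backend holds the full working state as objects on the JVM heap.
  def estimateMemoryUsage(stateSizeHintMB: Int): MemoryEstimate =
    MemoryEstimate(heapMB = stateSizeHintMB, nativeMB = 0)
}
{code}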
[jira] [Created] (FLINK-5140) NoResourceAvailableException is thrown while starting a new job
Biao Liu created FLINK-5140: --- Summary: NoResourceAvailableException is thrown while starting a new job Key: FLINK-5140 URL: https://issues.apache.org/jira/browse/FLINK-5140 Project: Flink Issue Type: Bug Components: Cluster Management Environment: FLIP-6 feature branch Reporter: Biao Liu Priority: Minor Here is the stack trace: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: not connected to ResourceManager and no slot available at org.apache.flink.runtime.instance.SlotPool.internalAllocateSlot(SlotPool.java:281) at org.apache.flink.runtime.instance.SlotPool.allocateSlot(SlotPool.java:256) ... Currently I have to set a RestartStrategy to handle this exception. But in some test cases I want to test cluster failures, and handling the exception this way makes those tests much more complicated. After discussing with [~tiemsn], maybe we can fix this problem by scheduling the ExecutionGraph only after the JobManager has registered with the ResourceManager (see the sketch below). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
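A sketch of the proposed ordering (all names are assumptions): defer scheduling the ExecutionGraph until registration at the ResourceManager has completed, so the SlotPool is connected before the first slot request is made.

{code:scala}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative only: chain scheduling after successful RM registration.
def scheduleAfterRegistration(registered: Future[Unit])
                             (scheduleExecutionGraph: () => Unit): Future[Unit] =
  registered.map(_ => scheduleExecutionGraph())
{code}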
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89247416 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. 
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] +} else { + new AggregateWindowFunction(aggFunction) +} + + } + + private[flink] def createAllWindowIncrementalAggregationFunction( +window: LogicalWindow, +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int], +properties: Seq[NamedWindowProperty]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +val (groupingOffsetMapping, aggOffsetMapping) = + getGroupingOffsetAndaggOffsetMapping( + namedAggregates, + inputType, + outputType, + groupings) + +val finalRowArity = outputType.getFieldCount + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) +
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5136: - Description: When the *ResourceManager* requests container resources from the cluster, the framework memory should be considered together with the slot resources. Currently the framework memory is mainly used by the *MemoryManager* and *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from the configuration (see the sketch below). > Aggregation of slot ResourceProfile and framework memory by ResourceManager > --- > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Zhijiang Wang > > When the *ResourceManager* requests container resources from the cluster, the > framework memory should be considered together with the slot resources. > Currently the framework memory is mainly used by the *MemoryManager* and > *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from > the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
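For illustration, the container request could then be derived roughly as follows, reusing the hypothetical {{ResourceProfile}} sketch above (the formula is an assumption):

{code:scala}
// Illustrative only: total container memory = slot profiles + framework memory.
def containerMemoryMB(slotProfiles: Seq[ResourceProfile],
                      frameworkMemoryMB: Int): Int =
  slotProfiles
    .map(p => p.heapMemoryMB + p.directMemoryMB + p.nativeMemoryMB)
    .sum + frameworkMemoryMB  // framework part covers e.g. MemoryManager, network buffers
{code}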
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688860#comment-15688860 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/2792 @fhueske thanks a lot for the review. I have refactored AggregateUtil and updated the PR. > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5135: - Description: The *JobManager* requests slots by *ResourceProfile* from the *ResourceManager* before submitting tasks. Currently the *ResourceProfile* only contains cpu cores and memory properties. The memory should be expanded to different types, such as heap memory, direct memory and native memory, which correspond to the memory types in *ResourceSpec*. > ResourceProfile for slot request should be expanded to correspond with > ResourceSpec > --- > > Key: FLINK-5135 > URL: https://issues.apache.org/jira/browse/FLINK-5135 > Project: Flink > Issue Type: Sub-task > Components: JobManager, ResourceManager >Reporter: Zhijiang Wang > > The *JobManager* requests slots by *ResourceProfile* from the *ResourceManager* > before submitting tasks. Currently the *ResourceProfile* only contains cpu > cores and memory properties. The memory should be expanded to different types, > such as heap memory, direct memory and native memory, which correspond to the > memory types in *ResourceSpec*. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89242998 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -61,25 +61,108 @@ object AggregateUtil { * }}} * */ - def createOperatorFunctionsForAggregates( +def createOperatorFunctionsForAggregates( namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputType: RelDataType, outputType: RelDataType, groupings: Array[Int]) : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. -val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 + val (aggFieldIndexes, aggregates) = + transformToAggregateFunctions(namedAggregates.map(_.getKey), + inputType, groupings.length) -val mapReturnType: RowTypeInfo = - createAggregateBufferDataType(groupings, aggregates, inputType) +createOperatorFunctionsForAggregates(namedAggregates, + inputType, + outputType, + groupings, + aggregates,aggFieldIndexes) +} -val mapFunction = new AggregateMapFunction[Row, Row]( -aggregates, aggFieldIndexes, groupings, - mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] +def createOperatorFunctionsForAggregates( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int], +aggregates:Array[Aggregate[_ <: Any]], +aggFieldIndexes:Array[Int]) +: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= { + + val mapFunction = createAggregateMapFunction(aggregates, +aggFieldIndexes, groupings, inputType) + + // the mapping relation between field index of intermediate aggregate Row and output Row. + val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + + // the mapping relation between aggregate function index in list and its corresponding + // field index in output Row. + val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) + + if (groupingOffsetMapping.length != groupings.length || +aggOffsetMapping.length != namedAggregates.length) { +throw new TableException("Could not find output field in input data type " + + "or aggregate functions.") + } + + val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x) + + val intermediateRowArity = groupings.length + +aggregates.map(_.intermediateDataType.length).sum + + val reduceGroupFunction = +if (allPartialAggregate) { + new AggregateReduceCombineFunction( +aggregates, +groupingOffsetMapping, +aggOffsetMapping, +intermediateRowArity, +outputType.getFieldCount) +} +else { + new AggregateReduceGroupFunction( +aggregates, +groupingOffsetMapping, +aggOffsetMapping, +intermediateRowArity, +outputType.getFieldCount) +} + + (mapFunction, reduceGroupFunction) + } + + /** +* Create Flink operator functions for Incremental aggregates. 
+* It includes 2 implementations of Flink operator functions: +* [[org.apache.flink.api.common.functions.MapFunction]] and +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*+-+-+++++ +*|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | +*+-+-+++++ +*
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688485#comment-15688485 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89242998 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -61,25 +61,108 @@ object AggregateUtil { * }}} * */ - def createOperatorFunctionsForAggregates( +def createOperatorFunctionsForAggregates( namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputType: RelDataType, outputType: RelDataType, groupings: Array[Int]) : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. -val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 + val (aggFieldIndexes, aggregates) = + transformToAggregateFunctions(namedAggregates.map(_.getKey), + inputType, groupings.length) -val mapReturnType: RowTypeInfo = - createAggregateBufferDataType(groupings, aggregates, inputType) +createOperatorFunctionsForAggregates(namedAggregates, + inputType, + outputType, + groupings, + aggregates,aggFieldIndexes) +} -val mapFunction = new AggregateMapFunction[Row, Row]( -aggregates, aggFieldIndexes, groupings, - mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] +def createOperatorFunctionsForAggregates( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int], +aggregates:Array[Aggregate[_ <: Any]], +aggFieldIndexes:Array[Int]) +: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= { + + val mapFunction = createAggregateMapFunction(aggregates, +aggFieldIndexes, groupings, inputType) + + // the mapping relation between field index of intermediate aggregate Row and output Row. + val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + + // the mapping relation between aggregate function index in list and its corresponding + // field index in output Row. + val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) + + if (groupingOffsetMapping.length != groupings.length || +aggOffsetMapping.length != namedAggregates.length) { +throw new TableException("Could not find output field in input data type " + + "or aggregate functions.") + } + + val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x) + + val intermediateRowArity = groupings.length + +aggregates.map(_.intermediateDataType.length).sum + + val reduceGroupFunction = +if (allPartialAggregate) { + new AggregateReduceCombineFunction( +aggregates, +groupingOffsetMapping, +aggOffsetMapping, +intermediateRowArity, +outputType.getFieldCount) +} +else { + new AggregateReduceGroupFunction( +aggregates, +groupingOffsetMapping, +aggOffsetMapping, +intermediateRowArity, +outputType.getFieldCount) +} + + (mapFunction, reduceGroupFunction) + } + + /** +* Create Flink operator functions for Incremental aggregates. 
+* It includes 2 implementations of Flink operator functions: +* [[org.apache.flink.api.common.functions.MapFunction]] and +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688533#comment-15688533 ] ASF GitHub Bot commented on FLINK-4937: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89244761 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( --- End diff -- yes.i'd do it. > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
[ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686075#comment-15686075 ] Flavio Pompermaier commented on FLINK-5122: --- Can you try and see if our pull request fixes your issue (https://github.com/apache/flink/pull/2790/)? We also had this problem of missing documents, and the cause was the RuntimeException thrown by close(). > Elasticsearch Sink loses documents when cluster has high load > - > > Key: FLINK-5122 > URL: https://issues.apache.org/jira/browse/FLINK-5122 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0 >Reporter: static-max > > My cluster was under high load and documents were not indexed. This violates the "at > least once" semantics of the ES connector. > I put pressure on my cluster to test Flink, causing new indices to be > created and rebalanced. On those errors the bulk request should be retried instead > of being discarded. > Primary shard not active because ES decided to rebalance the index: > 2016-11-15 15:35:16,123 ERROR > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - > Failed to index document in Elasticsearch: > UnavailableShardsException[[index-name][3] primary shard is not active > Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] > requests]] > Bulk queue on node full (I set the queue to a low value to reproduce the error): > 22:37:57,702 ERROR > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - > Failed to index document in Elasticsearch: > RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; > nested: EsRejectedExecutionException[rejected execution of > org.elasticsearch.transport.TransportService$4@727e677c on > EsThreadPoolExecutor[bulk, queue capacity = 1, > org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, > pool size = 2, active threads = 2, queued tasks = 1, completed tasks = > 2939]]]; > I can try to propose a PR for this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
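A rough sketch of the retry idea (Elasticsearch 2.x bulk API; the naive re-enqueue strategy is an illustration, not the linked PR's actual fix): inspect the per-item results in {{afterBulk}} and re-add failed actions instead of discarding them.

{code:scala}
import org.elasticsearch.action.bulk.{BulkProcessor, BulkRequest, BulkResponse}

// Illustrative only: retry failed bulk items instead of dropping them.
// The processor is passed lazily because BulkProcessor.builder needs the
// listener before the processor itself exists.
class RetryingListener(processor: () => BulkProcessor) extends BulkProcessor.Listener {

  override def beforeBulk(executionId: Long, request: BulkRequest): Unit = ()

  override def afterBulk(executionId: Long, request: BulkRequest,
                         response: BulkResponse): Unit = {
    val items = response.getItems
    var i = 0
    while (i < items.length) {
      // e.g. EsRejectedExecutionException or UnavailableShardsException
      if (items(i).isFailed) processor().add(request.requests.get(i))
      i += 1
    }
  }

  override def afterBulk(executionId: Long, request: BulkRequest,
                         failure: Throwable): Unit = {
    // The whole bulk failed (e.g. node disconnected): re-enqueue everything.
    val it = request.requests.iterator()
    while (it.hasNext) processor().add(it.next())
  }
}
{code}

Real code would additionally cap the number of retries and back off, since blindly re-adding can loop forever on permanent errors.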
[jira] [Assigned] (FLINK-4895) Drop support for Hadoop 1
[ https://issues.apache.org/jira/browse/FLINK-4895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-4895: - Assignee: Robert Metzger > Drop support for Hadoop 1 > - > > Key: FLINK-4895 > URL: https://issues.apache.org/jira/browse/FLINK-4895 > Project: Flink > Issue Type: Task > Components: Build System >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > > As per this mailing list discussion: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-Hadoop-1-support-with-Flink-1-2-td9530.html > the community agreed to drop support for Hadoop 1. > The task includes > - removing the hadoop-1 / hadoop-2 build profiles, > - removing the scripts for generating hadoop-x poms > - updating the release script > - updating the nightly build script > - updating the travis configuration file > - updating the documentation > - updating the website -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5057) Cancellation timeouts are picked from wrong config
[ https://issues.apache.org/jira/browse/FLINK-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686230#comment-15686230 ] ASF GitHub Bot commented on FLINK-5057: --- Github user uce closed the pull request at: https://github.com/apache/flink/pull/2794 > Cancellation timeouts are picked from wrong config > -- > > Key: FLINK-5057 > URL: https://issues.apache.org/jira/browse/FLINK-5057 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.2.0, 1.1.4 > > > The cancellation timeouts are read from the Task configuration instead of the > TaskManager configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89101873 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. +val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) --- End diff -- Same as above.
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686702#comment-15686702 ] ASF GitHub Bot commented on FLINK-4937: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89088322 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -39,66 +45,69 @@ object AggregateUtil { type JavaList[T] = java.util.List[T] /** - * Create Flink operator functions for aggregates. It includes 2 implementations of Flink - * operator functions: - * [[org.apache.flink.api.common.functions.MapFunction]] and - * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate, - * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). - * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the - * intermediate aggregate values of all aggregate function, it's stored in Row by the following - * format: - * - * {{{ - * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 - * | | - * v v - *+-+-+++++ - *|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | - *+-+-+++++ - * ^ - * | - * sum(y) aggOffsetInRow = 4 - * }}} - * - */ - def createOperatorFunctionsForAggregates( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int]) -: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { - -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. -val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 +* Create prepare MapFunction for aggregates. +* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*+-+-+++++ +*|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | +*+-+-+++++ +* ^ +* | +* sum(y) aggOffsetInRow = 4 +* }}} +* +*/ + private[flink] def createPrepareMapFunction( +aggregates: Array[Aggregate[_ <: Any]], --- End diff -- Let's just use `namedAggregates: Seq[CalcitePair[AggregateCall, String]]` and compute `aggregates` and `aggFieldIndexes` from it. > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. 
> We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
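The refactoring suggested here would look roughly like the sketch below: `createPrepareMapFunction` keeps only `namedAggregates` and derives `aggregates` and `aggFieldIndexes` itself, via the same `transformToAggregateFunctions` call the removed `createOperatorFunctionsForAggregates` used. This is an illustration assuming AggregateUtil's existing imports and private helpers, not the merged code:

```scala
private[flink] def createPrepareMapFunction(
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    groupings: Array[Int],
    inputType: RelDataType): MapFunction[Any, Row] = {

  // derive the aggregate functions and their input field indexes from the
  // Calcite aggregate calls, as the removed code did
  val (aggFieldIndexes, aggregates) =
    transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)

  val mapReturnType: RowTypeInfo =
    createAggregateBufferDataType(groupings, aggregates, inputType)

  new AggregateMapFunction[Row, Row](aggregates, aggFieldIndexes, groupings, mapReturnType)
    .asInstanceOf[MapFunction[Any, Row]]
}
```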
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686703#comment-15686703 ] ASF GitHub Bot commented on FLINK-4937: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89101976 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. 
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] +} else { + new AggregateWindowFunction(aggFunction) +} + + } + + private[flink] def createAllWindowIncrementalAggregationFunction( +window: LogicalWindow, +aggregates: Array[Aggregate[_ <: Any]], --- End diff -- compute `aggregates` from `namedAggregates`. > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions:
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89090109 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. 
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] +} else { + new AggregateWindowFunction(aggFunction) +} + + } + + private[flink] def createAllWindowIncrementalAggregationFunction( +window: LogicalWindow, +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int], +properties: Seq[NamedWindowProperty]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +val (groupingOffsetMapping, aggOffsetMapping) = + getGroupingOffsetAndaggOffsetMapping( + namedAggregates, + inputType, + outputType, + groupings) + +val finalRowArity = outputType.getFieldCount + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686709#comment-15686709 ] ASF GitHub Bot commented on FLINK-4937: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89088819 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -39,66 +45,69 @@ object AggregateUtil { type JavaList[T] = java.util.List[T] /** - * Create Flink operator functions for aggregates. It includes 2 implementations of Flink - * operator functions: - * [[org.apache.flink.api.common.functions.MapFunction]] and - * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate, - * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). - * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the - * intermediate aggregate values of all aggregate function, it's stored in Row by the following - * format: - * - * {{{ - * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 - * | | - * v v - *+-+-+++++ - *|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | - *+-+-+++++ - * ^ - * | - * sum(y) aggOffsetInRow = 4 - * }}} - * - */ - def createOperatorFunctionsForAggregates( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int]) -: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { - -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. -val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 +* Create prepare MapFunction for aggregates. +* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*+-+-+++++ +*|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | +*+-+-+++++ +* ^ +* | +* sum(y) aggOffsetInRow = 4 +* }}} +* +*/ + private[flink] def createPrepareMapFunction( +aggregates: Array[Aggregate[_ <: Any]], --- End diff -- I think we should make the arguments of the `create*Function` methods more consistent. Either we use Calcite's `namedAggregates` or we use the transformed `aggregates` and `aggFieldIndexes`. > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. 
> The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89088166 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -39,66 +45,69 @@ object AggregateUtil { type JavaList[T] = java.util.List[T] /** - * Create Flink operator functions for aggregates. It includes 2 implementations of Flink - * operator functions: - * [[org.apache.flink.api.common.functions.MapFunction]] and - * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate, - * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). - * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the - * intermediate aggregate values of all aggregate function, it's stored in Row by the following - * format: - * - * {{{ - * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 - * | | - * v v - *+-+-+++++ - *|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | - *+-+-+++++ - * ^ - * | - * sum(y) aggOffsetInRow = 4 - * }}} - * - */ - def createOperatorFunctionsForAggregates( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int]) -: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { - -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. -val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 +* Create prepare MapFunction for aggregates. +* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*+-+-+++++ +*|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | +*+-+-+++++ +* ^ +* | +* sum(y) aggOffsetInRow = 4 +* }}} +* +*/ + private[flink] def createPrepareMapFunction( +aggregates: Array[Aggregate[_ <: Any]], +aggFieldIndexes: Array[Int], +groupings: Array[Int], +inputType: +RelDataType): MapFunction[Any, Row] = { val mapReturnType: RowTypeInfo = createAggregateBufferDataType(groupings, aggregates, inputType) val mapFunction = new AggregateMapFunction[Row, Row]( -aggregates, aggFieldIndexes, groupings, - mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] - -// the mapping relation between field index of intermediate aggregate Row and output Row. -val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) - -// the mapping relation between aggregate function index in list and its corresponding -// field index in output Row. -val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) - -if (groupingOffsetMapping.length != groupings.length || -aggOffsetMapping.length != namedAggregates.length) { - throw new TableException("Could not find output field in input data type " + - "or aggregate functions.") -} - -val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x) + aggregates, + aggFieldIndexes, + groupings, + mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] -val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +mapFunction
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89089242 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( --- End diff -- Can you move this method below the `create*Function` methods and make it `private`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89102066 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. 
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] +} else { + new AggregateAllWindowFunction(aggFunction) +} + + } + + + private[flink] def createWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] +} else { + new AggregateWindowFunction(aggFunction) +} + + } + + private[flink] def createAllWindowIncrementalAggregationFunction( +window: LogicalWindow, +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int], +properties: Seq[NamedWindowProperty]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + +val (groupingOffsetMapping, aggOffsetMapping) = + getGroupingOffsetAndaggOffsetMapping( + namedAggregates, + inputType, + outputType, + groupings) + +val finalRowArity = outputType.getFieldCount + +if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89101806 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = { + +// the mapping relation between field index of intermediate aggregate Row and output Row. +val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) + +// the mapping relation between aggregate function index in list and its corresponding +// field index in output Row. +val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) -(mapFunction, reduceGroupFunction) +if (groupingOffsetMapping.length != groupings.length || + aggOffsetMapping.length != namedAggregates.length) { + throw new TableException( +"Could not find output field in input data type " + + "or aggregate functions.") +} +(groupingOffsetMapping, aggOffsetMapping) + } + + + private[flink] def createAllWindowAggregationFunction( +window: LogicalWindow, +properties: Seq[NamedWindowProperty], +aggFunction: RichGroupReduceFunction[Row, Row]) --- End diff -- I'd generate the `GroupReduceFunction` directly in this method. That will make the calling code simpler. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
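Concretely, the suggestion means the window-function factories accept the aggregation inputs and build the `RichGroupReduceFunction` themselves, so callers no longer thread it through. A sketch of that direction — `createAggregateGroupReduceFunction` is a hypothetical name standing in for the GroupReduceFunction factory defined earlier in this diff:

```scala
private[flink] def createAllWindowAggregationFunction(
    window: LogicalWindow,
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    inputType: RelDataType,
    outputType: RelDataType,
    groupings: Array[Int],
    properties: Seq[NamedWindowProperty])
  : AllWindowFunction[Row, Row, DataStreamWindow] = {

  // built here instead of being passed in by every caller
  val aggFunction: RichGroupReduceFunction[Row, Row] =
    createAggregateGroupReduceFunction(namedAggregates, inputType, outputType, groupings)

  if (isTimeWindow(window)) {
    val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
  } else {
    new AggregateAllWindowFunction(aggFunction)
  }
}
```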
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89086106 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -39,66 +45,69 @@ object AggregateUtil { type JavaList[T] = java.util.List[T] /** - * Create Flink operator functions for aggregates. It includes 2 implementations of Flink - * operator functions: - * [[org.apache.flink.api.common.functions.MapFunction]] and - * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate, - * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). - * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the - * intermediate aggregate values of all aggregate function, it's stored in Row by the following - * format: - * - * {{{ - * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 - * | | - * v v - *+-+-+++++ - *|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | - *+-+-+++++ - * ^ - * | - * sum(y) aggOffsetInRow = 4 - * }}} - * - */ - def createOperatorFunctionsForAggregates( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int]) -: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { - -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. -val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 +* Create prepare MapFunction for aggregates. +* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*+-+-+++++ +*|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | +*+-+-+++++ +* ^ +* | +* sum(y) aggOffsetInRow = 4 +* }}} +* +*/ + private[flink] def createPrepareMapFunction( +aggregates: Array[Aggregate[_ <: Any]], +aggFieldIndexes: Array[Int], +groupings: Array[Int], +inputType: +RelDataType): MapFunction[Any, Row] = { --- End diff -- remove this line break. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
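Applied, the parameter list from the diff reads without the stray break after `inputType:` (body elided):

```scala
private[flink] def createPrepareMapFunction(
    aggregates: Array[Aggregate[_ <: Any]],
    aggFieldIndexes: Array[Int],
    groupings: Array[Int],
    inputType: RelDataType): MapFunction[Any, Row] = {
  // ... unchanged body ...
}
```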
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89088443 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], --- End diff -- can we just use `namedAggregates` and compute `aggregates` from it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
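Analogous to the map-side suggestion, `createIncrementalAggregateReduceFunction` could drop the redundant `aggregates` parameter and derive it from `namedAggregates`. A sketch reusing the helpers visible in this diff (with the camel-cased helper name requested in another comment on this PR):

```scala
private[flink] def createIncrementalAggregateReduceFunction(
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    inputType: RelDataType,
    outputType: RelDataType,
    groupings: Array[Int]): IncrementalAggregateReduceFunction = {

  // derive the aggregate functions instead of passing them in alongside
  // the namedAggregates they were computed from
  val aggregates =
    transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)._2

  val groupingOffsetMapping =
    getGroupingOffsetAndAggOffsetMapping(namedAggregates, inputType, outputType, groupings)._1

  val intermediateRowArity =
    groupings.length + aggregates.map(_.intermediateDataType.length).sum

  new IncrementalAggregateReduceFunction(aggregates, groupingOffsetMapping, intermediateRowArity)
}
```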
[GitHub] flink issue #2822: [FLINK-5075] [kinesis] Make Kinesis consumer fail-proof t...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2822 Thanks for the review @StephanEwen. I'm pretty sure this doesn't affect the normal Kinesis shard discovery. I'll give it some final tests before merging (would like to get this in before the next 1.1.4 RC). Yes, using `describeStream(streamName)` only will be problematic for users with large numbers of shards, because the whole list may not be able to be returned in a single call. So that's most likely not a solution we can consider. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
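The pagination point is the crux: a single `describeStream(streamName)` call may return only part of the shard list, so the consumer must follow `hasMoreShards`, passing the last seen shard id as the exclusive start of the next page (`null` on the first call). A self-contained sketch of that loop against the AWS Java SDK — written in Scala for consistency with the other snippets here, not the connector's actual code:

```scala
import com.amazonaws.services.kinesis.AmazonKinesis
import com.amazonaws.services.kinesis.model.{DescribeStreamRequest, Shard}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// Lists every shard of a stream by paging through DescribeStream results.
def listAllShards(kinesis: AmazonKinesis, streamName: String): List[Shard] = {
  val shards = ListBuffer.empty[Shard]
  var lastSeenShardId: String = null // null on the first call: start from the beginning
  var hasMoreShards = true
  while (hasMoreShards) {
    val request = new DescribeStreamRequest()
      .withStreamName(streamName)
      .withExclusiveStartShardId(lastSeenShardId)
    val description = kinesis.describeStream(request).getStreamDescription
    shards ++= description.getShards.asScala
    hasMoreShards = description.getHasMoreShards // more pages to fetch?
    if (hasMoreShards) {
      lastSeenShardId = shards.last.getShardId
    }
  }
  shards.toList
}
```

As the review comments below note, Kinesalite ignores `exclusiveStartShardId`, which breaks exactly this contract.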
[jira] [Commented] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite
[ https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686551#comment-15686551 ] ASF GitHub Bot commented on FLINK-5075: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2822#discussion_r89100942 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException { + private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { --- End diff -- Does only Kinesalite supply the `startShardId` parameter? > Kinesis consumer incorrectly determines shards as newly discovered when > tested against Kinesalite > - > > Key: FLINK-5075 > URL: https://issues.apache.org/jira/browse/FLINK-5075 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > A user reported that when our Kinesis connector is used against Kinesalite > (https://github.com/mhart/kinesalite), we're incorrectly determining already > found shards as newly discovered: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html > I suspect the problem to be the mock Kinesis API implementations of > Kinesalite doesn't completely match with the official AWS Kinesis behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5061) Remove ContinuousEventTimeTrigger
[ https://issues.apache.org/jira/browse/FLINK-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686800#comment-15686800 ] Aljoscha Krettek commented on FLINK-5061: - We can also leave it in since it's not a maintenance overhead. My main motivation for removing this was that I saw too many people on the mailing lists that were using this but were not aware of what it actually does. > Remove ContinuousEventTimeTrigger > - > > Key: FLINK-5061 > URL: https://issues.apache.org/jira/browse/FLINK-5061 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.2.0 > > > The Trigger does not do what people think it does. The problem is that a > watermark T signals that we won't see elements with timestamp < T in the > future. It does not signal that we have not yet seen elements with timestamp > > T. So it cannot really be used to trigger at different stages of processing > an event-time window. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
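To make the pitfall concrete, consider the sketch below (`events` is a hypothetical event-time stream with timestamps and watermarks already assigned). A firing every 10 seconds of event time does not slice the window's data into 10-second portions: when the watermark reaches t, elements with timestamps far beyond t may already sit in the pane.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger

def example(events: DataStream[(String, Long)]) =
  events
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // fires each time the watermark crosses another 10s boundary; this says
    // nothing about which elements are already in the pane
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
    .sum(1)
```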
[GitHub] flink pull request #2822: [FLINK-5075] [kinesis] Make Kinesis consumer fail-...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2822#discussion_r89119126 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException { + private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { --- End diff -- Only Kinesis does. Kinesalite incorrectly ignores `startShardId`. It's marked `Nullable` here, because on fresh startups this method will be called with a `null` as the start ID (on startup there will be no shard Id to start from). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686760#comment-15686760 ] ASF GitHub Bot commented on FLINK-4964: --- Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740 Hello @thvasilo @greghogan, I've updated the documentation and will follow up with the code updates. Regards, Thomas > FlinkML - Add StringIndexer > --- > > Key: FLINK-4964 > URL: https://issues.apache.org/jira/browse/FLINK-4964 > Project: Flink > Issue Type: New Feature >Reporter: Thomas FOURNIER >Priority: Minor > > Add StringIndexer as described here: > http://spark.apache.org/docs/latest/ml-features.html#stringindexer > This will be added to the preprocessing package of FlinkML -- This message was sent by Atlassian JIRA (v6.3.4#6332)
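For readers unfamiliar with the Spark operator being mirrored: it maps a column of string labels to indices ordered by descending frequency, so the most frequent label receives index 0. A minimal sketch of those semantics on plain Scala collections — the eventual FlinkML transformer would compute the same mapping over a DataSet, and the alphabetical tie-break is an assumption:

```scala
// Frequency-ordered label indexing, Spark StringIndexer style.
def fitStringIndex(labels: Seq[String]): Map[String, Long] =
  labels
    .groupBy(identity)
    .mapValues(_.size)
    .toSeq
    .sortBy { case (label, count) => (-count, label) } // most frequent first
    .zipWithIndex
    .map { case ((label, _), index) => label -> index.toLong }
    .toMap

// fitStringIndex(Seq("a", "b", "c", "a", "a", "c"))
// == Map("a" -> 0L, "c" -> 1L, "b" -> 2L)
```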
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686700#comment-15686700 ] ASF GitHub Bot commented on FLINK-4937: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89089095 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -115,14 +124,210 @@ object AggregateUtil { intermediateRowArity, outputType.getFieldCount) } +groupReduceFunction + } + + /** +* Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement +* [[org.apache.flink.api.common.functions.ReduceFunction]] +* +*/ + private[flink] def createIncrementalAggregateReduceFunction( +aggregates: Array[Aggregate[_ <: Any]], +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +outputType: RelDataType, +groupings: Array[Int]): IncrementalAggregateReduceFunction = { +val groupingOffsetMapping = + getGroupingOffsetAndaggOffsetMapping( +namedAggregates, +inputType, +outputType, +groupings)._1 +val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +val reduceFunction = new IncrementalAggregateReduceFunction( + aggregates, + groupingOffsetMapping, + intermediateRowArity) +reduceFunction + } + + /** +* @return groupingOffsetMapping (mapping relation between field index of intermediate +* aggregate Row and output Row.) +* and aggOffsetMapping (the mapping relation between aggregate function index in list +* and its corresponding field index in output Row.) +*/ + def getGroupingOffsetAndaggOffsetMapping( --- End diff -- Please fix camel case: `getGroupingOffsetAndAggOffsetMapping` > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89086247 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala --- @@ -39,66 +45,69 @@ object AggregateUtil { type JavaList[T] = java.util.List[T] /** - * Create Flink operator functions for aggregates. It includes 2 implementations of Flink - * operator functions: - * [[org.apache.flink.api.common.functions.MapFunction]] and - * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate, - * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). - * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the - * intermediate aggregate values of all aggregate function, it's stored in Row by the following - * format: - * - * {{{ - * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 - * | | - * v v - *+-+-+++++ - *|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | - *+-+-+++++ - * ^ - * | - * sum(y) aggOffsetInRow = 4 - * }}} - * - */ - def createOperatorFunctionsForAggregates( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int]) -: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = { - -val aggregateFunctionsAndFieldIndexes = - transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) -// store the aggregate fields of each aggregate function, by the same order of aggregates. -val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1 -val aggregates = aggregateFunctionsAndFieldIndexes._2 +* Create prepare MapFunction for aggregates. +* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the +* intermediate aggregate values of all aggregate function, it's stored in Row by the following +* format: +* +* {{{ +* avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 +* | | +* v v +*+-+-+++++ +*|groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | +*+-+-+++++ +* ^ +* | +* sum(y) aggOffsetInRow = 4 +* }}} +* +*/ + private[flink] def createPrepareMapFunction( +aggregates: Array[Aggregate[_ <: Any]], +aggFieldIndexes: Array[Int], +groupings: Array[Int], +inputType: +RelDataType): MapFunction[Any, Row] = { val mapReturnType: RowTypeInfo = createAggregateBufferDataType(groupings, aggregates, inputType) val mapFunction = new AggregateMapFunction[Row, Row]( -aggregates, aggFieldIndexes, groupings, - mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] - -// the mapping relation between field index of intermediate aggregate Row and output Row. -val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) - -// the mapping relation between aggregate function index in list and its corresponding -// field index in output Row. -val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType) - -if (groupingOffsetMapping.length != groupings.length || -aggOffsetMapping.length != namedAggregates.length) { - throw new TableException("Could not find output field in input data type " + - "or aggregate functions.") -} - -val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x) + aggregates, + aggFieldIndexes, + groupings, + mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] -val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum +mapFunction
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89088322

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {

   type JavaList[T] = java.util.List[T]

   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *           avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                 |                                        |
-   *                 v                                        v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                              ^
-   *                                              |
-   *                                 sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      outputType: RelDataType,
-      groupings: Array[Int])
-    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-    val aggregateFunctionsAndFieldIndexes =
-      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-    // store the aggregate fields of each aggregate function, by the same order of aggregates.
-    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-    val aggregates = aggregateFunctionsAndFieldIndexes._2
+   * Create prepare MapFunction for aggregates.
+   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
+   * format:
+   *
+   * {{{
+   *           avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+   *                 |                                        |
+   *                 v                                        v
+   *   +---------+---------+--------+--------+--------+--------+
+   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+   *   +---------+---------+--------+--------+--------+--------+
+   *                                              ^
+   *                                              |
+   *                                 sum(y) aggOffsetInRow = 4
+   * }}}
+   *
+   */
+  private[flink] def createPrepareMapFunction(
+      aggregates: Array[Aggregate[_ <: Any]],

--- End diff --

Let's just use `namedAggregates: Seq[CalcitePair[AggregateCall, String]]` and compute `aggregates` and `aggFieldIndexes` from it.
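A minimal sketch of the signature change the reviewer is suggesting. It assumes `transformToAggregateFunctions` keeps the shape shown in the removed code above (returning the field-index array and the aggregate array as a pair); everything else mirrors the diff and only compiles in that file's context.

// Sketch only: the helpers are assumed to behave as in the removed code quoted above.
private[flink] def createPrepareMapFunction(
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    groupings: Array[Int],
    inputType: RelDataType): MapFunction[Any, Row] = {

  // derive both arrays from the named aggregates instead of passing them in
  val (aggFieldIndexes, aggregates) =
    transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)

  val mapReturnType: RowTypeInfo =
    createAggregateBufferDataType(groupings, aggregates, inputType)

  new AggregateMapFunction[Row, Row](
    aggregates, aggFieldIndexes, groupings,
    mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
}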
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686712#comment-15686712 ] ASF GitHub Bot commented on FLINK-4937: ---

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89090109

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
         intermediateRowArity,
         outputType.getFieldCount)
     }
+    groupReduceFunction
   }

+  /**
+   * Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement
+   * [[org.apache.flink.api.common.functions.ReduceFunction]]
+   *
+   */
+  private[flink] def createIncrementalAggregateReduceFunction(
+      aggregates: Array[Aggregate[_ <: Any]],
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+    val groupingOffsetMapping =
+      getGroupingOffsetAndaggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)._1
+    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+    val reduceFunction = new IncrementalAggregateReduceFunction(
+      aggregates,
+      groupingOffsetMapping,
+      intermediateRowArity)
+    reduceFunction
+  }

+  /**
+   * @return groupingOffsetMapping (mapping relation between field index of intermediate
+   *         aggregate Row and output Row.)
+   *         and aggOffsetMapping (the mapping relation between aggregate function index in list
+   *         and its corresponding field index in output Row.)
+   */
+  def getGroupingOffsetAndaggOffsetMapping(
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+    // the mapping relation between field index of intermediate aggregate Row and output Row.
+    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+    // the mapping relation between aggregate function index in list and its corresponding
+    // field index in output Row.
+    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-    (mapFunction, reduceGroupFunction)
+    if (groupingOffsetMapping.length != groupings.length ||
+        aggOffsetMapping.length != namedAggregates.length) {
+      throw new TableException(
+        "Could not find output field in input data type " +
+          "or aggregate functions.")
+    }
+    (groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+    } else {
+      new AggregateAllWindowFunction(aggFunction)
+    }
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])
+    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+    } else {
+      new AggregateWindowFunction(aggFunction)
+    }
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+      window: LogicalWindow,
+      aggregates: Array[Aggregate[_ <: Any]],
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    val (groupingOffsetMapping, aggOffsetMapping) =
+      getGroupingOffsetAndaggOffsetMapping(
+        namedAggregates,
+        inputType,
+
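For readers unfamiliar with what the new IncrementalAggregateReduceFunction does, here is a self-contained Scala sketch of the underlying idea (the Agg trait and array-based rows are illustrative stand-ins, not Flink's actual types): two intermediate rows with the same group keys are merged field by field, so the window only ever buffers one intermediate row per key.

object IncrementalReduceSketch {

  // Illustrative stand-in for a partial aggregate: merges two intermediate values.
  trait Agg { def merge(a: Any, b: Any): Any }

  object SumAgg extends Agg {
    def merge(a: Any, b: Any): Any = a.asInstanceOf[Long] + b.asInstanceOf[Long]
  }

  // Reduces two intermediate rows laid out as: group keys first, then one slot
  // per aggregate intermediate field (the aggOffsetInRow scheme shown earlier).
  def reduce(aggs: Seq[(Agg, Int)], keyCount: Int,
             r1: Array[Any], r2: Array[Any]): Array[Any] = {
    val out = new Array[Any](r1.length)
    Array.copy(r1, 0, out, 0, keyCount) // group keys are equal in both rows
    for ((agg, offset) <- aggs)         // merge each aggregate's slot pairwise
      out(offset) = agg.merge(r1(offset), r2(offset))
    out
  }

  def main(args: Array[String]): Unit = {
    // one group key plus sum(y) at offset 1: merging ("a", 3) and ("a", 4)
    val merged = reduce(Seq((SumAgg, 1)), 1, Array("a", 3L), Array("a", 4L))
    println(merged.toSeq) // ("a", 7)
  }
}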
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686706#comment-15686706 ] ASF GitHub Bot commented on FLINK-4937: ---

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89088166

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {

   type JavaList[T] = java.util.List[T]

   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *           avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                 |                                        |
-   *                 v                                        v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                              ^
-   *                                              |
-   *                                 sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      outputType: RelDataType,
-      groupings: Array[Int])
-    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-    val aggregateFunctionsAndFieldIndexes =
-      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-    // store the aggregate fields of each aggregate function, by the same order of aggregates.
-    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-    val aggregates = aggregateFunctionsAndFieldIndexes._2
+   * Create prepare MapFunction for aggregates.
+   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
+   * format:
+   *
+   * {{{
+   *           avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+   *                 |                                        |
+   *                 v                                        v
+   *   +---------+---------+--------+--------+--------+--------+
+   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+   *   +---------+---------+--------+--------+--------+--------+
+   *                                              ^
+   *                                              |
+   *                                 sum(y) aggOffsetInRow = 4
+   * }}}
+   *
+   */
+  private[flink] def createPrepareMapFunction(
+      aggregates: Array[Aggregate[_ <: Any]],
+      aggFieldIndexes: Array[Int],
+      groupings: Array[Int],
+      inputType:
+      RelDataType): MapFunction[Any, Row] = {

     val mapReturnType: RowTypeInfo =
       createAggregateBufferDataType(groupings, aggregates, inputType)

     val mapFunction = new AggregateMapFunction[Row, Row](
-      aggregates, aggFieldIndexes, groupings,
-      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-    // the mapping relation between field index of intermediate aggregate Row and output Row.
-    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
-
-    // the mapping relation between aggregate function index in list and its corresponding
-    // field index in output Row.
-    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-    if (groupingOffsetMapping.length != groupings.length ||
-        aggOffsetMapping.length != namedAggregates.length) {
-      throw new TableException("Could not find output field in input data type " +
-        "or aggregate functions.")
-    }
-
-    val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
+      aggregates,
+      aggFieldIndexes,
+
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89089095

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
         intermediateRowArity,
         outputType.getFieldCount)
     }
+    groupReduceFunction
   }

+  /**
+   * Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement
+   * [[org.apache.flink.api.common.functions.ReduceFunction]]
+   *
+   */
+  private[flink] def createIncrementalAggregateReduceFunction(
+      aggregates: Array[Aggregate[_ <: Any]],
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+    val groupingOffsetMapping =
+      getGroupingOffsetAndaggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)._1
+    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+    val reduceFunction = new IncrementalAggregateReduceFunction(
+      aggregates,
+      groupingOffsetMapping,
+      intermediateRowArity)
+    reduceFunction
+  }

+  /**
+   * @return groupingOffsetMapping (mapping relation between field index of intermediate
+   *         aggregate Row and output Row.)
+   *         and aggOffsetMapping (the mapping relation between aggregate function index in list
+   *         and its corresponding field index in output Row.)
+   */
+  def getGroupingOffsetAndaggOffsetMapping(

--- End diff --

Please fix camel case: `getGroupingOffsetAndAggOffsetMapping`
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686707#comment-15686707 ] ASF GitHub Bot commented on FLINK-4937: ---

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89086247

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {

   type JavaList[T] = java.util.List[T]

   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *           avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                 |                                        |
-   *                 v                                        v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                              ^
-   *                                              |
-   *                                 sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      outputType: RelDataType,
-      groupings: Array[Int])
-    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-    val aggregateFunctionsAndFieldIndexes =
-      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-    // store the aggregate fields of each aggregate function, by the same order of aggregates.
-    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-    val aggregates = aggregateFunctionsAndFieldIndexes._2
+   * Create prepare MapFunction for aggregates.
+   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
+   * format:
+   *
+   * {{{
+   *           avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+   *                 |                                        |
+   *                 v                                        v
+   *   +---------+---------+--------+--------+--------+--------+
+   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+   *   +---------+---------+--------+--------+--------+--------+
+   *                                              ^
+   *                                              |
+   *                                 sum(y) aggOffsetInRow = 4
+   * }}}
+   *
+   */
+  private[flink] def createPrepareMapFunction(
+      aggregates: Array[Aggregate[_ <: Any]],
+      aggFieldIndexes: Array[Int],
+      groupings: Array[Int],
+      inputType:
+      RelDataType): MapFunction[Any, Row] = {

     val mapReturnType: RowTypeInfo =
       createAggregateBufferDataType(groupings, aggregates, inputType)

     val mapFunction = new AggregateMapFunction[Row, Row](
-      aggregates, aggFieldIndexes, groupings,
-      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-    // the mapping relation between field index of intermediate aggregate Row and output Row.
-    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
-
-    // the mapping relation between aggregate function index in list and its corresponding
-    // field index in output Row.
-    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-    if (groupingOffsetMapping.length != groupings.length ||
-        aggOffsetMapping.length != namedAggregates.length) {
-      throw new TableException("Could not find output field in input data type " +
-        "or aggregate functions.")
-    }
-
-    val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
+      aggregates,
+      aggFieldIndexes,
+
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686708#comment-15686708 ] ASF GitHub Bot commented on FLINK-4937: ---

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89086106

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {

   type JavaList[T] = java.util.List[T]

   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *           avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                 |                                        |
-   *                 v                                        v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                              ^
-   *                                              |
-   *                                 sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      outputType: RelDataType,
-      groupings: Array[Int])
-    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-    val aggregateFunctionsAndFieldIndexes =
-      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-    // store the aggregate fields of each aggregate function, by the same order of aggregates.
-    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-    val aggregates = aggregateFunctionsAndFieldIndexes._2
+   * Create prepare MapFunction for aggregates.
+   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
+   * format:
+   *
+   * {{{
+   *           avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+   *                 |                                        |
+   *                 v                                        v
+   *   +---------+---------+--------+--------+--------+--------+
+   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+   *   +---------+---------+--------+--------+--------+--------+
+   *                                              ^
+   *                                              |
+   *                                 sum(y) aggOffsetInRow = 4
+   * }}}
+   *
+   */
+  private[flink] def createPrepareMapFunction(
+      aggregates: Array[Aggregate[_ <: Any]],
+      aggFieldIndexes: Array[Int],
+      groupings: Array[Int],
+      inputType:
+      RelDataType): MapFunction[Any, Row] = {

--- End diff --

remove this line break.

> Add incremental group window aggregation for streaming Table API
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API
> features [incremental
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686704#comment-15686704 ] ASF GitHub Bot commented on FLINK-4937: ---

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89101806

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
         intermediateRowArity,
         outputType.getFieldCount)
     }
+    groupReduceFunction
   }

+  /**
+   * Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement
+   * [[org.apache.flink.api.common.functions.ReduceFunction]]
+   *
+   */
+  private[flink] def createIncrementalAggregateReduceFunction(
+      aggregates: Array[Aggregate[_ <: Any]],
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+    val groupingOffsetMapping =
+      getGroupingOffsetAndaggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)._1
+    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+    val reduceFunction = new IncrementalAggregateReduceFunction(
+      aggregates,
+      groupingOffsetMapping,
+      intermediateRowArity)
+    reduceFunction
+  }

+  /**
+   * @return groupingOffsetMapping (mapping relation between field index of intermediate
+   *         aggregate Row and output Row.)
+   *         and aggOffsetMapping (the mapping relation between aggregate function index in list
+   *         and its corresponding field index in output Row.)
+   */
+  def getGroupingOffsetAndaggOffsetMapping(
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+    // the mapping relation between field index of intermediate aggregate Row and output Row.
+    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+    // the mapping relation between aggregate function index in list and its corresponding
+    // field index in output Row.
+    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-    (mapFunction, reduceGroupFunction)
+    if (groupingOffsetMapping.length != groupings.length ||
+        aggOffsetMapping.length != namedAggregates.length) {
+      throw new TableException(
+        "Could not find output field in input data type " +
+          "or aggregate functions.")
+    }
+    (groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])

--- End diff --

I'd generate the `GroupReduceFunction` directly in this method. That will make the calling code simpler.

> Add incremental group window aggregation for streaming Table API
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API
> features [incremental
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
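A sketch of the simplification being proposed: let the factory build the GroupReduceFunction itself from the logical description of the aggregation instead of taking a pre-built one. Parameter names follow the diff above; createAggregateGroupReduceFunction is a hypothetical helper standing in for whatever the PR uses to build the reduce function, so this only illustrates the shape of the change.

// Sketch: build the GroupReduceFunction inside the factory so callers only
// pass the logical description of the aggregation.
private[flink] def createAllWindowAggregationFunction(
    window: LogicalWindow,
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    inputType: RelDataType,
    outputType: RelDataType,
    groupings: Array[Int],
    properties: Seq[NamedWindowProperty])
  : AllWindowFunction[Row, Row, DataStreamWindow] = {

  // hypothetical helper: derives the RichGroupReduceFunction from the inputs
  val aggFunction: RichGroupReduceFunction[Row, Row] =
    createAggregateGroupReduceFunction(namedAggregates, inputType, outputType, groupings)

  if (isTimeWindow(window)) {
    val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
  } else {
    new AggregateAllWindowFunction(aggFunction)
  }
}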
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89101976

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
         intermediateRowArity,
         outputType.getFieldCount)
     }
+    groupReduceFunction
   }

+  /**
+   * Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement
+   * [[org.apache.flink.api.common.functions.ReduceFunction]]
+   *
+   */
+  private[flink] def createIncrementalAggregateReduceFunction(
+      aggregates: Array[Aggregate[_ <: Any]],
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+    val groupingOffsetMapping =
+      getGroupingOffsetAndaggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)._1
+    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+    val reduceFunction = new IncrementalAggregateReduceFunction(
+      aggregates,
+      groupingOffsetMapping,
+      intermediateRowArity)
+    reduceFunction
+  }

+  /**
+   * @return groupingOffsetMapping (mapping relation between field index of intermediate
+   *         aggregate Row and output Row.)
+   *         and aggOffsetMapping (the mapping relation between aggregate function index in list
+   *         and its corresponding field index in output Row.)
+   */
+  def getGroupingOffsetAndaggOffsetMapping(
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+    // the mapping relation between field index of intermediate aggregate Row and output Row.
+    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+    // the mapping relation between aggregate function index in list and its corresponding
+    // field index in output Row.
+    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-    (mapFunction, reduceGroupFunction)
+    if (groupingOffsetMapping.length != groupings.length ||
+        aggOffsetMapping.length != namedAggregates.length) {
+      throw new TableException(
+        "Could not find output field in input data type " +
+          "or aggregate functions.")
+    }
+    (groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+    } else {
+      new AggregateAllWindowFunction(aggFunction)
+    }
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])
+    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+    } else {
+      new AggregateWindowFunction(aggFunction)
+    }
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+      window: LogicalWindow,
+      aggregates: Array[Aggregate[_ <: Any]],

--- End diff --

compute `aggregates` from `namedAggregates`.
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686710#comment-15686710 ] ASF GitHub Bot commented on FLINK-4937: ---

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2792#discussion_r89101873

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
         intermediateRowArity,
         outputType.getFieldCount)
     }
+    groupReduceFunction
   }

+  /**
+   * Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement
+   * [[org.apache.flink.api.common.functions.ReduceFunction]]
+   *
+   */
+  private[flink] def createIncrementalAggregateReduceFunction(
+      aggregates: Array[Aggregate[_ <: Any]],
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+    val groupingOffsetMapping =
+      getGroupingOffsetAndaggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)._1
+    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+    val reduceFunction = new IncrementalAggregateReduceFunction(
+      aggregates,
+      groupingOffsetMapping,
+      intermediateRowArity)
+    reduceFunction
+  }

+  /**
+   * @return groupingOffsetMapping (mapping relation between field index of intermediate
+   *         aggregate Row and output Row.)
+   *         and aggOffsetMapping (the mapping relation between aggregate function index in list
+   *         and its corresponding field index in output Row.)
+   */
+  def getGroupingOffsetAndaggOffsetMapping(
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+    // the mapping relation between field index of intermediate aggregate Row and output Row.
+    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+    // the mapping relation between aggregate function index in list and its corresponding
+    // field index in output Row.
+    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-    (mapFunction, reduceGroupFunction)
+    if (groupingOffsetMapping.length != groupings.length ||
+        aggOffsetMapping.length != namedAggregates.length) {
+      throw new TableException(
+        "Could not find output field in input data type " +
+          "or aggregate functions.")
+    }
+    (groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+    } else {
+      new AggregateAllWindowFunction(aggFunction)
+    }
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])

--- End diff --

Same as above.

> Add incremental group window aggregation for streaming Table API
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API
> features [incremental
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
[GitHub] flink issue #2842: [FLINK-5097][gelly] Add missing input type information to...
Github user vasia commented on the issue: https://github.com/apache/flink/pull/2842

Hi @greghogan, I was able to fix the problem in `fromDataSet()` and `groupReduceOnEdges()` with `EdgesFunction`. In the rest of the uses, I don't seem to find a way to pass all the input types correctly. The remaining cases all have 3 input types and the `createTypeInfo()` method only accepts two. I have also tried extracting the input types from the wrapping functions, but that didn't work either. Any ideas?
[jira] [Commented] (FLINK-5097) The TypeExtractor is missing input type information in some Graph methods
[ https://issues.apache.org/jira/browse/FLINK-5097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686757#comment-15686757 ] ASF GitHub Bot commented on FLINK-5097: ---

Github user vasia commented on the issue: https://github.com/apache/flink/pull/2842

Hi @greghogan, I was able to fix the problem in `fromDataSet()` and `groupReduceOnEdges()` with `EdgesFunction`. In the rest of the uses, I don't seem to find a way to pass all the input types correctly. The remaining cases all have 3 input types and the `createTypeInfo()` method only accepts two. I have also tried extracting the input types from the wrapping functions, but that didn't work either. Any ideas?

> The TypeExtractor is missing input type information in some Graph methods
>
> Key: FLINK-5097
> URL: https://issues.apache.org/jira/browse/FLINK-5097
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Reporter: Vasia Kalavri
> Assignee: Vasia Kalavri
>
> The TypeExtractor is called without information about the input type in
> {{mapVertices}} and {{mapEdges}} although this information can be easily
> retrieved.
[jira] [Commented] (FLINK-4155) Get Kafka producer partition info in open method instead of constructor
[ https://issues.apache.org/jira/browse/FLINK-4155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686810#comment-15686810 ] ASF GitHub Bot commented on FLINK-4155: ---

Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2681

Thanks for the review @rmetzger. No, actually I haven't refactored the table tests yet. I'll file the follow-up JIRAs mentioned in the comments and merge the PR as is.

> Get Kafka producer partition info in open method instead of constructor
>
> Key: FLINK-4155
> URL: https://issues.apache.org/jira/browse/FLINK-4155
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.0, 1.0.3
> Reporter: Gyula Fora
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
> Currently the Flink Kafka producer does not really do any error handling if
> something is wrong with the partition metadata, as it is serialized with the
> user function.
> This means that in some cases the job can go into an error loop when using
> checkpoints. Getting the partition info in the open method would solve this
> problem (unlike restarting from a savepoint, which re-runs the constructor).
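The fix described above follows a general pattern: anything derived from cluster metadata must be fetched in open(), which runs on the task manager at every (re)start, rather than in the constructor, which runs once on the client and whose fields are serialized with the job. A self-contained Scala sketch of the pattern; MetadataClient and its methods are hypothetical stand-ins, not the Kafka producer API.

// Self-contained illustration of "fetch metadata in open(), not the constructor".
class PartitionedSink(topic: String) extends Serializable {

  // Deliberately NOT computed in the constructor: a partition list captured
  // there would be serialized with the job, go stale, and a broken fetch
  // could not be retried on recovery.
  @transient private var partitions: Array[Int] = _

  // Runs on the task manager each time the task (re)starts, so a metadata
  // failure here is handled by the normal restart machinery.
  def open(): Unit = {
    partitions = MetadataClient.fetchPartitions(topic)
    require(partitions.nonEmpty, s"no partitions found for topic $topic")
  }

  def invoke(key: String, value: String): Unit = {
    val n = partitions.length
    val p = partitions(((key.hashCode % n) + n) % n) // non-negative modulo
    MetadataClient.send(topic, p, value)
  }
}

// Hypothetical metadata/transport client backing the sketch.
object MetadataClient {
  def fetchPartitions(topic: String): Array[Int] = Array(0, 1, 2)
  def send(topic: String, partition: Int, value: String): Unit =
    println(s"$topic[$partition] <- $value")
}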
[GitHub] flink pull request #2822: [FLINK-5075] [kinesis] Make Kinesis consumer fail-...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2822#discussion_r89100942

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
	 * @return the result of the describe stream operation
	 */
-	private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
+	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {

--- End diff --

Does only Kinesalite supply the `startShardId` parameter?
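For context on why startShardId may be null: shard discovery pages through describe-stream results, passing the last shard id of the previous page as the exclusive start of the next one, and null (here modeled as None) on the very first call. A self-contained Scala sketch of that loop; DescribePage and describe are hypothetical stand-ins for the AWS SDK types, paging two shards at a time over a fixed stream.

object ShardPagingSketch {

  // Hypothetical one-page result: a batch of shard ids plus a has-more flag.
  case class DescribePage(shardIds: Seq[String], hasMoreShards: Boolean)

  // Stand-in for the service call.
  def describe(stream: String, startShardId: Option[String]): DescribePage = {
    val all = Seq("shardId-0000", "shardId-0001", "shardId-0002")
    val remaining = startShardId match {
      case None     => all                    // first call: no exclusive start shard
      case Some(id) => all.dropWhile(_ <= id) // later calls: resume after last seen id
    }
    DescribePage(remaining.take(2), remaining.size > 2)
  }

  // Collects all shards by looping until the service reports no further pages.
  def listAllShards(stream: String): Seq[String] = {
    var startId: Option[String] = None // plays the role of the @Nullable parameter
    var shards = Vector.empty[String]
    var more = true
    while (more) {
      val page = describe(stream, startId)
      shards = shards ++ page.shardIds
      more = page.hasMoreShards
      startId = page.shardIds.lastOption.orElse(startId)
    }
    shards
  }

  def main(args: Array[String]): Unit =
    println(listAllShards("my-stream")) // all three shard ids, across two pages
}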
[jira] [Commented] (FLINK-5049) Instability in QueryableStateITCase.testQueryableStateWithTaskManagerFailure
[ https://issues.apache.org/jira/browse/FLINK-5049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1568#comment-1568 ] Robert Metzger commented on FLINK-5049: ---

Another instance of the issue: https://api.travis-ci.org/jobs/177925657/log.txt?deansi=true

> Instability in QueryableStateITCase.testQueryableStateWithTaskManagerFailure
>
> Key: FLINK-5049
> URL: https://issues.apache.org/jira/browse/FLINK-5049
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Labels: test-stability
>
> https://api.travis-ci.org/jobs/174843846/log.txt?deansi=true
> {code}
> org.apache.flink.test.query.QueryableStateITCase
> testQueryableStateWithTaskManagerFailure(org.apache.flink.test.query.QueryableStateITCase)
> Time elapsed: 8.096 sec <<< FAILURE!
> java.lang.AssertionError: Count moved backwards
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.assertTrue(Assert.java:41)
>     at org.apache.flink.test.query.QueryableStateITCase.testQueryableStateWithTaskManagerFailure(QueryableStateITCase.java:470)
> {code}
[GitHub] flink issue #2681: [FLINK-4155] [kafka] Move partition list fetching to open...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2681

Thanks for the review @rmetzger. No, actually I haven't refactored the table tests yet. I'll file the follow-up JIRAs mentioned in the comments and merge the PR as is.
[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...
Github user tonycox commented on the issue: https://github.com/apache/flink/pull/2810

> In addition, the TableSource should know project information (including order) not just fieldIncluded. So maybe we should also adapt RowCsvInputFormat.

Do you mean we need to shuffle the fields in the row according to the projection while scanning the file? Why not map the row fields after that?
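The question above contrasts two places to apply a projection: reorder the fields inside the input format while scanning, or scan in the source's natural field order and remap afterwards. A self-contained Scala sketch of the second option; the types and field names are illustrative, not the Table API's.

object ProjectionSketch {

  type Row = Array[Any] // a scanned record in the file's natural column order

  // Builds a mapper from the scan order to the projected output order.
  def projector(scanFields: Seq[String], projected: Seq[String]): Row => Row = {
    val indices = projected.map { f =>
      val i = scanFields.indexOf(f)
      require(i >= 0, s"unknown field: $f")
      i
    }.toArray
    row => indices.map(i => row(i))
  }

  def main(args: Array[String]): Unit = {
    // file columns are (id, name, price); the query projects (price, id)
    val toOutput = projector(Seq("id", "name", "price"), Seq("price", "id"))
    println(toOutput(Array(7, "apples", 1.25)).toSeq) // the projected fields (1.25, 7)
  }
}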