[jira] [Commented] (FLINK-3529) Add pull request template
[ https://issues.apache.org/jira/browse/FLINK-3529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15177442#comment-15177442 ] ASF GitHub Bot commented on FLINK-3529: --- Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/1729#issuecomment-191639327 There should be a link from the CONTRIBUTING page to this one. > Add pull request template > - > > Key: FLINK-3529 > URL: https://issues.apache.org/jira/browse/FLINK-3529 > Project: Flink > Issue Type: Task > Components: other >Reporter: Martin Liesenberg >Assignee: Martin Liesenberg >Priority: Minor > > Add a template for pull requests, checking if prerequisites of opening a PR > have been fulfilled. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-3529 Add template for pull requests
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/1729#issuecomment-191639327 There should be a link from the CONTRIBUTING page to this one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation
[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15177390#comment-15177390 ] ASF GitHub Bot commented on FLINK-3474: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/1746#issuecomment-191621309 Hi @fhueske, I found 2 test failures because the input data of `AggregateMapFunction` is a `Tuple` instead of a `Row` when the table config is `EFFICIENT`. I thought `Tuple`, like the other types, was only supported as a data source, and that SQL operators internally only support `Row`. Why was this introduced? For efficiency? I thought the roadmap was to move to a `Row` that stores binary data. > Partial aggregate interface design and sort-based implementation > > > Key: FLINK-3474 > URL: https://issues.apache.org/jira/browse/FLINK-3474 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > > The scope of this sub task includes: > # Partial aggregate interface. > # Simple aggregate function implementation, such as SUM/AVG/COUNT/MIN/MAX. > # DataSetAggregateRule, which translates logical Calcite aggregate nodes to > Flink user functions. As a hash-based combiner is not available yet (see PR > #1517), we will use a sort-based combiner by default.
[GitHub] flink pull request: [FLINK-3474] support partial aggregate
Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/1746#issuecomment-191621309 Hi @fhueske, I found 2 test failures because the input data of `AggregateMapFunction` is a `Tuple` instead of a `Row` when the table config is `EFFICIENT`. I thought `Tuple`, like the other types, was only supported as a data source, and that SQL operators internally only support `Row`. Why was this introduced? For efficiency? I thought the roadmap was to move to a `Row` that stores binary data.
[jira] [Updated] (FLINK-3473) Add partial aggregate support in Flink
[ https://issues.apache.org/jira/browse/FLINK-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-3473: - Attachment: PartialAggregateinFlink_v2.pdf > Add partial aggregate support in Flink > -- > > Key: FLINK-3473 > URL: https://issues.apache.org/jira/browse/FLINK-3473 > Project: Flink > Issue Type: Improvement > Components: Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > Attachments: PartialAggregateinFlink_v1.pdf, > PartialAggregateinFlink_v2.pdf > > > For decomposable aggregate functions, partial aggregation is more efficient, as it significantly reduces the network traffic during the shuffle and parallelizes > part of the aggregate calculation. This is an umbrella task for partial > aggregate.
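To make the efficiency argument concrete, here is a minimal sketch in plain Java (no Flink dependency) of how a decomposable aggregate such as AVG can be split into per-partition partial results (sum, count) that are merged after the shuffle. It assumes a single group, and the names `PartialAvg`, `combine`, `merge` and `averageOfPartitions` are hypothetical illustrations, not part of Flink's API or of the design in the attached PDFs.

```java
import java.util.List;

// Hypothetical illustration (not Flink API): AVG decomposed into a
// (sum, count) partial aggregate for a single group.
final class PartialAvg {
    final long sum;
    final long count;

    PartialAvg(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Combine phase: one partition reduces its local values to a single
    // partial result, so one record per partition crosses the network
    // instead of one record per input value.
    static PartialAvg combine(List<Long> localValues) {
        long s = 0;
        for (long v : localValues) {
            s += v;
        }
        return new PartialAvg(s, localValues.size());
    }

    // Final phase: merging partials is associative and commutative, so the
    // order in which they arrive after the shuffle does not matter.
    static PartialAvg merge(PartialAvg a, PartialAvg b) {
        return new PartialAvg(a.sum + b.sum, a.count + b.count);
    }

    double evaluate() {
        return (double) sum / count;
    }

    // Combines every partition locally, then merges the partial results.
    static double averageOfPartitions(List<List<Long>> partitions) {
        PartialAvg acc = new PartialAvg(0, 0);
        for (List<Long> partition : partitions) {
            acc = merge(acc, combine(partition));
        }
        return acc.evaluate();
    }
}
```

For two partitions [1, 2, 3] and [4, 5], the partials are (6, 3) and (9, 2), which merge to (15, 5) and an average of 3.0; only two partial records are shuffled instead of five raw values. AVG itself is not decomposable, which is why the partial state has to be the (sum, count) pair rather than a running average.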
[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation
[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15177260#comment-15177260 ] ASF GitHub Bot commented on FLINK-3474: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/1746#discussion_r54838441 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala --- @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +import java.lang.Iterable + +import com.google.common.base.Preconditions +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction} +import org.apache.flink.api.table.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. + * + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. 
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value + * index in output Row. + */ +class AggregateReduceGroupFunction( +private val aggregates: Array[Aggregate[_ <: Any]], +private val groupKeysMapping: Array[(Int, Int)], +private val aggregateMapping: Array[(Int, Int)], +private val intermediateRowArity: Int) +extends RichGroupReduceFunction[Row, Row] { + + private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length + private var aggregateBuffer: Row = _ + private var output: Row = _ + + override def open(config: Configuration) { +Preconditions.checkNotNull(aggregates) +Preconditions.checkNotNull(groupKeysMapping) +aggregateBuffer = new Row(intermediateRowArity) +output = new Row(finalRowLength) + } + + /** + * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer, + * calculate aggregated values output by aggregate buffer, and set them into output + * Row based on the mapping relation between intermediate aggregate data and output data. + * + * @param records Grouped intermediate aggregate Rows iterator. + * @param out The collector to hand results to. + * + */ + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + +// Initiate intermediate aggregate value. +aggregates.foreach(_.initiate(aggregateBuffer)) + +// Merge intermediate aggregate value to buffer. +var last: Row = null +records.foreach((record) => { + aggregates.foreach(_.merge(record, aggregateBuffer)) + last = record +}) + +// Set group keys to aggregateBuffer. +for (i <- 0 until groupKeysMapping.length) { --- End diff -- Yes, I will update this.
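The `reduce()` logic in the diff above (initiate the buffer, merge every intermediate row of the group, take the group keys from the last row, then evaluate each aggregate into the output row) can be sketched in plain Java as follows. This is a simplified sketch under stated assumptions: rows are modelled as `Object[]`, only a SUM over `long` is implemented, the group keys are copied straight into the output row rather than into the buffer first, and each mapping pair is read as (source index, output index), which is an assumption since the Scaladoc does not fix the order. `MiniAggregate`, `LongSum` and `ReduceSketch` are hypothetical stand-ins, not the Flink classes.

```java
import java.util.List;

// Hypothetical stand-in for the Aggregate trait from the diff; not Flink API.
interface MiniAggregate {
    void initiate(Object[] buffer);
    void merge(Object[] intermediate, Object[] buffer);
    Object evaluate(Object[] buffer);
}

// SUM over longs whose partial value lives at a fixed offset in the row.
final class LongSum implements MiniAggregate {
    private final int offset;

    LongSum(int offset) {
        this.offset = offset;
    }

    public void initiate(Object[] buffer) {
        buffer[offset] = 0L;
    }

    public void merge(Object[] intermediate, Object[] buffer) {
        buffer[offset] = (Long) buffer[offset] + (Long) intermediate[offset];
    }

    public Object evaluate(Object[] buffer) {
        return buffer[offset];
    }
}

final class ReduceSketch {
    // groupKeysMapping[i] = {intermediateIndex, outputIndex} (assumed order)
    // aggregateMapping[i] = {aggregateIndex, outputIndex}    (assumed order)
    static Object[] reduce(List<Object[]> records,
                           MiniAggregate[] aggregates,
                           int[][] groupKeysMapping,
                           int[][] aggregateMapping,
                           int intermediateArity) {
        Object[] buffer = new Object[intermediateArity];
        Object[] output = new Object[groupKeysMapping.length + aggregateMapping.length];

        // 1. Initiate the aggregate buffer.
        for (MiniAggregate agg : aggregates) {
            agg.initiate(buffer);
        }
        // 2. Merge every intermediate row of the group into the buffer.
        Object[] last = null;
        for (Object[] record : records) {
            for (MiniAggregate agg : aggregates) {
                agg.merge(record, buffer);
            }
            last = record;
        }
        // 3. Group keys are identical within a group, so any row (here the
        //    last one) can supply them.
        for (int[] m : groupKeysMapping) {
            output[m[1]] = last[m[0]];
        }
        // 4. Evaluate each aggregate into its output position.
        for (int[] m : aggregateMapping) {
            output[m[1]] = aggregates[m[0]].evaluate(buffer);
        }
        return output;
    }
}
```

For a group of intermediate rows `{"a", 3L}` and `{"a", 4L}` with a SUM at offset 1, the sketch produces the output row `{"a", 7L}`.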
[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation
[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15177259#comment-15177259 ] ASF GitHub Bot commented on FLINK-3474: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/1746#discussion_r54838416 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala --- @@ -17,26 +17,77 @@ */ package org.apache.flink.api.table.runtime.aggregate +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.table.Row + /** - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it - * with grouped aggregate field values, and finally get the aggregated value. - * @tparam T the output type + * The interface for all Flink aggregate functions, which expressed in terms of initiate(), + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases: + * -- In Map phase, use prepare() to transform aggregate field value into intermediate + * aggregate value. + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value. + * For associative decomposable aggregate functions, they support partial aggregate. To optimize + * the performance, a Combine phase would be added between Map phase and GroupReduce phase, + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values + * into aggregate buffer. + * + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start + * field index in Row, so different aggregate functions could share the same Row as intermediate + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields + * of Row with no conflict. 
The intermediate aggregate value is required to be a sequence of JVM + * primitives, and Flink use intermediateDataType() to get its data types in SQL side. + * + * @tparam T Aggregated value type. */ trait Aggregate[T] extends Serializable { + + protected var aggOffsetInRow: Int = _ + /** - * Initialize the aggregate state. + * Initiate the intermediate aggregate value in Row. + * @param intermediate */ - def initiateAggregate + def initiate(intermediate: Row): Unit /** - * Feed the aggregate field value. + * Transform the aggregate field value into intermediate aggregate data. * @param value + * @param intermediate */ - def aggregate(value: Any) + def prepare(value: Any, intermediate: Row): Unit --- End diff -- So null value handling should be done inside the aggregate function implementations instead of at the aggregate interface level. There is an umbrella JIRA about null value handling in the Table API (FLINK-3139); I will create a subtask that handles null values for COUNT/SUM/MIN/MAX/AVG, which are introduced in this PR. Is that OK?
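The lifecycle described in the Scaladoc above (prepare() in the Map phase, merge() in the optional Combine phase and again in the GroupReduce phase, evaluate() at the end, with each aggregate owning its own field offset in a shared row) can be sketched in plain Java. This is a hypothetical model, not the Flink trait: rows are `Object[]`, and `PhasedAggregate`, `LongSumAgg`, `CountAgg` and `PhasesDemo` are illustrative names. Following the review discussion, null handling lives inside the implementations (COUNT skips nulls) rather than in the interface.

```java
import java.util.List;

// Hypothetical mirror of the initiate/prepare/merge/evaluate interface;
// rows are modelled as Object[].
interface PhasedAggregate {
    void initiate(Object[] row);                    // reset this aggregate's slot
    void prepare(Object value, Object[] row);       // Map phase
    void merge(Object[] partial, Object[] buffer);  // Combine and GroupReduce phases
    Object evaluate(Object[] buffer);               // final aggregated value
}

// SUM over longs, stored at field index `offset` of the shared row.
final class LongSumAgg implements PhasedAggregate {
    private final int offset;
    LongSumAgg(int offset) { this.offset = offset; }

    public void initiate(Object[] row) { row[offset] = 0L; }

    // Simplification: a null contributes nothing; a complete SQL SUM would
    // also have to return NULL when every input is null.
    public void prepare(Object value, Object[] row) {
        row[offset] = value == null ? 0L : (Long) value;
    }

    public void merge(Object[] partial, Object[] buffer) {
        buffer[offset] = (Long) buffer[offset] + (Long) partial[offset];
    }

    public Object evaluate(Object[] buffer) { return buffer[offset]; }
}

// COUNT(x) at its own field index, so it can share the row with SUM.
final class CountAgg implements PhasedAggregate {
    private final int offset;
    CountAgg(int offset) { this.offset = offset; }

    public void initiate(Object[] row) { row[offset] = 0L; }

    // Null handling lives in the implementation: COUNT(x) skips nulls.
    public void prepare(Object value, Object[] row) {
        row[offset] = value == null ? 0L : 1L;
    }

    public void merge(Object[] partial, Object[] buffer) {
        buffer[offset] = (Long) buffer[offset] + (Long) partial[offset];
    }

    public Object evaluate(Object[] buffer) { return buffer[offset]; }
}

final class PhasesDemo {
    // Map phase: every aggregate writes into its own slot of one row.
    static Object[] mapPhase(Object value, PhasedAggregate[] aggs, int arity) {
        Object[] row = new Object[arity];
        for (PhasedAggregate agg : aggs) {
            agg.prepare(value, row);
        }
        return row;
    }

    // Combine phase (per sub-group) or GroupReduce phase (per group): the
    // same merge() folds rows into a freshly initiated buffer.
    static Object[] mergeAll(List<Object[]> rows, PhasedAggregate[] aggs, int arity) {
        Object[] buffer = new Object[arity];
        for (PhasedAggregate agg : aggs) {
            agg.initiate(buffer);
        }
        for (Object[] row : rows) {
            for (PhasedAggregate agg : aggs) {
                agg.merge(row, buffer);
            }
        }
        return buffer;
    }
}
```

With SUM at offset 0 and COUNT at offset 1, combining the sub-groups [1, 2] and [null, 4] gives partial rows [3, 2] and [4, 1]; merging those gives SUM = 7 and COUNT = 3, the same result as merging all four mapped rows directly. That equivalence is what makes the Combine phase an optional optimization for associative, decomposable aggregates.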
[GitHub] flink pull request: [docs] fix javascript exception caused by disq...
GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/1756 [docs] fix javascript exception caused by disqus Because we comment out `` but not the Disqus JavaScript, an exception like this is thrown: ![image](https://cloud.githubusercontent.com/assets/5378924/13482083/40bd92ea-e125-11e5-90f4-2682b763a288.png) This also fixes a minor typo in `cluster_execution.md`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink docs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1756.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1756 commit 713d418b6259ee94abe35edf710c5cabf978e1c2 Author: Jark Wu Date: 2016-03-03T01:43:38Z [docs] fix javascript exception caused by disqus and fix typos in cluster execution.
[jira] [Commented] (FLINK-3554) Bounded sources should emit a Max Watermark when they are done
[ https://issues.apache.org/jira/browse/FLINK-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176939#comment-15176939 ] ASF GitHub Bot commented on FLINK-3554: --- Github user wenlonglwl commented on the pull request: https://github.com/apache/flink/pull/1750#issuecomment-191534452 Just a reminder: this does not completely solve the problem of pending windows being lost over finite sources. Looking forward to the solutions! > Bounded sources should emit a Max Watermark when they are done > -- > > Key: FLINK-3554 > URL: https://issues.apache.org/jira/browse/FLINK-3554 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.0.0 > > > For proper event time support in bounded sources, these sources should emit a > final watermark before shutting down.
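Why the final watermark matters can be illustrated with a small, self-contained Java simulation (no Flink dependency): an event-time window only fires once a watermark reaches its last timestamp, so windows still open when a bounded source stops would never fire unless the source emits one last watermark of `Long.MAX_VALUE`. `WindowSim`, its tumbling-window assignment and its firing rule are simplifying assumptions for illustration, not Flink's window operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified event-time tumbling-window counter for illustration only;
// this is not Flink's window operator.
final class WindowSim {
    private final long size;
    // window start -> element count, ordered by start time
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    final List<String> fired = new ArrayList<>();

    WindowSim(long size) {
        this.size = size;
    }

    // Assign the element to the tumbling window [start, start + size).
    // Assumes non-negative timestamps.
    void onElement(long timestamp) {
        long start = timestamp - (timestamp % size);
        pending.merge(start, 1, Integer::sum);
    }

    // A window fires once the watermark reaches its last timestamp
    // (start + size - 1); later windows stay pending.
    void onWatermark(long watermark) {
        while (!pending.isEmpty()) {
            long start = pending.firstKey();
            if (start + size - 1 > watermark) {
                break;
            }
            fired.add("[" + start + "," + (start + size) + "): " + pending.remove(start));
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With a window size of 10, elements at 1, 5 and 12, and a last regular watermark of 9, only `[0,10)` fires; `[10,20)` stays pending until `onWatermark(Long.MAX_VALUE)` is called, which is the role of the final watermark a bounded source should emit when it is done.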
[jira] [Commented] (FLINK-3498) Implement TRIM, SUBSTRING as reference design for Table API
[ https://issues.apache.org/jira/browse/FLINK-3498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176768#comment-15176768 ] ASF GitHub Bot commented on FLINK-3498: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1747#issuecomment-191503064 Will merge this to `tableOnCalcite` branch. > Implement TRIM, SUBSTRING as reference design for Table API > --- > > Key: FLINK-3498 > URL: https://issues.apache.org/jira/browse/FLINK-3498 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > > As described in FLINK-3497 TRIM and SUBSTRING are the first scalar functions > implemented in Calcite and used in Table API.
[GitHub] flink pull request: [FLINK-3498] Implement TRIM, SUBSTRING as refe...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1747#issuecomment-191503064 Will merge this to `tableOnCalcite` branch.
[GitHub] flink pull request: [FLINK-3486] [tableAPI] Fix broken renaming of...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1738#issuecomment-191502994 Will merge this to `tableOnCalcite` branch.
[jira] [Commented] (FLINK-3486) Use Project to rename all record fields would fail following Project.
[ https://issues.apache.org/jira/browse/FLINK-3486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176766#comment-15176766 ] ASF GitHub Bot commented on FLINK-3486: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1738#issuecomment-191502994 Will merge this to `tableOnCalcite` branch. > Use Project to rename all record fields would fail following Project. > - > > Key: FLINK-3486 > URL: https://issues.apache.org/jira/browse/FLINK-3486 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Chengxiang Li >Assignee: Fabian Hueske > > {noformat} val t = CollectionDataSets.get3TupleDataSet(env).toTable > .select('_1 as 'a, '_2 as 'b, '_3 as 'c) > .select('a, 'b) > {noformat} > would throw exception like: > {noformat} > java.lang.IllegalArgumentException: field [a] not found; input fields are: > [_1, _2, _3] > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:290) > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:275) > at > org.apache.flink.api.table.plan.RexNodeTranslator$.toRexNode(RexNodeTranslator.scala:80) > at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98) > at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98) > {noformat} > new alias names are lost.
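The bug report comes down to name resolution: the second `select('a, 'b)` has to be resolved against the output fields of the renaming projection (`a, b, c`), not against the original input fields (`_1, _2, _3`). The toy Java model below shows that intended behaviour under simplifying assumptions: each projection checks names against its own input schema and returns a new schema carrying the aliases. The class `ToySchema` and its string-based `"x as y"` syntax are hypothetical; this is not Flink or Calcite code, and the error text only mimics the message in the stack trace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy logical projection (not Flink/Calcite code): names are resolved against
// the input schema, and the output schema carries the aliases.
final class ToySchema {
    final List<String> fields;

    ToySchema(String... fields) {
        this.fields = Arrays.asList(fields);
    }

    // Each expression is either "name" or "name as alias".
    ToySchema select(String... exprs) {
        List<String> out = new ArrayList<>();
        for (String expr : exprs) {
            String[] parts = expr.split("\\s+as\\s+");
            String source = parts[0].trim();
            if (!fields.contains(source)) {
                throw new IllegalArgumentException(
                    "field [" + source + "] not found; input fields are: " + fields);
            }
            // The alias (if any) becomes the name seen by the next operator.
            out.add(parts.length == 2 ? parts[1].trim() : source);
        }
        return new ToySchema(out.toArray(new String[0]));
    }
}
```

`new ToySchema("_1", "_2", "_3").select("_1 as a", "_2 as b", "_3 as c").select("a", "b")` yields the fields `[a, b]`; the reported exception corresponds to resolving `a` against `[_1, _2, _3]` instead.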
[jira] [Commented] (FLINK-3564) Implement distinct() for Table API
[ https://issues.apache.org/jira/browse/FLINK-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176735#comment-15176735 ] ASF GitHub Bot commented on FLINK-3564: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1754#discussion_r54814006 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table.test; + +import java.util.List; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.Table; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class DistinctITCase extends MultipleProgramsTestBase { + + public DistinctITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testAllDistinct() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env); + + Table table = tableEnv.fromDataSet(input, "a, b, c"); + + Table distinct = table.distinct(); + + DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n"; + compareResultAsText(results, expected); + } + + @Test + public void testDistinct() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env); + + Table table = tableEnv.fromDataSet(input, "a, b, c"); + + Table distinct = table.select("b").distinct(); + + DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class); + List<Row> results = ds.collect(); + String expected = "1\n" + "2\n"; + compareResultAsText(results, expected); + } + + @Test + public void testDistinctAfterAggregate() throws Exception { +
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env); + + Table table = tableEnv.fromDataSet(input, "a, b, c, d, e"); + + Table distinct = table.groupBy("a").select("e").distinct(); --- End diff -- Isn't the result of `groupBy('a).select('e)` non-deterministic? Many database systems do not even allow this. Btw, I looked at the implementation of `GroupedTable.select()` and it looks incorrect. It does not group but just adds a selection. We should either reject the select or select the value of an arbitrary row of the group. > Implement distinct() for Table API > -- > > Key: FLINK-3564 > URL: https://issues.apache.org/jira/browse/FLINK-3564 > Project: Flink > Issue Type: New Feature > Components: Table API >
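The semantics the tests expect, and the concern about `groupBy('a).select('e)`, can be illustrated with plain Java collections. In this sketch `distinctValues` keeps the first occurrence of each value (so its result order is deterministic here, unlike a distributed `distinct()`, which is why the tests compare results as text after collecting), and `minPerGroup` shows one way to make a per-group selection deterministic: apply an aggregate (MIN) instead of taking the value of an arbitrary row. `DistinctSketch` and both helpers are hypothetical; this is not how the Table API implements either operation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DistinctSketch {
    // Equivalent of select(column).distinct() on an in-memory list:
    // duplicates are dropped, first occurrences are kept in order.
    static <T> List<T> distinctValues(List<T> values) {
        return new ArrayList<>(new LinkedHashSet<>(values));
    }

    // groupBy(key).select(min(value)) over rows {key, value}: one value per
    // group that does not depend on the order in which rows arrive.
    static Map<Integer, Long> minPerGroup(List<long[]> rows) {
        Map<Integer, Long> result = new TreeMap<>();
        for (long[] row : rows) {
            int key = (int) row[0];
            result.merge(key, row[1], Long::min);
        }
        return result;
    }
}
```

With the small three-tuple test data, column `b` holds 1, 2, 2, so `distinctValues` yields [1, 2], matching the `testDistinct` expectation. A bare `select('e)` per group has no such rule for choosing among the group's rows, which is the source of the non-determinism raised in the review.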
[GitHub] flink pull request: [FLINK-3564] [table] Implement distinct() for ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1754#discussion_r54814006 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table.test; + +import java.util.List; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.Table; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class DistinctITCase extends MultipleProgramsTestBase { + + public DistinctITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testAllDistinct() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env); + + Table table = tableEnv.fromDataSet(input, "a, b, c"); + + Table distinct = table.distinct(); + + DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n"; + compareResultAsText(results, expected); + } + + @Test + public void testDistinct() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env); + + Table table = tableEnv.fromDataSet(input, "a, b, c"); + + Table distinct = table.select("b").distinct(); + + DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class); + List<Row> results = ds.collect(); + String expected = "1\n" + "2\n"; + compareResultAsText(results, expected); + } + + @Test + public void testDistinctAfterAggregate() throws Exception { +
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet > input = CollectionDataSets.get5TupleDataSet(env); + + Table table = tableEnv.fromDataSet(input, "a, b, c, d, e"); + + Table distinct = table.groupBy("a").select("e").distinct(); --- End diff -- Isn't the result of `groupBy('a).select('e)` non-deterministic? Many database systems do not even allow this. Btw, I looked at the implementation of `GroupedTable.select()` and it looks incorrect. It does not group but just add a selection. Either we should reject the select or select the value an arbitrary row of the group. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
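One way to act on the suggestion to reject such a select is a validation step that only accepts grouping keys and aggregate calls after `groupBy`. The following is a minimal Java sketch under stated assumptions: `GroupedSelectValidator`, `validate` and `isAggregateCall` are hypothetical names that are not part of the Flink Table API, and an item is treated as an aggregate call whenever it has the form `name(arg)`.

```java
import java.util.List;
import java.util.Set;

// Hypothetical validation step for a select after groupBy; not Flink's GroupedTable code.
final class GroupedSelectValidator {

    // Assumption: an aggregate call is written as name(arg), e.g. "sum(e)".
    static boolean isAggregateCall(String item) {
        int open = item.indexOf('(');
        return open > 0 && item.endsWith(")");
    }

    // Rejects bare fields that are not grouping keys, mirroring the SQL rule that a
    // non-aggregated column in SELECT must also appear in GROUP BY.
    static void validate(Set<String> groupKeys, List<String> selected) {
        for (String raw : selected) {
            String item = raw.trim();
            if (!isAggregateCall(item) && !groupKeys.contains(item)) {
                throw new IllegalArgumentException(
                    "Field '" + item + "' is neither a grouping key nor an aggregate;"
                        + " its value per group would be non-deterministic");
            }
        }
    }
}
```

With grouping key `a`, selecting `a` and `sum(e)` passes, while selecting the bare field `e` is rejected, which corresponds to the first of the two options in the review comment.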
[jira] [Commented] (FLINK-3564) Implement distinct() for Table API
[ https://issues.apache.org/jira/browse/FLINK-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176674#comment-15176674 ]

ASF GitHub Bot commented on FLINK-3564:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1754#discussion_r54808793

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+    public DistinctITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    @Test
+    public void testAllDistinct() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env);
+
+        Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+        Table distinct = table.distinct();
+
+        DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+        List<Row> results = ds.collect();
+        String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n";
+        compareResultAsText(results, expected);
+    }
+
+    @Test
+    public void testDistinct() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env);
--- End diff --

Why not use `get3TupleDataSet()`, which has more duplicates?

> Implement distinct() for Table API
> --
>
>                 Key: FLINK-3564
>                 URL: https://issues.apache.org/jira/browse/FLINK-3564
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Minor
>
> This is only syntactic sugar for grouping of all fields.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
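Since `distinct()` is described as syntactic sugar for grouping on all fields, its semantics can be sketched in plain Java: group rows by the entire row and keep one representative per group. `DistinctAsGroupBy` is a hypothetical helper that uses `List<Object>` rows in place of Flink's `Row`; it illustrates the semantics only, not how the Table API plans the operator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of distinct() as "group by all fields"; rows are List<Object>.
final class DistinctAsGroupBy {

    static List<List<Object>> distinct(List<List<Object>> rows) {
        // The grouping key is the entire row, so rows with equal fields share one group.
        // LinkedHashMap keeps the first representative of each group in input order.
        Map<List<Object>, List<Object>> groups = new LinkedHashMap<>();
        for (List<Object> row : rows) {
            groups.putIfAbsent(row, row);
        }
        return new ArrayList<>(groups.values());
    }
}
```

On an input whose rows are already pairwise distinct, such as the three rows of `getSmall3TupleDataSet()`, this function returns its input unchanged, so a fixture with duplicate rows is what gives a `distinct()` test something to verify.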
[GitHub] flink pull request: [FLINK-3564] [table] Implement distinct() for ...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1754#discussion_r54808671

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+    public DistinctITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    @Test
+    public void testAllDistinct() throws Exception {
--- End diff --

This test does not check if `distinct` is working. The tuples of `CollectionDataSets.getSmall3TupleDataSet()` are already distinct.
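The review point generalizes: a `distinct()` test only has discriminating power if its input actually contains duplicate rows. A small guard like the hypothetical `FixtureChecks.requireDuplicates` below (not an existing Flink test utility) can make that precondition explicit before results are compared; it assumes the row type implements `equals` and `hashCode`.

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical test helper; not an existing Flink test utility.
final class FixtureChecks {

    // True if at least one row occurs more than once (rows must implement equals/hashCode).
    static boolean hasDuplicates(List<?> rows) {
        return new HashSet<Object>(rows).size() < rows.size();
    }

    // Fails fast when a distinct() test would pass even if distinct() did nothing.
    static void requireDuplicates(List<?> fixture) {
        if (!hasDuplicates(fixture)) {
            throw new IllegalStateException(
                "Fixture has no duplicate rows, so a distinct() test on it cannot fail");
        }
    }
}
```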
[jira] [Resolved] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11
[ https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger resolved FLINK-3565.
---
    Resolution: Fixed
 Fix Version/s: 1.1.0
                1.0.0

Resolved for 1.0 in http://git-wip-us.apache.org/repos/asf/flink/commit/3adc5148
for 1.1 in http://git-wip-us.apache.org/repos/asf/flink/commit/072da7de

> FlinkKafkaConsumer does not work with Scala 2.11
> -
>
>                 Key: FLINK-3565
>                 URL: https://issues.apache.org/jira/browse/FLINK-3565
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Aljoscha Krettek
>            Assignee: Maximilian Michels
>            Priority: Blocker
>             Fix For: 1.0.0, 1.1.0
>
>
> Running a program built against Flink_2.11 dependencies fails on a Flink_2.11
> cluster with this exception:
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>     at kafka.utils.Pool.<init>(Pool.scala:28)
>     at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
>     at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
>     at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>     at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
>     at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     ... 21 more
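When diagnosing a `NoClassDefFoundError` like the one above, it can help to check whether the missing class is visible to a given class loader at all. The sketch below is a generic JVM probe built on `Class.forName(name, false, loader)`; `ClasspathProbe` is a hypothetical name, and the probe only reports visibility. It does not explain or fix the underlying issue, which the linked commits address.

```java
// Hypothetical diagnostic helper; it only reports whether a class is visible.
final class ClasspathProbe {

    // Tries to load the class without running its static initializer.
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError also covers NoClassDefFoundError raised while resolving the class.
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "scala.collection.GenTraversableOnce$class";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(name + " loadable: " + isLoadable(name, loader));
    }
}
```

Running it with the same classpath as the failing job shows whether the class named in the trace can be resolved there.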
[GitHub] flink pull request: [FLINK-3565] add module to force execution of ...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1755
[GitHub] flink pull request: [FLINK-3565] add module to force execution of ...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1755#issuecomment-191343477

+1 to merge
[GitHub] flink pull request: [FLINK-3474] support partial aggregate
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54763270

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+ *
+ * @param aggregates The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+ *                         and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+ *                         index in output Row.
+ */
+class AggregateReduceGroupFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    aggregateBuffer = new Row(intermediateRowArity)
+    output = new Row(finalRowLength)
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+   * calculate aggregated values output by aggregate buffer, and set them into output
+   * Row based on the mapping relation between intermediate aggregate data and output data.
+   *
+   * @param records Grouped intermediate aggregate Rows iterator.
+   * @param out The collector to hand results to.
+   *
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(aggregateBuffer))
+
+    // Merge intermediate aggregate value to buffer.
+    var last: Row = null
+    records.foreach((record) => {
+      aggregates.foreach(_.merge(record, aggregateBuffer))
+      last = record
+    })
+
+    // Set group keys to aggregateBuffer.
+    for (i <- 0 until groupKeysMapping.length) {
--- End diff --

Is this necessary? It looks like we copy the keys first into the `aggregateBuffer` and then into the `output` row. Can't we copy the keys directly from `last` to `output`?
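The suggestion amounts to writing each grouping key once, from the last record of the group straight into the output row. A minimal Java sketch, assuming rows are modeled as `Object[]` and that each pair in `groupKeysMapping` is (index in the intermediate row, index in the output row); the pair order and the `GroupKeyCopy` name are assumptions, not taken from the PR.

```java
// Hypothetical sketch; rows are Object[] stand-ins for Flink's Row.
final class GroupKeyCopy {

    // Each mapping pair is assumed to be {indexInIntermediateRow, indexInOutputRow}.
    static void copyGroupKeys(Object[] last, Object[] output, int[][] groupKeysMapping) {
        for (int[] pair : groupKeysMapping) {
            // Grouping keys are equal for every record of a group, so the last record suffices.
            output[pair[1]] = last[pair[0]];
        }
    }
}
```

This skips the intermediate write into the aggregate buffer; the aggregated values would still be evaluated from the buffer and placed into `output` separately.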
[GitHub] flink pull request: [FLINK-3474] support partial aggregate
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54761986

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
@@ -17,26 +17,77 @@
  */
 package org.apache.flink.api.table.runtime.aggregate
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.table.Row
+
 /**
- * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
- * with grouped aggregate field values, and finally get the aggregated value.
- * @tparam T the output type
+ * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
+ * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
+ * -- In Map phase, use prepare() to transform aggregate field value into intermediate
+ * aggregate value.
+ * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
+ * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
+ * For associative decomposable aggregate functions, they support partial aggregate. To optimize
+ * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
+ * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
+ * into aggregate buffer.
+ *
+ * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
+ * field index in Row, so different aggregate functions could share the same Row as intermediate
+ * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
+ * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
+ * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
+ *
+ * @tparam T Aggregated value type.
  */
 trait Aggregate[T] extends Serializable {
+
+  protected var aggOffsetInRow: Int = _
+
   /**
-   * Initialize the aggregate state.
+   * Initiate the intermediate aggregate value in Row.
+   * @param intermediate
    */
-  def initiateAggregate
+  def initiate(intermediate: Row): Unit
   /**
-   * Feed the aggregate field value.
+   * Transform the aggregate field value into intermediate aggregate data.
    * @param value
+   * @param intermediate
    */
-  def aggregate(value: Any)
+  def prepare(value: Any, intermediate: Row): Unit
--- End diff --

Yes, this would be aggregation-specific. For example, for a `SUM` aggregation, `prepare` could insert a `0`, which is basically the same as what `initiate` would do. However, it is also OK to do it in `prepare` directly.
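The `initiate`/`prepare`/`merge`/`evaluate` contract described in the doc comment can be illustrated with a `SUM` over longs. This is a hypothetical Java sketch, not the PR's Scala code: `RowAggregate` stands in for the `Aggregate` trait, rows are `Object[]` instead of Flink's `Row`, and the offset is passed through the constructor rather than set as a protected field. `prepare` writes the value directly, as the review comment allows; treating `null` as `0` is an assumption of the sketch.

```java
// Hypothetical Java stand-in for the Scala Aggregate trait; rows are Object[].
interface RowAggregate<T> {
    void initiate(Object[] buffer);                      // reset the aggregate buffer
    void prepare(Object value, Object[] intermediate);   // map phase: input -> intermediate value
    void merge(Object[] intermediate, Object[] buffer);  // combine and reduce phases
    T evaluate(Object[] buffer);                         // final aggregated value
}

final class LongSumAggregate implements RowAggregate<Long> {
    private final int aggOffsetInRow; // field index this aggregate owns in shared rows

    LongSumAggregate(int aggOffsetInRow) {
        this.aggOffsetInRow = aggOffsetInRow;
    }

    @Override
    public void initiate(Object[] buffer) {
        buffer[aggOffsetInRow] = 0L;
    }

    @Override
    public void prepare(Object value, Object[] intermediate) {
        // Writes the value directly; null is treated as 0 (an assumption of this sketch).
        intermediate[aggOffsetInRow] = value == null ? 0L : ((Number) value).longValue();
    }

    @Override
    public void merge(Object[] intermediate, Object[] buffer) {
        buffer[aggOffsetInRow] = (Long) buffer[aggOffsetInRow] + (Long) intermediate[aggOffsetInRow];
    }

    @Override
    public Long evaluate(Object[] buffer) {
        return (Long) buffer[aggOffsetInRow];
    }
}

final class TwoPhaseSumDemo {
    // Prepares every input, merges alternating inputs into two partial buffers
    // (as two combiners would), then merges both partials into a final buffer.
    static long sum(long[] inputs, int offset, int arity) {
        LongSumAggregate agg = new LongSumAggregate(offset);
        Object[] left = new Object[arity];
        Object[] right = new Object[arity];
        agg.initiate(left);
        agg.initiate(right);
        for (int i = 0; i < inputs.length; i++) {
            Object[] intermediate = new Object[arity];
            agg.prepare(inputs[i], intermediate);
            agg.merge(intermediate, i % 2 == 0 ? left : right);
        }
        Object[] result = new Object[arity];
        agg.initiate(result);
        agg.merge(left, result);
        agg.merge(right, result);
        return agg.evaluate(result);
    }
}
```

Because `merge` only adds partial sums, it can run once per combiner on a subset of a group and again in the final reduce; this associativity is what makes the Combine phase possible for `SUM`.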
[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation
[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176085#comment-15176085 ] ASF GitHub Bot commented on FLINK-3474: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1746#discussion_r54761986 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala --- @@ -17,26 +17,77 @@ */ package org.apache.flink.api.table.runtime.aggregate +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.table.Row + /** - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it - * with grouped aggregate field values, and finally get the aggregated value. - * @tparam T the output type + * The interface for all Flink aggregate functions, which expressed in terms of initiate(), + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases: + * -- In Map phase, use prepare() to transform aggregate field value into intermediate + * aggregate value. + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value. + * For associative decomposable aggregate functions, they support partial aggregate. To optimize + * the performance, a Combine phase would be added between Map phase and GroupReduce phase, + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values + * into aggregate buffer. + * + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start + * field index in Row, so different aggregate functions could share the same Row as intermediate + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields + * of Row with no conflict. 
The intermediate aggregate value is required to be a sequence of JVM + primitives, and Flink use intermediateDataType() to get its data types in SQL side. + + * @tparam T Aggregated value type. */ trait Aggregate[T] extends Serializable { + + protected var aggOffsetInRow: Int = _ + /** - * Initialize the aggregate state. + * Initiate the intermediate aggregate value in Row. + * @param intermediate */ - def initiateAggregate + def initiate(intermediate: Row): Unit /** - * Feed the aggregate field value. + * Transform the aggregate field value into intermediate aggregate data. * @param value + * @param intermediate */ - def aggregate(value: Any) + def prepare(value: Any, intermediate: Row): Unit --- End diff -- Yes, this would be aggregation-specific. For example, for a `SUM` aggregation, `prepare` could insert a `0`, which is basically the same as what `initiate` would do. However, it is also OK to do it in `prepare` directly.
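The two-phase contract in the diff can be made concrete with a small example: SUM and COUNT share one intermediate Row through `aggOffsetInRow`. The Java sketch below is an illustrative translation, not the code from PR #1746. `SketchRow` is a hypothetical `Object[]` wrapper standing in for `org.apache.flink.api.table.Row`, and `Agg`, `LongSum`, `Count` and `sumAndCount` are likewise hypothetical names. The sketch runs in three steps:

1. `prepare()` turns each value into an intermediate Row.
2. Each partition's intermediates are merged into a partial buffer (the Combine phase).
3. The partial buffers are merged again and evaluated (the GroupReduce phase).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative Java translation of the initiate/prepare/merge/evaluate contract from the diff.
// SketchRow is a hypothetical stand-in for org.apache.flink.api.table.Row.
class AggregateSketch {

    static final class SketchRow {
        private final Object[] fields;
        SketchRow(int arity) { fields = new Object[arity]; }
        Object getField(int i) { return fields[i]; }
        void setField(int i, Object v) { fields[i] = v; }
    }

    abstract static class Agg<T> {
        // First field index of this aggregate's intermediate value in the shared Row.
        protected final int aggOffsetInRow;
        Agg(int aggOffsetInRow) { this.aggOffsetInRow = aggOffsetInRow; }
        abstract void initiate(SketchRow buffer);                      // reset the buffer field(s)
        abstract void prepare(Object value, SketchRow intermediate);   // field value -> intermediate
        abstract void merge(SketchRow intermediate, SketchRow buffer); // fold intermediate into buffer
        abstract T evaluate(SketchRow buffer);                         // buffer -> final value
    }

    // SUM and COUNT both keep a single long and merge by addition.
    abstract static class LongAdditive extends Agg<Long> {
        LongAdditive(int offset) { super(offset); }
        void initiate(SketchRow buffer) { buffer.setField(aggOffsetInRow, 0L); }
        void merge(SketchRow intermediate, SketchRow buffer) {
            long acc = (Long) buffer.getField(aggOffsetInRow);
            buffer.setField(aggOffsetInRow, acc + (Long) intermediate.getField(aggOffsetInRow));
        }
        Long evaluate(SketchRow buffer) { return (Long) buffer.getField(aggOffsetInRow); }
    }

    static final class LongSum extends LongAdditive {
        LongSum(int offset) { super(offset); }
        void prepare(Object value, SketchRow intermediate) { intermediate.setField(aggOffsetInRow, (Long) value); }
    }

    static final class Count extends LongAdditive {
        Count(int offset) { super(offset); }
        void prepare(Object value, SketchRow intermediate) { intermediate.setField(aggOffsetInRow, 1L); }
    }

    // One group whose values arrive in several partitions; returns {SUM, COUNT}.
    static long[] sumAndCount(List<List<Long>> partitions) {
        // SUM owns field 0 and COUNT owns field 1 of the same Row, so they do not conflict.
        List<Agg<Long>> aggs = Arrays.asList(new LongSum(0), new Count(1));
        int arity = 2;
        List<SketchRow> partials = new ArrayList<>();
        for (List<Long> partition : partitions) {
            SketchRow partial = new SketchRow(arity);
            for (Agg<Long> a : aggs) a.initiate(partial);
            for (Long v : partition) {
                // Map phase: transform the field value into an intermediate Row.
                SketchRow intermediate = new SketchRow(arity);
                for (Agg<Long> a : aggs) a.prepare(v, intermediate);
                // Combine phase: merge it into this partition's partial buffer.
                for (Agg<Long> a : aggs) a.merge(intermediate, partial);
            }
            partials.add(partial);
        }
        // GroupReduce phase: merge the partial buffers, then evaluate.
        SketchRow buffer = new SketchRow(arity);
        for (Agg<Long> a : aggs) a.initiate(buffer);
        for (SketchRow p : partials) {
            for (Agg<Long> a : aggs) a.merge(p, buffer);
        }
        return new long[] {aggs.get(0).evaluate(buffer), aggs.get(1).evaluate(buffer)};
    }
}
```

For example, splitting the values 1, 2, 3 into the partitions [1, 2] and [3] should yield SUM 6 and COUNT 3. Both aggregates are associative, so the result does not depend on how the values are split across partitions. That property is what makes the extra Combine phase safe.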
[GitHub] flink pull request: [FLINK-3474] support partial aggregate
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1746#issuecomment-191343405 Yes, you are right. We need to add a `RichCombineToGroupCombineWrapper` and inject it as a temporary fix until we have dedicated drivers for `CombineFunction`.
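The wrapper idea can be sketched as a plain adapter. A `CombineFunction` returns one value per group, so a group-combine driver can run it by collecting that single result. This is a minimal sketch built on local stand-in interfaces shaped like Flink's `Collector`, `CombineFunction` and `GroupCombineFunction`; the class `CombineToGroupCombineWrapper` and the helper `combineGroup` are hypothetical. A real `RichCombineToGroupCombineWrapper` would presumably also have to forward the rich-function lifecycle (`open()`, `close()`, runtime context), which is left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of adapting a CombineFunction to the GroupCombineFunction contract.
// The three interfaces are local stand-ins shaped like Flink's, not the Flink types themselves.
class CombineWrapperSketch {

    interface Collector<T> {
        void collect(T record);
    }

    interface CombineFunction<IN, OUT> {
        OUT combine(Iterable<IN> values) throws Exception;
    }

    interface GroupCombineFunction<IN, OUT> {
        void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
    }

    // Calls the wrapped CombineFunction once per group and emits its single result,
    // so a driver that only understands GroupCombineFunction can run it.
    static final class CombineToGroupCombineWrapper<IN, OUT> implements GroupCombineFunction<IN, OUT> {
        private final CombineFunction<IN, OUT> wrapped;

        CombineToGroupCombineWrapper(CombineFunction<IN, OUT> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void combine(Iterable<IN> values, Collector<OUT> out) throws Exception {
            out.collect(wrapped.combine(values));
        }
    }

    // Example: a summing CombineFunction driven through the wrapper; returns everything it emitted.
    static List<Long> combineGroup(List<Long> group) {
        CombineFunction<Long, Long> sum = values -> {
            long total = 0;
            for (Long v : values) {
                total += v;
            }
            return total;
        };
        List<Long> emitted = new ArrayList<>();
        try {
            new CombineToGroupCombineWrapper<>(sum).combine(group, emitted::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return emitted;
    }
}
```

For one group [1, 2, 3], the wrapper emits exactly one record, `6`.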
[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11
[ https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176060#comment-15176060 ] ASF GitHub Bot commented on FLINK-3565: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1755#issuecomment-191343477 +1 to merge > FlinkKafkaConsumer does not work with Scala 2.11 > - > > Key: FLINK-3565 > URL: https://issues.apache.org/jira/browse/FLINK-3565 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Aljoscha Krettek >Assignee: Maximilian Michels >Priority: Blocker > > Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 > cluster with this exception: > java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class > at kafka.utils.Pool.(Pool.scala:28) > at > kafka.consumer.FetchRequestAndResponseStatsRegistry$.(FetchRequestAndResponseStats.scala:60) > at > kafka.consumer.FetchRequestAndResponseStatsRegistry$.(FetchRequestAndResponseStats.scala) > at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39) > at kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:218) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:160) > at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at org.apache.flink.client.program.Client.runBlocking(Client.java:248) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > Caused by: java.lang.ClassNotFoundException: > scala.collection.GenTraversableOnce$class > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > ... 21 more
[jira] [Commented] (FLINK-3498) Implement TRIM, SUBSTRING as reference design for Table API
[ https://issues.apache.org/jira/browse/FLINK-3498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176050#comment-15176050 ] ASF GitHub Bot commented on FLINK-3498: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1747#issuecomment-191341765 OK, I see. Sounds good to me. +1 to merge. > Implement TRIM, SUBSTRING as reference design for Table API > --- > > Key: FLINK-3498 > URL: https://issues.apache.org/jira/browse/FLINK-3498 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > > As described in FLINK-3497, TRIM and SUBSTRING are the first scalar functions > implemented in Calcite and used in the Table API.
[jira] [Assigned] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Horvath reassigned FLINK-3322: Assignee: Gabor Horvath > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Horvath >Priority: Critical > Fix For: 1.0.0 > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (On the first run, it will spend a few minutes generating some lookup tables in /tmp.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.)
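The alternative to leaving released segments to the GC is a free list. The sketch below keeps released segments and hands them out again on the next allocation. It is a simplified, hypothetical model, not Flink's `MemoryManager`: plain `byte[]` arrays stand in for memory segments, and the class `SegmentPoolSketch` is illustrative. Its `simulate` method mimics an iterative job that allocates and releases the same amount of memory in every superstep.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified model of pooling released memory segments for reuse.
// Plain byte[] arrays stand in for memory segments; this is not Flink's MemoryManager.
class SegmentPoolSketch {
    private final int segmentSize;
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private long freshCount = 0;

    SegmentPoolSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    // Hand out a previously released segment if possible; otherwise allocate a new one.
    byte[] allocate() {
        byte[] segment = free.pollFirst();
        if (segment == null) {
            freshCount++;
            segment = new byte[segmentSize];
        }
        return segment;
    }

    // Keep the segment for the next allocation instead of dropping it for the GC.
    void release(byte[] segment) {
        free.addFirst(segment);
    }

    long freshAllocations() {
        return freshCount;
    }

    // Mimics an iterative job: every superstep allocates perStep segments and releases them again.
    static long simulate(int supersteps, int perStep, int segmentSize) {
        SegmentPoolSketch pool = new SegmentPoolSketch(segmentSize);
        for (int step = 0; step < supersteps; step++) {
            byte[][] held = new byte[perStep][];
            for (int i = 0; i < perStep; i++) {
                held[i] = pool.allocate();
            }
            for (byte[] segment : held) {
                pool.release(segment);
            }
        }
        return pool.freshAllocations();
    }
}
```

With pooling, `simulate(10, 4, 1024)` performs only 4 fresh allocations, all in the first superstep. Dropping the references instead would create 40 segments for the GC to collect. The trade-off is that pooled segments stay reachable on the heap even when no operator currently needs them.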
[GitHub] flink pull request: [FLINK-3565] add module to force execution of ...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/1755 [FLINK-3565] add module to force execution of Shade plugin This ensures that all properties of the root pom are properly resolved by running the Shade plugin. Thus, our root pom does not have to depend on a Scala version just because it holds the Scala version properties. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-3565 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1755.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1755 commit 47de08a9503da99b11fc45a7f299671ecab0b21e Author: Maximilian Michels Date: 2016-03-02T16:52:05Z [maven] add module to force execution of Shade plugin This ensures that all properties of the root pom are properly resolved by running the Shade plugin. Thus, our root pom does not have to depend on a Scala version just because it holds the Scala version properties.
[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11
[ https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176012#comment-15176012 ] ASF GitHub Bot commented on FLINK-3565: --- GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/1755 [FLINK-3565] add module to force execution of Shade plugin This ensures that all properties of the root pom are properly resolved by running the Shade plugin. Thus, our root pom does not have to depend on a Scala version just because it holds the Scala version properties. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-3565 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1755.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1755 commit 47de08a9503da99b11fc45a7f299671ecab0b21e Author: Maximilian Michels Date: 2016-03-02T16:52:05Z [maven] add module to force execution of Shade plugin This ensures that all properties of the root pom are properly resolved by running the Shade plugin. Thus, our root pom does not have to depend on a Scala version just because it holds the Scala version properties.
[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11
[ https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176007#comment-15176007 ] Maximilian Michels commented on FLINK-3565: --- Thanks to [~till.rohrmann], I've come up with another approach, which is also used in other projects. The idea is to add a dependency to the root pom which is then explicitly shaded from all modules. That way, the Shade plugin always generates the so-called "effective" poms. Effective poms have their properties resolved. Thus, we can configure the Scala version in the root pom as before. When Maven packages the Jars, the Shade plugin will ensure that all poms are made "effective". I don't know whether we will stick to this approach; it is possible to move to another solution later on. Most importantly, this change is transparent to the user. In contrast to https://issues.apache.org/jira/browse/SPARK-3812, I've created an independent module in our source, so everything is self-contained and we don't rely on an external Maven module.
[jira] [Resolved] (FLINK-3422) Scramble HashPartitioner hashes
[ https://issues.apache.org/jira/browse/FLINK-3422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi resolved FLINK-3422. --- Resolution: Fixed Fixed via 0ff286d and f0f93c2. > Scramble HashPartitioner hashes > --- > > Key: FLINK-3422 > URL: https://issues.apache.org/jira/browse/FLINK-3422 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.10.2 >Reporter: Stephan Ewen >Assignee: Gabor Horvath >Priority: Critical > Fix For: 1.0.0 > > > The {{HashPartitioner}} used by the streaming API does not apply any hash > scrambling against bad user hash functions. > We should apply a murmur or Jenkins hash on top of the hash code, similar to > the {{DataSet}} API.
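Scrambling means applying a mixing function to `hashCode()` before taking the value modulo the number of channels. The sketch below uses the 32-bit MurmurHash3 finalizer (`fmix32`) as the mixing step. The class and method names are hypothetical, and the actual fix in 0ff286d and f0f93c2 may use a different mixing function or channel selection. `distinctChannels` models a pathological user hash function: it feeds keys whose hash codes are all multiples of the channel count.

```java
// Hypothetical sketch of hash scrambling before channel selection.
// fmix32 is the 32-bit MurmurHash3 finalizer; the class and method names are illustrative.
class HashScrambleSketch {

    // MurmurHash3 fmix32: spreads the influence of every input bit over the output bits.
    static int fmix32(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // floorMod keeps the channel index non-negative even for negative hash codes.
    static int channel(int hashCode, int numChannels, boolean scramble) {
        int h = scramble ? fmix32(hashCode) : hashCode;
        return Math.floorMod(h, numChannels);
    }

    // Number of distinct channels hit by keys whose hash codes are 0, step, 2 * step, ...
    static int distinctChannels(int keys, int step, int numChannels, boolean scramble) {
        boolean[] used = new boolean[numChannels];
        int count = 0;
        for (int k = 0; k < keys; k++) {
            int c = channel(k * step, numChannels, scramble);
            if (!used[c]) {
                used[c] = true;
                count++;
            }
        }
        return count;
    }
}
```

With 4 channels and hash codes 0, 4, 8, ..., the plain modulo sends every key to channel 0. The scrambled variant spreads the same keys over several channels.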
[GitHub] flink pull request: [FLINK-3422][streaming][api-breaking] Scramble...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1685
[jira] [Commented] (FLINK-3422) Scramble HashPartitioner hashes
[ https://issues.apache.org/jira/browse/FLINK-3422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175883#comment-15175883 ] ASF GitHub Bot commented on FLINK-3422: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1685
[jira] [Commented] (FLINK-3566) Input type validation often fails on custom TypeInfo implementations
[ https://issues.apache.org/jira/browse/FLINK-3566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175770#comment-15175770 ] Gyula Fora commented on FLINK-3566: --- Another example with KeySelectors and interfaces:
{code}
interface Event {
    String getKey();
}

public static class MyEvent implements Event {
    @Override
    public String getKey() {
        return "";
    }
}

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream events = env.fromElements(new MyEvent());
    applyKeySelector(events);
}

private static void applyKeySelector(DataStream events) {
    events.keyBy(e -> e.getKey());
}
{code}
This gives the following exception: Input mismatch: Generic object type 'package.TypeTest.MyEvent' expected but was 'package.TypeTest.Event'. > Input type validation often fails on custom TypeInfo implementations > > > Key: FLINK-3566 > URL: https://issues.apache.org/jira/browse/FLINK-3566 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Gyula Fora > > Input type validation often fails when used with custom type infos. One > example of this behaviour can be reproduced by creating a custom type info > with our own field type: > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.generateSequence(1, 10).map(new MapFunctionTuple1 >() { > @Override > public Tuple1 map(Long value) throws > Exception { > return Tuple1.of(Optional.of(value)); > } > }).returns(new TupleTypeInfo<>(new > OptionTypeInfo(BasicTypeInfo.LONG_TYPE_INFO))) > .keyBy(new KeySelector , > Optional>() { > @Override > public Optional > getKey(Tuple1 value) throws Exception { > return value.f0; > } > }); > This will fail on Input type validation at the KeySelector (or any other > function for example a mapper) with the following exception: > Input mismatch: Basic type expected.
[jira] [Created] (FLINK-3566) Input type validation often fails on custom TypeInfo implementations
Gyula Fora created FLINK-3566: - Summary: Input type validation often fails on custom TypeInfo implementations Key: FLINK-3566 URL: https://issues.apache.org/jira/browse/FLINK-3566 Project: Flink Issue Type: Bug Components: Type Serialization System Reporter: Gyula Fora Input type validation often fails when used with custom type infos. One example of this behaviour can be reproduced by creating a custom type info with our own field type:
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.generateSequence(1, 10).map(new MapFunction>() {
    @Override
    public Tuple1 map(Long value) throws Exception {
        return Tuple1.of(Optional.of(value));
    }
}).returns(new TupleTypeInfo<>(new OptionTypeInfo(BasicTypeInfo.LONG_TYPE_INFO)))
  .keyBy(new KeySelector , Optional>() {
      @Override
      public Optional getKey(Tuple1 value) throws Exception {
          return value.f0;
      }
  });
{code}
This will fail on input type validation at the KeySelector (or any other function, for example a mapper) with the following exception: Input mismatch: Basic type expected.
[GitHub] flink pull request: [FLINK-3554] [streaming] Emit a MAX Watermark ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1750#issuecomment-191260012 Hi, what are you referring to? The watermark does not interact with count windows.
[jira] [Commented] (FLINK-3554) Bounded sources should emit a Max Watermark when they are done
[ https://issues.apache.org/jira/browse/FLINK-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175663#comment-15175663 ] ASF GitHub Bot commented on FLINK-3554: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1750#issuecomment-191260012 Hi, what are you referring to? The watermark does not interact with count windows. > Bounded sources should emit a Max Watermark when they are done > -- > > Key: FLINK-3554 > URL: https://issues.apache.org/jira/browse/FLINK-3554 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.0.0 > > > For proper event time support in bounded sources, these sources should emit a > final watermark before shutting down.
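A small model of event-time tumbling windows shows why a final watermark matters. A window fires only once the watermark reaches its last timestamp. The last windows of a bounded input therefore never fire unless the source emits a watermark of `Long.MAX_VALUE` before shutting down. The class `MaxWatermarkSketch` and its methods are illustrative assumptions, not Flink's window operator. In this sketch the periodic watermark simply lags the largest timestamp seen by 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of event-time tumbling windows fired by watermarks; not Flink's window operator.
class MaxWatermarkSketch {
    private final long windowSize;
    // Window start -> number of elements, for windows that have not fired yet.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private final List<String> fired = new ArrayList<>();

    MaxWatermarkSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    void processElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        pending.merge(start, 1, Integer::sum);
    }

    // Fires, in order, every window whose last timestamp (end - 1) is <= the watermark.
    void processWatermark(long watermark) {
        while (!pending.isEmpty()) {
            long start = pending.firstKey();
            if (start + windowSize - 1 > watermark) {
                break;
            }
            fired.add("[" + start + "," + (start + windowSize) + "):" + pending.remove(start));
        }
    }

    // Replays a bounded input; the periodic watermark lags the largest timestamp seen by 1.
    static List<String> runBoundedSource(long[] timestamps, long windowSize, boolean emitMaxWatermark) {
        MaxWatermarkSketch op = new MaxWatermarkSketch(windowSize);
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            op.processElement(ts);
            maxSeen = Math.max(maxSeen, ts);
            op.processWatermark(maxSeen - 1);
        }
        if (emitMaxWatermark) {
            // The final watermark the issue asks bounded sources to emit before shutting down.
            op.processWatermark(Long.MAX_VALUE);
        }
        return op.fired;
    }
}
```

Take timestamps 1, 5, 12, 15 and a window size of 10. Without the final watermark, only `[0,10)` fires. Emitting `Long.MAX_VALUE` at the end also fires `[10,20)`. Count windows are unaffected, because they do not depend on watermarks.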
[jira] [Commented] (FLINK-3152) Support all comparisons for Date type
[ https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175657#comment-15175657 ] Fabian Hueske commented on FLINK-3152: -- Yes, a subtask of FLINK-3497 sounds good. I will comment on FLINK-3498 soon. > Support all comparisons for Date type > - > > Key: FLINK-3152 > URL: https://issues.apache.org/jira/browse/FLINK-3152 > Project: Flink > Issue Type: Improvement > Components: Table API >Reporter: Timo Walther > > Currently, the Table API does not support comparisons like "DATE < DATE", > "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.
[jira] [Comment Edited] (FLINK-3563) .returns() doesn't compile when using .map() with a custom MapFunction
[ https://issues.apache.org/jira/browse/FLINK-3563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175641#comment-15175641 ] Timo Walther edited comment on FLINK-3563 at 3/2/16 2:11 PM: - As far as I know, mixing of Scala and Java API is not supported. So it is possible that the type extraction fails in this example. I tested the following, which runs fine: {code} text.map(new MapFunction() { @Override public Map map(String value) throws Exception { return new HashMap(2); } }).print(); {code} Explicit casts are ok in this case. [~aljoscha] I think we can close this issue, what do you think? was (Author: twalthr): As far as I know, mixing of Scala and Java API is not supported. So it is possible that the type extraction fails in this example. I tested this which runs fine: {code} text.map(new MapFunction () { @Override public Map map(String value) throws Exception { return new HashMap(2); } }).print(); {code} Explicit casts is ok in this case. [~aljoscha] I think we can close this issue, what do you think? > .returns() doesn't compile when using .map() with a custom MapFunction > -- > > Key: FLINK-3563 > URL: https://issues.apache.org/jira/browse/FLINK-3563 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 0.10.1 >Reporter: Simone Robutti >Priority: Minor > > Defined a DummyMapFunction that goes from a java Map to another java Map like > this: > {code:title=DummyMapFunction.scala|borderStyle=solid} > class DummyMapFunction() extends MapFunction[java.util.Map[String, Any], > java.util.Map[FieldName, Any]] { > override def map(input: java.util.Map[String, Any]): > java.util.Map[FieldName, Any] = { > val result: java.util.Map[FieldName, Any] = new > java.util.HashMap[FieldName, Any]() > result > } > } > {code} > and trying to use it with a map: > {code:title=Main.java} > DummyMapFunction operator = new DummyMapFunction(); > DataSource
[jira] [Commented] (FLINK-3563) .returns() doesn't compile when using .map() with a custom MapFunction
[ https://issues.apache.org/jira/browse/FLINK-3563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175641#comment-15175641 ] Timo Walther commented on FLINK-3563: - As far as I know, mixing of Scala and Java API is not supported. So it is possible that the type extraction fails in this example. I tested the following, which runs fine: {code} text.map(new MapFunction() { @Override public Map map(String value) throws Exception { return new HashMap(2); } }).print(); {code} Explicit casts are ok in this case. [~aljoscha] I think we can close this issue, what do you think?
[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11
[ https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175616#comment-15175616 ] Aljoscha Krettek commented on FLINK-3565: - I think 2) is the way to go, yes.
[jira] [Commented] (FLINK-3561) ExecutionConfig's timestampsEnabled is unused
[ https://issues.apache.org/jira/browse/FLINK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175613#comment-15175613 ] Aljoscha Krettek commented on FLINK-3561: - I think so, yes. [~StephanEwen] must have forgotten them when he removed the other related stuff. > ExecutionConfig's timestampsEnabled is unused > - > > Key: FLINK-3561 > URL: https://issues.apache.org/jira/browse/FLINK-3561 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.0, 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 1.1.0 > > > Seems like the flag can be removed. What do you think [~aljoscha]?
[jira] [Assigned] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11
[ https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-3565: - Assignee: Maximilian Michels
[jira] [Created] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11
Aljoscha Krettek created FLINK-3565: --- Summary: FlinkKafkaConsumer does not work with Scala 2.11 Key: FLINK-3565 URL: https://issues.apache.org/jira/browse/FLINK-3565 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.0.0 Reporter: Aljoscha Krettek Priority: Blocker Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 cluster with this exception: java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class at kafka.utils.Pool.<init>(Pool.scala:28) at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60) at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala) at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39) at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160) at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.Client.runBlocking(Client.java:248) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 21 more
[jira] [Commented] (FLINK-3152) Support all comparisons for Date type
[ https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175590#comment-15175590 ] Dawid Wysakowicz commented on FLINK-3152: - Thanks Timo. Sure, I will happily work on it, as well as on any other Table API issue that is better suited to a beginner like me or has a higher priority. > Support all comparisons for Date type > - > > Key: FLINK-3152 > URL: https://issues.apache.org/jira/browse/FLINK-3152 > Project: Flink > Issue Type: Improvement > Components: Table API >Reporter: Timo Walther > > Currently, the Table API does not support comparisons like "DATE < DATE", > "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.
[jira] [Comment Edited] (FLINK-3152) Support all comparisons for Date type
[ https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175570#comment-15175570 ] Timo Walther edited comment on FLINK-3152 at 3/2/16 1:21 PM: - Hi Dawid, thanks for your interest. We are currently moving the Table API on top of Apache Calcite (see [FLINK-3221]). The code generation needed to be rewritten for that. This issue is now more complicated than it was before (with the "old" Table API), because the new CodeGenerator does not support the Date type at all yet. We need to sync it with Calcite's time/date types first. If you would like to work on this issue anyway, I can help you integrate the Date type into the new Table API on Calcite. Regards, Timo was (Author: twalthr): Hi Dawid, thanks for your interest. We are currently moving the Table API on top of Apache Calcite (see [FLINK-3221]). The code generation needed to be rewritten for that. This issue is now more complicated than it was before (with the "old" Table API), because the new CodeGenerator does not support the Date type at all. We need to sync it with Calcite's time/date types first. If you would like to work on this issue anyway, I can help you integrate the Date type into the new Table API on Calcite. Regards, Timo > Support all comparisons for Date type > - > > Key: FLINK-3152 > URL: https://issues.apache.org/jira/browse/FLINK-3152 > Project: Flink > Issue Type: Improvement > Components: Table API >Reporter: Timo Walther > > Currently, the Table API does not support comparisons like "DATE < DATE", > "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.
[jira] [Commented] (FLINK-3152) Support all comparisons for Date type
[ https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175570#comment-15175570 ] Timo Walther commented on FLINK-3152: - Hi Dawid, thanks for your interest. We are currently moving the Table API on top of Apache Calcite (see [FLINK-3221]). The code generation needed to be rewritten for that. This issue is now more complicated than it was before (with the "old" Table API), because the new CodeGenerator does not support the Date type at all. We need to sync it with Calcite's time/date types first. If you would like to work on this issue anyway, I can help you integrate the Date type into the new Table API on Calcite. Regards, Timo > Support all comparisons for Date type > - > > Key: FLINK-3152 > URL: https://issues.apache.org/jira/browse/FLINK-3152 > Project: Flink > Issue Type: Improvement > Components: Table API >Reporter: Timo Walther > > Currently, the Table API does not support comparisons like "DATE < DATE", > "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.
[jira] [Commented] (FLINK-3152) Support all comparisons for Date type
[ https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175564#comment-15175564 ] Dawid Wysakowicz commented on FLINK-3152: - Hi, I would happily try to work on this issue if nobody else is already working on it. I would also benefit from any advice on how I should proceed with it. Thanks > Support all comparisons for Date type > - > > Key: FLINK-3152 > URL: https://issues.apache.org/jira/browse/FLINK-3152 > Project: Flink > Issue Type: Improvement > Components: Table API >Reporter: Timo Walther > > Currently, the Table API does not support comparisons like "DATE < DATE", > "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.
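For illustration only, here is a minimal sketch of what "all comparisons for DATE" reduces to once dates share one internal representation. It assumes, as a choice made for this sketch, that a DATE value is stored as an int number of days since 1970-01-01. Under that assumption every SQL comparison operator becomes a plain int comparison. `DateComparisons`, `Op` and `compare` are hypothetical names; they are not part of the ExpressionCodeGenerator or the new CodeGenerator.

```java
import java.time.LocalDate;

public class DateComparisons {

    // Hypothetical operator set mirroring the SQL comparison operators.
    enum Op { EQ, NE, LT, LE, GT, GE }

    // Assumed internal DATE representation for this sketch: days since the Unix epoch.
    static int toEpochDay(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // Evaluates "left <op> right" on the int representation.
    static boolean compare(int left, Op op, int right) {
        switch (op) {
            case EQ: return left == right;
            case NE: return left != right;
            case LT: return left < right;
            case LE: return left <= right;
            case GT: return left > right;
            case GE: return left >= right;
            default: throw new IllegalArgumentException("Unknown operator: " + op);
        }
    }

    public static void main(String[] args) {
        int a = toEpochDay(LocalDate.of(2016, 3, 1));
        int b = toEpochDay(LocalDate.of(2016, 3, 2));
        System.out.println(compare(a, Op.LT, b)); // true
        System.out.println(compare(a, Op.GE, b)); // false
    }
}
```

With a single numeric representation, generated code can emit the same comparison for all six operators. Most of the work then lies in converting between Calcite's date types and the runtime type, not in the comparisons themselves.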
[GitHub] flink pull request: [FLINK-3564] [table] Implement distinct() for ...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/1754 [FLINK-3564] [table] Implement distinct() for Table API Implements some syntactic sugar. Another step towards a SQL-like API. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink distinctTableApi Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1754.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1754 commit 5986e9e284bf64dbc5a382c829b8a5843929b028 Author: twalthr Date: 2016-03-02T12:19:38Z [FLINK-3564] [table] Implement distinct() for Table API --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3564) Implement distinct() for Table API
[ https://issues.apache.org/jira/browse/FLINK-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175522#comment-15175522 ] ASF GitHub Bot commented on FLINK-3564: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/1754 [FLINK-3564] [table] Implement distinct() for Table API Implements some syntactic sugar. Another step towards a SQL-like API. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink distinctTableApi Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1754.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1754 commit 5986e9e284bf64dbc5a382c829b8a5843929b028 Author: twalthr Date: 2016-03-02T12:19:38Z [FLINK-3564] [table] Implement distinct() for Table API > Implement distinct() for Table API > -- > > Key: FLINK-3564 > URL: https://issues.apache.org/jira/browse/FLINK-3564 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Minor > > This is only syntactic sugar for grouping of all fields.
[jira] [Created] (FLINK-3564) Implement distinct() for Table API
Timo Walther created FLINK-3564: --- Summary: Implement distinct() for Table API Key: FLINK-3564 URL: https://issues.apache.org/jira/browse/FLINK-3564 Project: Flink Issue Type: New Feature Components: Table API Reporter: Timo Walther Assignee: Timo Walther Priority: Minor This is only syntactic sugar for grouping of all fields.
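The statement that distinct() is "only syntactic sugar for grouping of all fields" can be checked without Flink. If you group rows by a key made of every field and keep one row per group, you get exactly the distinct rows. The plain-Java sketch below models rows as `List<Object>`. It illustrates the equivalence only and is not the Table API implementation from the pull request. `DistinctAsGroupBy` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DistinctAsGroupBy {

    // distinct() expressed as "group by all fields, emit one row per group".
    static List<List<Object>> distinct(List<List<Object>> rows) {
        // The grouping key is the whole row: List.equals/hashCode compare every field.
        // LinkedHashMap keeps the first occurrence of each group in input order.
        Map<List<Object>, List<Object>> groups = new LinkedHashMap<>();
        for (List<Object> row : rows) {
            groups.putIfAbsent(row, row);
        }
        return new ArrayList<>(groups.values());
    }

    public static void main(String[] args) {
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList(1, "a"),
                Arrays.<Object>asList(2, "b"),
                Arrays.<Object>asList(1, "a"));
        System.out.println(distinct(rows)); // [[1, a], [2, b]]
    }
}
```

In a distributed engine the grouping would be a shuffle on all fields rather than a hash map. The semantics are the same, which is why no new operator is needed.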
[jira] [Commented] (FLINK-2941) Implement a neo4j - Flink/Gelly connector
[ https://issues.apache.org/jira/browse/FLINK-2941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175453#comment-15175453 ] Chesnay Schepler commented on FLINK-2941: - Does neo4j have something akin to our flink-contrib module? Adding a connector without tests is bound to cause issues in the future. > Implement a neo4j - Flink/Gelly connector > - > > Key: FLINK-2941 > URL: https://issues.apache.org/jira/browse/FLINK-2941 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Martin Junghanns > Labels: requires-design-doc > > By connecting Flink/Gelly with a graph database like neo4j we can facilitate > interesting use-cases, like: > - use neo4j as an input source, i.e. read the complete graph or a subgraph > from a neo4j database, import it into Flink and run a graph analysis task > with Gelly. > - use neo4j as a sink, i.e. perform ETL on some data in Flink to create a > graph and insert the graph in a neo4j database for further querying. > We have started a discussion on possible implementations and have looked into > similar projects, e.g. connecting neo4j to Spark. Some initial thoughts and > experiences can be found in [this > document|https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing]. > Please, feel free to comment and add ideas! I will also start a discussion > in the mailing list with more concrete problems that we would like to get > feedback on.
[jira] [Created] (FLINK-3563) .returns() doesn't compile when using .map() with a custom MapFunction
Simone Robutti created FLINK-3563: - Summary: .returns() doesn't compile when using .map() with a custom MapFunction Key: FLINK-3563 URL: https://issues.apache.org/jira/browse/FLINK-3563 Project: Flink Issue Type: Bug Components: Type Serialization System Affects Versions: 0.10.1 Reporter: Simone Robutti Priority: Minor I defined a DummyMapFunction that goes from a Java Map to another Java Map like this: {code:title=DummyMapFunction.scala|borderStyle=solid} class DummyMapFunction() extends MapFunction[java.util.Map[String, Any], java.util.Map[FieldName, Any]] { override def map(input: java.util.Map[String, Any]): java.util.Map[FieldName, Any] = { val result: java.util.Map[FieldName, Any] = new java.util.HashMap[FieldName, Any]() result } } {code} and tried to use it with a map: {code:title=Main.java} DummyMapFunction operator = new DummyMapFunction(); DataSource
[jira] [Commented] (FLINK-2941) Implement a neo4j - Flink/Gelly connector
[ https://issues.apache.org/jira/browse/FLINK-2941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175434#comment-15175434 ] Vasia Kalavri commented on FLINK-2941: -- Hey [~mju], thanks for working on this! I'm very happy we have working input and output formats for Neo4j :D Regarding the licenses, afaik we cannot have a GPL dependency in Flink. So, I see two solutions here: (1) we add the Neo4j connector to flink-batch-connectors without tests. Examples don't depend on anything GPL licensed, right? (2) we keep this as an external flink package. I'm personally in favor of (1), but I would like to hear other people's opinions. It seems that there are other batch connectors without tests btw, e.g. hcatalog and hbase. > Implement a neo4j - Flink/Gelly connector > - > > Key: FLINK-2941 > URL: https://issues.apache.org/jira/browse/FLINK-2941 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Martin Junghanns > Labels: requires-design-doc > > By connecting Flink/Gelly with a graph database like neo4j we can facilitate > interesting use-cases, like: > - use neo4j as an input source, i.e. read the complete graph or a subgraph > from a neo4j database, import it into Flink and run a graph analysis task > with Gelly. > - use neo4j as a sink, i.e. perform ETL on some data in Flink to create a > graph and insert the graph in a neo4j database for further querying. > We have started a discussion on possible implementations and have looked into > similar projects, e.g. connecting neo4j to Spark. Some initial thoughts and > experiences can be found in [this > document|https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing]. > Please, feel free to comment and add ideas! I will also start a discussion > in the mailing list with more concrete problems that we would like to get > feedback on.
[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation
[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175430#comment-15175430 ] ASF GitHub Bot commented on FLINK-3474: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/1746#issuecomment-191186632 Hi @fhueske, I've updated the PR based on your comments, except for object reuse in `AggregateGroupCombineFunction`. I found that `GroupReduceOperator` transforms it into a `CombineToGroupCombineWrapper`, which does not implement a rich user function, so we cannot initialize the reused object in `open()`. I think we need to add a `RichCombineToGroupCombineWrapper`, which would be used to wrap rich combinable GroupReduceFunctions like `AggregateGroupCombineFunction`. What do you think? > Partial aggregate interface design and sort-based implementation > > > Key: FLINK-3474 > URL: https://issues.apache.org/jira/browse/FLINK-3474 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > > The scope of this sub task includes: > # Partial aggregate interface. > # Simple aggregate function implementation, such as SUM/AVG/COUNT/MIN/MAX. > # DataSetAggregateRule which translate logical calcite aggregate node to > Flink user functions. As hash-based combiner is not available yet(see PR > #1517), we would use sort-based combine as default.
[GitHub] flink pull request: [FLINK-3474] support partial aggregate
Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/1746#issuecomment-191186632 Hi @fhueske, I've updated the PR based on your comments, except for object reuse in `AggregateGroupCombineFunction`. I found that `GroupReduceOperator` transforms it into a `CombineToGroupCombineWrapper`, which does not implement a rich user function, so we cannot initialize the reused object in `open()`. I think we need to add a `RichCombineToGroupCombineWrapper`, which would be used to wrap rich combinable GroupReduceFunctions like `AggregateGroupCombineFunction`. What do you think?
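The lifecycle problem described in this comment can be sketched with simplified stand-in types. `LifecycleFunction`, `CombineFunction`, `PlainWrapper`, `RichWrapper` and `run` are hypothetical names for illustration; they are not Flink's real interfaces or classes. A wrapper that only delegates `combine()` never passes `open()` on to the wrapped function, so an object that function would create in `open()` for reuse stays null. A "rich" wrapper also forwards the lifecycle calls, which is roughly what the proposed `RichCombineToGroupCombineWrapper` would need to do.

```java
import java.util.ArrayList;
import java.util.List;

public class RichWrapperSketch {

    // Simplified stand-in for a rich function's lifecycle methods.
    interface LifecycleFunction {
        void open();
        void close();
    }

    // Simplified stand-in for a combine function.
    interface CombineFunction<T> {
        T combine(List<T> values);
    }

    // A combiner that reuses an object created in open().
    static class ReusingSum implements CombineFunction<long[]>, LifecycleFunction {
        private long[] reuse; // only initialized in open()

        public void open() { reuse = new long[1]; }
        public void close() { reuse = null; }

        public long[] combine(List<long[]> values) {
            reuse[0] = 0; // throws NullPointerException if open() was never called
            for (long[] v : values) {
                reuse[0] += v[0];
            }
            return reuse;
        }
    }

    // Plain wrapper: delegates combine() only, so open() never reaches the wrapped function.
    static class PlainWrapper<T> implements CombineFunction<T> {
        protected final CombineFunction<T> wrapped;
        PlainWrapper(CombineFunction<T> wrapped) { this.wrapped = wrapped; }
        public T combine(List<T> values) { return wrapped.combine(values); }
    }

    // "Rich" wrapper: additionally forwards lifecycle calls when the wrapped function has them.
    static class RichWrapper<T> extends PlainWrapper<T> implements LifecycleFunction {
        RichWrapper(CombineFunction<T> wrapped) { super(wrapped); }
        public void open() {
            if (wrapped instanceof LifecycleFunction) ((LifecycleFunction) wrapped).open();
        }
        public void close() {
            if (wrapped instanceof LifecycleFunction) ((LifecycleFunction) wrapped).close();
        }
    }

    // Mimics a runtime that calls open()/close() only on functions implementing the lifecycle.
    static <T> T run(CombineFunction<T> f, List<T> values) {
        boolean rich = f instanceof LifecycleFunction;
        if (rich) ((LifecycleFunction) f).open();
        try {
            return f.combine(values);
        } finally {
            if (rich) ((LifecycleFunction) f).close();
        }
    }

    public static void main(String[] args) {
        List<long[]> values = new ArrayList<>();
        values.add(new long[] {2});
        values.add(new long[] {3});
        System.out.println(run(new RichWrapper<>(new ReusingSum()), values)[0]); // 5
        try {
            run(new PlainWrapper<>(new ReusingSum()), values);
        } catch (NullPointerException e) {
            System.out.println("plain wrapper: open() was never forwarded");
        }
    }
}
```

In this sketch the runtime is what checks whether a function has a lifecycle. That is why the wrapper's own type, and not just the wrapped function's type, decides whether `open()` is called at all.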
[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation
[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175412#comment-15175412 ] ASF GitHub Bot commented on FLINK-3474: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/1746#discussion_r54706319 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala --- @@ -17,26 +17,77 @@ */ package org.apache.flink.api.table.runtime.aggregate +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.table.Row + /** - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it - * with grouped aggregate field values, and finally get the aggregated value. - * @tparam T the output type + * The interface for all Flink aggregate functions, which expressed in terms of initiate(), + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases: + * -- In Map phase, use prepare() to transform aggregate field value into intermediate + * aggregate value. + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value. + * For associative decomposable aggregate functions, they support partial aggregate. To optimize + * the performance, a Combine phase would be added between Map phase and GroupReduce phase, + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values + * into aggregate buffer. + * + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start + * field index in Row, so different aggregate functions could share the same Row as intermediate + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields + * of Row with no conflict. 
The intermediate aggregate value is required to be a sequence of JVM + * primitives, and Flink use intermediateDataType() to get its data types in SQL side. + * + * @tparam T Aggregated value type. */ trait Aggregate[T] extends Serializable { + + protected var aggOffsetInRow: Int = _ + /** - * Initialize the aggregate state. + * Initiate the intermediate aggregate value in Row. + * @param intermediate */ - def initiateAggregate + def initiate(intermediate: Row): Unit /** - * Feed the aggregate field value. + * Transform the aggregate field value into intermediate aggregate data. * @param value + * @param intermediate */ - def aggregate(value: Any) + def prepare(value: Any, intermediate: Row): Unit --- End diff -- Aggregate functions may handle `null` in different ways: some ignore it, some treat it as a specified value. I'm not sure whether it's a good idea to handle it with `initiate()` in all cases. > Partial aggregate interface design and sort-based implementation > > > Key: FLINK-3474 > URL: https://issues.apache.org/jira/browse/FLINK-3474 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > > The scope of this sub task includes: > # Partial aggregate interface. > # Simple aggregate function implementation, such as SUM/AVG/COUNT/MIN/MAX. > # DataSetAggregateRule which translate logical calcite aggregate node to > Flink user functions. As hash-based combiner is not available yet(see PR > #1517), we would use sort-based combine as default.
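To make the phases in the Scaladoc above concrete, here is a minimal Java re-statement of the same contract applied to SUM and AVG. The contract is initiate, prepare, merge and evaluate, with each aggregate writing at its own offset in a shared intermediate row. This is a hypothetical sketch, not the Scala code under review. The row is modeled as `Object[]`, all class and method names are illustrative, and skipping `null` inputs in `prepare()` is just one of the null-handling policies mentioned in the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartialAvgSketch {

    // Hypothetical Java counterpart of the Aggregate trait in the diff; a Row is an Object[].
    interface Aggregate<T> {
        void initiate(Object[] buffer);                // write neutral values into this aggregate's fields
        void prepare(Object value, Object[] partial);  // Map phase: field value -> intermediate value
        void merge(Object[] partial, Object[] buffer); // Combine/GroupReduce phase: fold into the buffer
        T evaluate(Object[] buffer);                   // compute the final aggregated value
    }

    // SUM occupies one field at aggOffsetInRow.
    static class LongSum implements Aggregate<Long> {
        private final int aggOffsetInRow;
        LongSum(int aggOffsetInRow) { this.aggOffsetInRow = aggOffsetInRow; }
        public void initiate(Object[] buffer) { buffer[aggOffsetInRow] = 0L; }
        public void prepare(Object value, Object[] partial) {
            // Null policy chosen for this sketch: a null input contributes nothing.
            partial[aggOffsetInRow] = value == null ? 0L : (Long) value;
        }
        public void merge(Object[] partial, Object[] buffer) {
            buffer[aggOffsetInRow] = (Long) buffer[aggOffsetInRow] + (Long) partial[aggOffsetInRow];
        }
        public Long evaluate(Object[] buffer) { return (Long) buffer[aggOffsetInRow]; }
    }

    // AVG occupies two fields, (sum, count), at aggOffsetInRow and aggOffsetInRow + 1.
    static class LongAvg implements Aggregate<Double> {
        private final int aggOffsetInRow;
        LongAvg(int aggOffsetInRow) { this.aggOffsetInRow = aggOffsetInRow; }
        public void initiate(Object[] buffer) {
            buffer[aggOffsetInRow] = 0L;
            buffer[aggOffsetInRow + 1] = 0L;
        }
        public void prepare(Object value, Object[] partial) {
            partial[aggOffsetInRow] = value == null ? 0L : (Long) value;
            partial[aggOffsetInRow + 1] = value == null ? 0L : 1L;
        }
        public void merge(Object[] partial, Object[] buffer) {
            for (int i = aggOffsetInRow; i <= aggOffsetInRow + 1; i++) {
                buffer[i] = (Long) buffer[i] + (Long) partial[i];
            }
        }
        public Double evaluate(Object[] buffer) {
            long sum = (Long) buffer[aggOffsetInRow];
            long count = (Long) buffer[aggOffsetInRow + 1];
            if (count == 0) {
                return null; // AVG over no non-null values
            }
            return (double) sum / count;
        }
    }

    // Map phase: one intermediate row per input value.
    static List<Object[]> mapPhase(List<Aggregate<?>> aggs, List<Long> values, int width) {
        List<Object[]> partials = new ArrayList<>();
        for (Long value : values) {
            Object[] partial = new Object[width];
            for (Aggregate<?> agg : aggs) agg.prepare(value, partial);
            partials.add(partial);
        }
        return partials;
    }

    // Combine and GroupReduce phases both merge intermediate rows into a fresh buffer.
    static Object[] mergeAll(List<Aggregate<?>> aggs, List<Object[]> partials, int width) {
        Object[] buffer = new Object[width];
        for (Aggregate<?> agg : aggs) agg.initiate(buffer);
        for (Object[] partial : partials) {
            for (Aggregate<?> agg : aggs) agg.merge(partial, buffer);
        }
        return buffer;
    }

    public static void main(String[] args) {
        LongSum sum = new LongSum(0); // field 0
        LongAvg avg = new LongAvg(1); // fields 1 and 2
        List<Aggregate<?>> aggs = Arrays.<Aggregate<?>>asList(sum, avg);
        // Two partitions of the same group are combined locally first...
        Object[] c1 = mergeAll(aggs, mapPhase(aggs, Arrays.asList(1L, 2L, null), 3), 3);
        Object[] c2 = mergeAll(aggs, mapPhase(aggs, Arrays.asList(6L), 3), 3);
        // ...then the GroupReduce phase merges the combined rows and evaluates.
        Object[] result = mergeAll(aggs, Arrays.<Object[]>asList(c1, c2), 3);
        System.out.println(sum.evaluate(result)); // 9
        System.out.println(avg.evaluate(result)); // 3.0
    }
}
```

Because `merge()` is associative, the same code path serves both the optional Combine phase and the final GroupReduce phase. That is what makes these aggregates "associative decomposable" in the Scaladoc's sense.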
[GitHub] flink pull request: [FLINK-3474] support partial aggregate
Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/1746#discussion_r54706319 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala --- @@ -17,26 +17,77 @@ */ package org.apache.flink.api.table.runtime.aggregate +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.table.Row + /** - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it - * with grouped aggregate field values, and finally get the aggregated value. - * @tparam T the output type + * The interface for all Flink aggregate functions, which expressed in terms of initiate(), + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases: + * -- In Map phase, use prepare() to transform aggregate field value into intermediate + * aggregate value. + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value. + * For associative decomposable aggregate functions, they support partial aggregate. To optimize + * the performance, a Combine phase would be added between Map phase and GroupReduce phase, + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values + * into aggregate buffer. + * + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start + * field index in Row, so different aggregate functions could share the same Row as intermediate + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM + * primitives, and Flink use intermediateDataType() to get its data types in SQL side. + * + * @tparam T Aggregated value type. 
*/ trait Aggregate[T] extends Serializable { + + protected var aggOffsetInRow: Int = _ + /** - * Initialize the aggregate state. + * Initiate the intermediate aggregate value in Row. + * @param intermediate */ - def initiateAggregate + def initiate(intermediate: Row): Unit /** - * Feed the aggregate field value. + * Transform the aggregate field value into intermediate aggregate data. * @param value + * @param intermediate */ - def aggregate(value: Any) + def prepare(value: Any, intermediate: Row): Unit --- End diff -- Aggregate functions may handle `null` in different ways: some ignore it, some treat it as a specified value. I'm not sure whether it's a good idea to handle it with `initiate()` in all cases.
[GitHub] flink pull request: [build] Consolidate scala checkstyle usage and...
GitHub user mbalassi opened a pull request: https://github.com/apache/flink/pull/1753 [build] Consolidate scala checkstyle usage and update version to 0.8.0 Consolidation is done via plugin management, so that configuration is done in one central location. Version update is needed because the latest release of the plugin, 0.8.0, contains a fix for a bug that I have recently run into. [1] [1] https://github.com/scalastyle/scalastyle/pull/124 You can merge this pull request into a Git repository by running: $ git pull https://github.com/mbalassi/flink scalastyle-bump Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1753.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1753 commit 2abfb63bbab0e511bcdd680593b310ccc9f98da0 Author: Márton Balassi Date: 2016-03-01T20:42:40Z [build] Consolidate scala checkstyle usage and update version to 0.8.0 Consolidation is done via plugin management.
[jira] [Created] (FLINK-3562) Update docs in the course of EventTimeSourceFunction removal
Maximilian Michels created FLINK-3562: - Summary: Update docs in the course of EventTimeSourceFunction removal Key: FLINK-3562 URL: https://issues.apache.org/jira/browse/FLINK-3562 Project: Flink Issue Type: Bug Affects Versions: 1.0.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Minor Fix For: 1.0.0, 1.1.0 EventTimeSourceFunction has been removed. Documentation and JavaDocs haven't been updated.
[GitHub] flink pull request: [FLINK-3560] [examples] Remove unchecked outpu...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1752#issuecomment-191171356 Added the missing `ProgramDescriptions`.
[jira] [Commented] (FLINK-3560) Examples shouldn't always print usage statement
[ https://issues.apache.org/jira/browse/FLINK-3560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175353#comment-15175353 ] ASF GitHub Bot commented on FLINK-3560: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1752#issuecomment-191159732 That is a good point @vasia. Will add the missing `ProgramDescriptions`. > Examples shouldn't always print usage statement > --- > > Key: FLINK-3560 > URL: https://issues.apache.org/jira/browse/FLINK-3560 > Project: Flink > Issue Type: Improvement > Components: Examples >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > At the moment all our examples print a usage statement no matter whether the > parameters have been provided or not. This can be confusing for people > because an usage statement is usually only printed if one has specified a > wrong parameter or if a parameter is missing. > I propose to remove the unchecked printing of the usage statement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)