[jira] [Commented] (FLINK-3586) Risk of data overflow while use sum/count to calculate AVG value
[ https://issues.apache.org/jira/browse/FLINK-3586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15295804#comment-15295804 ]

Chengxiang Li commented on FLINK-3586:
--------------------------------------

Not at all, feel free to take it over, Fabian.

> Risk of data overflow while use sum/count to calculate AVG value
> ----------------------------------------------------------------
>
>          Key: FLINK-3586
>          URL: https://issues.apache.org/jira/browse/FLINK-3586
>      Project: Flink
>   Issue Type: Sub-task
>   Components: Table API
>     Reporter: Chengxiang Li
>     Assignee: Fabian Hueske
>     Priority: Minor
>
> Now, we use {{(sum: Long, count: Long)}} to store the AVG partial aggregate
> data, which carries a risk of data overflow; we should use an unbounded data
> type (such as BigInteger) to store them for the necessary data types.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
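The overflow the issue describes is easy to reproduce directly: merging two {{Long}} partial sums wraps around silently, while {{BigInteger}} keeps the exact value. A minimal Java sketch (illustration only, not Flink's aggregate code):

```java
import java.math.BigInteger;

public class AvgOverflowDemo {
    public static void main(String[] args) {
        // Two partial sums that are individually valid Long values.
        long sum1 = Long.MAX_VALUE - 1;
        long sum2 = Long.MAX_VALUE - 1;

        // Merging them with Long arithmetic silently wraps to a negative value,
        // so an AVG computed from this partial state would be wrong.
        long merged = sum1 + sum2;
        System.out.println(merged < 0);  // true

        // BigInteger has no upper bound, so the merged sum stays exact.
        BigInteger exact = BigInteger.valueOf(sum1).add(BigInteger.valueOf(sum2));
        System.out.println(exact.signum() > 0);  // true
    }
}
```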
[jira] [Assigned] (FLINK-3586) Risk of data overflow while use sum/count to calculate AVG value
[ https://issues.apache.org/jira/browse/FLINK-3586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li reassigned FLINK-3586:
------------------------------------
    Assignee: Chengxiang Li

> Risk of data overflow while use sum/count to calculate AVG value
> ----------------------------------------------------------------
>
>          Key: FLINK-3586
>          URL: https://issues.apache.org/jira/browse/FLINK-3586
>      Project: Flink
>   Issue Type: Sub-task
>   Components: Table API
>     Reporter: Chengxiang Li
>     Assignee: Chengxiang Li
>     Priority: Minor
>
> Now, we use {{(sum: Long, count: Long)}} to store the AVG partial aggregate
> data, which carries a risk of data overflow; we should use an unbounded data
> type (such as BigInteger) to store them for the necessary data types.
[jira] [Assigned] (FLINK-3475) DISTINCT aggregate function support
[ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li reassigned FLINK-3475:
------------------------------------
    Assignee: Chengxiang Li

> DISTINCT aggregate function support
> -----------------------------------
>
>          Key: FLINK-3475
>          URL: https://issues.apache.org/jira/browse/FLINK-3475
>      Project: Flink
>   Issue Type: Sub-task
>   Components: Table API
>     Reporter: Chengxiang Li
>     Assignee: Chengxiang Li
>
> A DISTINCT aggregate function may be able to reuse the plain aggregate
> function instead of a separate implementation, and let the Flink runtime
> take care of duplicate records.
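The reuse idea can be sketched outside Flink: if the runtime removes duplicate records first, the unchanged plain aggregate already produces the DISTINCT result. A hypothetical Java illustration (the names here are invented, not Flink APIs):

```java
import java.util.*;

public class DistinctAggregateSketch {
    // COUNT(DISTINCT x) as "dedup, then plain COUNT": the runtime-side
    // deduplication does the DISTINCT work, the aggregate stays unchanged.
    static long countDistinct(List<Integer> values) {
        Set<Integer> deduplicated = new LinkedHashSet<>(values);  // runtime removes duplicates
        long count = 0;
        for (Integer ignored : deduplicated) count++;             // plain COUNT aggregate
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countDistinct(Arrays.asList(1, 2, 2, 3, 3, 3)));  // 3
    }
}
```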
[jira] [Created] (FLINK-3586) Risk of data overflow while use sum/count to calculate AVG value
Chengxiang Li created FLINK-3586:
------------------------------------

     Summary: Risk of data overflow while use sum/count to calculate AVG value
         Key: FLINK-3586
         URL: https://issues.apache.org/jira/browse/FLINK-3586
     Project: Flink
  Issue Type: Sub-task
  Components: Table API
    Reporter: Chengxiang Li
    Priority: Minor

Now, we use {{(sum: Long, count: Long)}} to store the AVG partial aggregate data, which carries a risk of data overflow; we should use an unbounded data type (such as BigInteger) to store them for the necessary data types.
[jira] [Updated] (FLINK-3473) Add partial aggregate support in Flink
[ https://issues.apache.org/jira/browse/FLINK-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3473:
---------------------------------
    Attachment: PartialAggregateinFlink_v2.pdf

> Add partial aggregate support in Flink
> --------------------------------------
>
>          Key: FLINK-3473
>          URL: https://issues.apache.org/jira/browse/FLINK-3473
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Assignee: Chengxiang Li
>  Attachments: PartialAggregateinFlink_v1.pdf, PartialAggregateinFlink_v2.pdf
>
> For decomposable aggregate functions, partial aggregation is more efficient
> as it significantly reduces the network traffic during the shuffle and
> parallelizes part of the aggregate calculation. This is an umbrella task for
> partial aggregation.
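Why decomposable aggregates benefit from partial aggregation can be shown with AVG: each parallel instance pre-combines its records into one (sum, count) pair, so only one small partial per instance crosses the network instead of every record. A standalone Java sketch (not Flink's implementation):

```java
public class PartialAvgSketch {
    // Local pre-aggregation (combine): collapse a partition's records into one
    // (sum, count) pair before anything is shuffled.
    static long[] partial(long[] values) {
        long sum = 0;
        for (long v : values) sum += v;
        return new long[]{sum, values.length};
    }

    public static void main(String[] args) {
        long[] p1 = partial(new long[]{1, 2, 3});  // partition 1 combines locally
        long[] p2 = partial(new long[]{4, 5});     // partition 2 combines locally
        // Only the two partials are shuffled; the final merge is field-wise addition.
        double avg = (double) (p1[0] + p2[0]) / (p1[1] + p2[1]);
        System.out.println(avg);  // 3.0
    }
}
```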
[jira] [Commented] (FLINK-3508) Add more test cases to verify the rules of logical plan optimization
[ https://issues.apache.org/jira/browse/FLINK-3508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167301#comment-15167301 ]

Chengxiang Li commented on FLINK-3508:
--------------------------------------

Merged to the tableOnCalcite branch at 72686231fd8f9fa6a1c05df48f6f29eaa3ca4f2b.

> Add more test cases to verify the rules of logical plan optimization
> --------------------------------------------------------------------
>
>          Key: FLINK-3508
>          URL: https://issues.apache.org/jira/browse/FLINK-3508
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Assignee: Chengxiang Li
>     Priority: Minor
>
> We have enabled many rules in the logical plan optimization phase; more
> complicated test cases should be added to verify whether these rules
> actually work.
[jira] [Closed] (FLINK-3508) Add more test cases to verify the rules of logical plan optimization
[ https://issues.apache.org/jira/browse/FLINK-3508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li closed FLINK-3508.
--------------------------------

> Add more test cases to verify the rules of logical plan optimization
> --------------------------------------------------------------------
>
>          Key: FLINK-3508
>          URL: https://issues.apache.org/jira/browse/FLINK-3508
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Assignee: Chengxiang Li
>     Priority: Minor
>
> We have enabled many rules in the logical plan optimization phase; more
> complicated test cases should be added to verify whether these rules
> actually work.
[jira] [Resolved] (FLINK-3508) Add more test cases to verify the rules of logical plan optimization
[ https://issues.apache.org/jira/browse/FLINK-3508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li resolved FLINK-3508.
----------------------------------
    Resolution: Fixed

> Add more test cases to verify the rules of logical plan optimization
> --------------------------------------------------------------------
>
>          Key: FLINK-3508
>          URL: https://issues.apache.org/jira/browse/FLINK-3508
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Assignee: Chengxiang Li
>     Priority: Minor
>
> We have enabled many rules in the logical plan optimization phase; more
> complicated test cases should be added to verify whether these rules
> actually work.
[jira] [Created] (FLINK-3508) Add more test cases to verify the rules of logical plan optimization
Chengxiang Li created FLINK-3508:
------------------------------------

     Summary: Add more test cases to verify the rules of logical plan optimization
         Key: FLINK-3508
         URL: https://issues.apache.org/jira/browse/FLINK-3508
     Project: Flink
  Issue Type: Improvement
  Components: Table API
    Reporter: Chengxiang Li
    Assignee: Chengxiang Li
    Priority: Minor

We have enabled many rules in the logical plan optimization phase; more complicated test cases should be added to verify whether these rules actually work.
[jira] [Created] (FLINK-3507) PruneEmptyRules does not prune empty node as expected.
Chengxiang Li created FLINK-3507:
------------------------------------

     Summary: PruneEmptyRules does not prune empty node as expected.
         Key: FLINK-3507
         URL: https://issues.apache.org/jira/browse/FLINK-3507
     Project: Flink
  Issue Type: Improvement
    Reporter: Chengxiang Li
    Priority: Minor

{noformat}
val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
  .where(Literal(false))
  .groupBy('b)
  .select('a.sum)
{noformat}

With PruneEmptyRules instances enabled, we expect the empty Aggregate and Project to be removed, but they are not removed after logical optimization.
[jira] [Created] (FLINK-3506) ReduceExpressionsRule does not remove duplicate expression in Filter
Chengxiang Li created FLINK-3506:
------------------------------------

     Summary: ReduceExpressionsRule does not remove duplicate expression in Filter
         Key: FLINK-3506
         URL: https://issues.apache.org/jira/browse/FLINK-3506
     Project: Flink
  Issue Type: Improvement
  Components: Table API
    Reporter: Chengxiang Li
    Priority: Minor

{noformat}
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0).filter('a % 2 !== 0)
{noformat}

According to the ReduceExpressionsRule definition, we expect the duplicated filter expression to be removed, but it is not after logical optimization.
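The effect the rule is expected to have can be mimicked in isolation: treating the filter's conjuncts as a list and dropping repeats collapses the duplicated predicate. A hypothetical Java sketch using string-encoded predicates (illustration only, not how Calcite represents expressions):

```java
import java.util.*;

public class PredicateDedupSketch {
    // Drop repeated conjuncts while keeping first-occurrence order; applied to
    // the example above, the repeated "a % 2 != 0" term would disappear.
    static List<String> dedupConjunction(List<String> conjuncts) {
        return new ArrayList<>(new LinkedHashSet<>(conjuncts));
    }

    public static void main(String[] args) {
        List<String> reduced = dedupConjunction(
                Arrays.asList("a % 2 != 0", "b % 2 == 0", "a % 2 != 0"));
        System.out.println(reduced);  // [a % 2 != 0, b % 2 == 0]
    }
}
```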
[jira] [Updated] (FLINK-3486) Use Project to rename all record fields would fail following Project.
[ https://issues.apache.org/jira/browse/FLINK-3486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3486:
---------------------------------
    Summary: Use Project to rename all record fields would fail following Project.  (was: [SQL] Use Project to rename all record fields would fail following Project.)

> Use Project to rename all record fields would fail following Project.
> ---------------------------------------------------------------------
>
>          Key: FLINK-3486
>          URL: https://issues.apache.org/jira/browse/FLINK-3486
>      Project: Flink
>   Issue Type: Bug
>   Components: Table API
>     Reporter: Chengxiang Li
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).toTable
>   .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
>   .select('a, 'b)
> {noformat}
> would throw an exception like:
> {noformat}
> java.lang.IllegalArgumentException: field [a] not found; input fields are: [_1, _2, _3]
>     at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:290)
>     at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:275)
>     at org.apache.flink.api.table.plan.RexNodeTranslator$.toRexNode(RexNodeTranslator.scala:80)
>     at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
>     at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
> {noformat}
> The new alias names are lost.
[jira] [Updated] (FLINK-3502) AggregateProjectPullUpConstantsRule fails on certain group keys order.
[ https://issues.apache.org/jira/browse/FLINK-3502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3502:
---------------------------------
    Summary: AggregateProjectPullUpConstantsRule fails on certain group keys order.  (was: [SQL] AggregateProjectPullUpConstantsRule fails on certain group keys order.)

> AggregateProjectPullUpConstantsRule fails on certain group keys order.
> ----------------------------------------------------------------------
>
>          Key: FLINK-3502
>          URL: https://issues.apache.org/jira/browse/FLINK-3502
>      Project: Flink
>   Issue Type: Bug
>   Components: Table API
>     Reporter: Chengxiang Li
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
>   .select('b, 4 as 'four, 'a)
>   .groupBy('b, 'four)
>   .select('four, 'a.sum)
> {noformat}
> This query would throw an exception like:
> {noformat}
> java.lang.AssertionError: Internal error: Error while applying rule AggregateProjectPullUpConstantsRule, args [rel#7:LogicalAggregate.NONE.[](input=rel#6:Subset#1.NONE.[],group={1, 2},TMP_1=SUM($0)), rel#5:LogicalProject.NONE.[](input=rel#4:Subset#0.NONE.[],a=$0,four=4,b=$1)]
>     at org.apache.calcite.util.Util.newInternal(Util.java:792)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:251)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:826)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:304)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
>     at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:51)
>     at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:35)
>     at org.apache.flink.api.scala.table.test.GroupedAggregationsITCase.testGroupedByExpression(GroupedAggregationsITCase.scala:133)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>     at org.junit.runners.Suite.runChild(Suite.java:127)
>     at org.junit.runners.Suite.runChild(Suite.java:26)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.lang.AssertionError: Internal error: Error occurred
[jira] [Updated] (FLINK-3486) [SQL] Use Project to rename all record fields would fail following Project.
[ https://issues.apache.org/jira/browse/FLINK-3486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3486:
---------------------------------
    Component/s: Table API

> [SQL] Use Project to rename all record fields would fail following Project.
> ---------------------------------------------------------------------------
>
>          Key: FLINK-3486
>          URL: https://issues.apache.org/jira/browse/FLINK-3486
>      Project: Flink
>   Issue Type: Bug
>   Components: Table API
>     Reporter: Chengxiang Li
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).toTable
>   .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
>   .select('a, 'b)
> {noformat}
> would throw an exception like:
> {noformat}
> java.lang.IllegalArgumentException: field [a] not found; input fields are: [_1, _2, _3]
>     at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:290)
>     at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:275)
>     at org.apache.flink.api.table.plan.RexNodeTranslator$.toRexNode(RexNodeTranslator.scala:80)
>     at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
>     at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
> {noformat}
> The new alias names are lost.
[jira] [Updated] (FLINK-3487) [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.
[ https://issues.apache.org/jira/browse/FLINK-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3487:
---------------------------------
    Component/s: Table API

> [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.
> ------------------------------------------------------------------------------
>
>          Key: FLINK-3487
>          URL: https://issues.apache.org/jira/browse/FLINK-3487
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Priority: Minor
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
>   .groupBy('a)
>   .select('a, 'b.avg as 'value)
>   .filter('a === 1)
> {noformat}
> For this query, the filter is expected to be pushed down below the aggregate
> by FilterAggregateTransposeRule; currently the optimized logical plan is not
> what we desired.
[jira] [Updated] (FLINK-3503) ProjectJoinTransposeRule fails to push down project.
[ https://issues.apache.org/jira/browse/FLINK-3503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3503:
---------------------------------
    Summary: ProjectJoinTransposeRule fails to push down project.  (was: [SQL] ProjectJoinTransposeRule fails to push down project.)

> ProjectJoinTransposeRule fails to push down project.
> ----------------------------------------------------
>
>          Key: FLINK-3503
>          URL: https://issues.apache.org/jira/browse/FLINK-3503
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Priority: Minor
>
> {noformat}
> val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
> val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
> {noformat}
> For this query, ProjectJoinTransposeRule should push a Project past the Join
> by splitting the projection into a projection on top of each child of the
> join.
[jira] [Updated] (FLINK-3487) FilterAggregateTransposeRule does not transform logical plan as desired.
[ https://issues.apache.org/jira/browse/FLINK-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3487:
---------------------------------
    Summary: FilterAggregateTransposeRule does not transform logical plan as desired.  (was: [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.)

> FilterAggregateTransposeRule does not transform logical plan as desired.
> ------------------------------------------------------------------------
>
>          Key: FLINK-3487
>          URL: https://issues.apache.org/jira/browse/FLINK-3487
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Priority: Minor
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
>   .groupBy('a)
>   .select('a, 'b.avg as 'value)
>   .filter('a === 1)
> {noformat}
> For this query, the filter is expected to be pushed down below the aggregate
> by FilterAggregateTransposeRule; currently the optimized logical plan is not
> what we desired.
[jira] [Created] (FLINK-3505) JoinUnionTransposeRule fails to push Join past Union.
Chengxiang Li created FLINK-3505:
------------------------------------

     Summary: JoinUnionTransposeRule fails to push Join past Union.
         Key: FLINK-3505
         URL: https://issues.apache.org/jira/browse/FLINK-3505
     Project: Flink
  Issue Type: Bug
  Components: Table API
    Reporter: Chengxiang Li

{noformat}
val unionDs = ds1
  .unionAll(ds2.select('a, 'b, 'c))
  .join(ds3)
  .where('a === 'k)
  .select('a, 'b, 'l)
{noformat}

For this query, JoinUnionTransposeRule fails to push the Join past the Union.
[jira] [Updated] (FLINK-3503) [SQL] ProjectJoinTransposeRule fails to push down project.
[ https://issues.apache.org/jira/browse/FLINK-3503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3503:
---------------------------------
    Component/s: Table API

> [SQL] ProjectJoinTransposeRule fails to push down project.
> ----------------------------------------------------------
>
>          Key: FLINK-3503
>          URL: https://issues.apache.org/jira/browse/FLINK-3503
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Priority: Minor
>
> {noformat}
> val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
> val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
> {noformat}
> For this query, ProjectJoinTransposeRule should push a Project past the Join
> by splitting the projection into a projection on top of each child of the
> join.
[jira] [Updated] (FLINK-3504) Join fails while have expression inside join condition.
[ https://issues.apache.org/jira/browse/FLINK-3504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3504:
---------------------------------
    Summary: Join fails while have expression inside join condition.  (was: [SQL] Join fails while have expression inside join condition.)

> Join fails while have expression inside join condition.
> -------------------------------------------------------
>
>          Key: FLINK-3504
>          URL: https://issues.apache.org/jira/browse/FLINK-3504
>      Project: Flink
>   Issue Type: Bug
>   Components: Table API
>     Reporter: Chengxiang Li
>
> {noformat}
> val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
> val joinT = ds1.join(ds2).filter('a + 3 === 'd).select('c, 'g)
> {noformat}
> This query would throw an exception:
> {noformat}
> Caused by: org.apache.flink.api.table.TableException: Joins should have at least one equality condition
>     at org.apache.flink.api.table.plan.rules.dataset.DataSetJoinRule.convert(DataSetJoinRule.scala:57)
>     at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:116)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:228)
>     ... 44 more
> {noformat}
> There are two issues here:
> # DataSetJoinRule does not support expressions inside the join condition.
> # JoinPushExpressionsRule would add a Project to calculate the expression value before the Join, so the join condition would no longer include the expression; however, that transformed plan is not returned after the logical optimization.
[jira] [Created] (FLINK-3504) [SQL] Join fails while have expression inside join condition.
Chengxiang Li created FLINK-3504:
------------------------------------

     Summary: [SQL] Join fails while have expression inside join condition.
         Key: FLINK-3504
         URL: https://issues.apache.org/jira/browse/FLINK-3504
     Project: Flink
  Issue Type: Bug
  Components: Table API
    Reporter: Chengxiang Li

{noformat}
val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).filter('a + 3 === 'd).select('c, 'g)
{noformat}

This query would throw an exception:

{noformat}
Caused by: org.apache.flink.api.table.TableException: Joins should have at least one equality condition
    at org.apache.flink.api.table.plan.rules.dataset.DataSetJoinRule.convert(DataSetJoinRule.scala:57)
    at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:116)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:228)
    ... 44 more
{noformat}

There are two issues here:
# DataSetJoinRule does not support expressions inside the join condition.
# JoinPushExpressionsRule would add a Project to calculate the expression value before the Join, so the join condition would no longer include the expression; however, that transformed plan is not returned after the logical optimization.
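The second issue above can be illustrated in plain Java: pre-computing {{'a + 3}} in a Project below the join turns the condition into an ordinary equality that an equi-join can handle. The row values and helper names here are invented for illustration:

```java
import java.util.*;

public class PushExpressionSketch {
    // Step 1 (the Project the rule would insert): pre-compute a + 3 as the join
    // key, keeping c as the payload. Step 2: join on plain key equality (d == a + 3).
    static List<String> joinOnComputedKey(int[][] leftAC, int[][] rightDG) {
        Map<Integer, Integer> leftByKey = new HashMap<>();
        for (int[] row : leftAC) leftByKey.put(row[0] + 3, row[1]);  // key = a + 3
        List<String> out = new ArrayList<>();
        for (int[] row : rightDG) {
            if (leftByKey.containsKey(row[0])) {                     // plain equality
                out.add(leftByKey.get(row[0]) + "," + row[1]);       // select c, g
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] left = {{1, 10}, {2, 20}};   // rows (a, c)
        int[][] right = {{4, 40}, {5, 50}};  // rows (d, g)
        System.out.println(joinOnComputedKey(left, right));  // [10,40, 20,50]
    }
}
```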
[jira] [Updated] (FLINK-3487) [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.
[ https://issues.apache.org/jira/browse/FLINK-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3487:
---------------------------------
    Issue Type: Improvement  (was: Bug)

> [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.
> ------------------------------------------------------------------------------
>
>          Key: FLINK-3487
>          URL: https://issues.apache.org/jira/browse/FLINK-3487
>      Project: Flink
>   Issue Type: Improvement
>     Reporter: Chengxiang Li
>     Priority: Minor
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
>   .groupBy('a)
>   .select('a, 'b.avg as 'value)
>   .filter('a === 1)
> {noformat}
> For this query, the filter is expected to be pushed down below the aggregate
> by FilterAggregateTransposeRule; currently the optimized logical plan is not
> what we desired.
[jira] [Created] (FLINK-3503) [SQL] ProjectJoinTransposeRule fails to push down project.
Chengxiang Li created FLINK-3503:
------------------------------------

     Summary: [SQL] ProjectJoinTransposeRule fails to push down project.
         Key: FLINK-3503
         URL: https://issues.apache.org/jira/browse/FLINK-3503
     Project: Flink
  Issue Type: Improvement
    Reporter: Chengxiang Li
    Priority: Minor

{noformat}
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
{noformat}

For this query, ProjectJoinTransposeRule should push a Project past the Join by splitting the projection into a projection on top of each child of the join.
[jira] [Created] (FLINK-3502) [SQL] AggregateProjectPullUpConstantsRule fails on certain group keys order.
Chengxiang Li created FLINK-3502:
------------------------------------

     Summary: [SQL] AggregateProjectPullUpConstantsRule fails on certain group keys order.
         Key: FLINK-3502
         URL: https://issues.apache.org/jira/browse/FLINK-3502
     Project: Flink
  Issue Type: Bug
  Components: Table API
    Reporter: Chengxiang Li

{noformat}
val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
  .select('b, 4 as 'four, 'a)
  .groupBy('b, 'four)
  .select('four, 'a.sum)
{noformat}

This query would throw an exception like:

{noformat}
java.lang.AssertionError: Internal error: Error while applying rule AggregateProjectPullUpConstantsRule, args [rel#7:LogicalAggregate.NONE.[](input=rel#6:Subset#1.NONE.[],group={1, 2},TMP_1=SUM($0)), rel#5:LogicalProject.NONE.[](input=rel#4:Subset#0.NONE.[],a=$0,four=4,b=$1)]
    at org.apache.calcite.util.Util.newInternal(Util.java:792)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:251)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:826)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:304)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:51)
    at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:35)
    at org.apache.flink.api.scala.table.test.GroupedAggregationsITCase.testGroupedByExpression(GroupedAggregationsITCase.scala:133)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runners.Suite.runChild(Suite.java:127)
    at org.junit.runners.Suite.runChild(Suite.java:26)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.AssertionError: Internal error: Error occurred while applying rule AggregateProjectPullUpConstantsRule
    at org.apache.calcite.util.Util.newInternal(Util.java:792)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:150)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:213)
    at
{noformat}
[jira] [Created] (FLINK-3487) [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.
Chengxiang Li created FLINK-3487:
------------------------------------

     Summary: [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.
         Key: FLINK-3487
         URL: https://issues.apache.org/jira/browse/FLINK-3487
     Project: Flink
  Issue Type: Bug
    Reporter: Chengxiang Li
    Priority: Minor

{noformat}
val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
  .groupBy('a)
  .select('a, 'b.avg as 'value)
  .filter('a === 1)
{noformat}

For this query, the filter is expected to be pushed down below the aggregate by FilterAggregateTransposeRule; currently the optimized logical plan is not what we desired.
[jira] [Created] (FLINK-3486) [SQL] Use Project to rename all record fields would fail following Project.
Chengxiang Li created FLINK-3486:
------------------------------------

     Summary: [SQL] Use Project to rename all record fields would fail following Project.
         Key: FLINK-3486
         URL: https://issues.apache.org/jira/browse/FLINK-3486
     Project: Flink
  Issue Type: Bug
    Reporter: Chengxiang Li

{noformat}
val t = CollectionDataSets.get3TupleDataSet(env).toTable
  .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
  .select('a, 'b)
{noformat}

would throw an exception like:

{noformat}
java.lang.IllegalArgumentException: field [a] not found; input fields are: [_1, _2, _3]
    at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:290)
    at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:275)
    at org.apache.flink.api.table.plan.RexNodeTranslator$.toRexNode(RexNodeTranslator.scala:80)
    at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
    at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
{noformat}

The new alias names are lost.
[jira] [Created] (FLINK-3476) Support hash-based partial aggregate
Chengxiang Li created FLINK-3476:
------------------------------------

     Summary: Support hash-based partial aggregate
         Key: FLINK-3476
         URL: https://issues.apache.org/jira/browse/FLINK-3476
     Project: Flink
  Issue Type: Sub-task
  Components: Table API
    Reporter: Chengxiang Li

As described in the design doc, we should be able to enable hash-based partial aggregation after the hash-based combiner (#1517) is supported.
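The difference from the sort-based combiner can be sketched: a hash-based combiner folds each record into a per-key accumulator held in a hash table, so the input never has to be sorted by key first. A standalone Java illustration (not the Flink combiner API):

```java
import java.util.*;

public class HashCombinerSketch {
    // Fold each (key, value) record into a per-key SUM accumulator in a hash
    // table; unlike a sort-based combiner, no sort of the input is needed.
    static Map<String, Long> hashCombine(List<? extends Map.Entry<String, Long>> records) {
        Map<String, Long> partials = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            partials.merge(r.getKey(), r.getValue(), Long::sum);  // update accumulator
        }
        return partials;
    }

    public static void main(String[] args) {
        Map<String, Long> m = hashCombine(Arrays.asList(
                new AbstractMap.SimpleEntry<>("a", 1L),
                new AbstractMap.SimpleEntry<>("b", 2L),
                new AbstractMap.SimpleEntry<>("a", 3L)));
        System.out.println(m.get("a") + " " + m.get("b"));  // 4 2
    }
}
```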
[jira] [Updated] (FLINK-3473) Add partial aggregate support in Flink
[ https://issues.apache.org/jira/browse/FLINK-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengxiang Li updated FLINK-3473:
---------------------------------
    Attachment: PartialAggregateinFlink_v1.pdf

> Add partial aggregate support in Flink
> --------------------------------------
>
>          Key: FLINK-3473
>          URL: https://issues.apache.org/jira/browse/FLINK-3473
>      Project: Flink
>   Issue Type: Improvement
>   Components: Table API
>     Reporter: Chengxiang Li
>     Assignee: Chengxiang Li
>  Attachments: PartialAggregateinFlink_v1.pdf
>
> For decomposable aggregate functions, partial aggregation is more efficient
> as it significantly reduces the network traffic during the shuffle and
> parallelizes part of the aggregate calculation. This is an umbrella task for
> partial aggregation.
[jira] [Updated] (FLINK-3474) Partial aggregate interface design and sort-based implementation
[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-3474: - Attachment: (was: PartialAggregateinFlink_v1.pdf) > Partial aggregate interface design and sort-based implementation > > > Key: FLINK-3474 > URL: https://issues.apache.org/jira/browse/FLINK-3474 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > > The scope of this sub-task includes: > # The partial aggregate interface. > # Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX. > # DataSetAggregateRule, which translates logical Calcite aggregate nodes to > Flink user functions. As the hash-based combiner is not available yet (see PR > #1517), we will use sort-based combining as the default. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3474) Partial aggregate interface design and sort-based implementation
[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-3474: - Attachment: PartialAggregateinFlink_v1.pdf > Partial aggregate interface design and sort-based implementation > > > Key: FLINK-3474 > URL: https://issues.apache.org/jira/browse/FLINK-3474 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > Attachments: PartialAggregateinFlink_v1.pdf > > > The scope of this sub-task includes: > # The partial aggregate interface. > # Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX. > # DataSetAggregateRule, which translates logical Calcite aggregate nodes to > Flink user functions. As the hash-based combiner is not available yet (see PR > #1517), we will use sort-based combining as the default. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
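The scope above (a partial aggregate interface plus simple functions such as SUM/AVG/COUNT) relies on the aggregate being decomposable: partial states can be accumulated locally, merged across subtasks by a combiner, and only then turned into the final value. A minimal sketch of such a partial aggregate for AVG follows; all names here are hypothetical and this is not Flink's actual aggregate API:

```java
// Hypothetical decomposable AVG aggregate: the (sum, count) partial
// state can be merged before the final result is extracted, which is
// what makes a pre-shuffle combiner possible.
class PartialAvg {
    long sum;    // running sum of the inputs seen so far
    long count;  // number of inputs seen so far

    // Accumulate one input value into the local partial state.
    void accumulate(long value) {
        sum += value;
        count++;
    }

    // Merge another partial state, e.g. from a different subtask.
    void merge(PartialAvg other) {
        sum += other.sum;
        count += other.count;
    }

    // Extract the final result once all partials are merged.
    double getResult() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}
```

Note that this {{(sum: Long, count: Long)}} pair is exactly the partial state whose overflow risk FLINK-3586 raises; an unbounded type such as BigInteger would avoid that for the affected input types.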
[jira] [Created] (FLINK-3475) DISTINCT aggregate function support
Chengxiang Li created FLINK-3475: Summary: DISTINCT aggregate function support Key: FLINK-3475 URL: https://issues.apache.org/jira/browse/FLINK-3475 Project: Flink Issue Type: Sub-task Components: Table API Reporter: Chengxiang Li The DISTINCT aggregate function may be able to reuse the existing aggregate functions instead of a separate implementation, and let the Flink runtime take care of duplicate records. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3473) Add partial aggregate support in Flink
Chengxiang Li created FLINK-3473: Summary: Add partial aggregate support in Flink Key: FLINK-3473 URL: https://issues.apache.org/jira/browse/FLINK-3473 Project: Flink Issue Type: Improvement Components: Table API Reporter: Chengxiang Li Assignee: Chengxiang Li For decomposable aggregate functions, partial aggregation is more efficient as it significantly reduces the network traffic during the shuffle and parallelizes part of the aggregate calculation. This is an umbrella task for partial aggregation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3281) IndexOutOfBoundsException when range-partitioning empty DataSet
[ https://issues.apache.org/jira/browse/FLINK-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li resolved FLINK-3281. -- Resolution: Fixed Fix Version/s: 1.0.0 > IndexOutOfBoundsException when range-partitioning empty DataSet > > > Key: FLINK-3281 > URL: https://issues.apache.org/jira/browse/FLINK-3281 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime, Local Runtime >Reporter: Fridtjof Sander >Assignee: Chengxiang Li > Fix For: 1.0.0 > > > Code: > {code} > import org.apache.flink.api.scala._ > object RangePartitionOnEmptyDataSet { > def main(args:Array[String]) = { > val env = ExecutionEnvironment.getExecutionEnvironment > env > .fromCollection(Seq[Tuple1[String]]()) > .partitionByRange(0) > .collect() > } > } > {code} > Output: > {noformat} > 01/24/2016 16:24:36 Job execution switched to status RUNNING. > 01/24/2016 16:24:36 DataSource (at > RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) > (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to > SCHEDULED > 01/24/2016 16:24:36 DataSource (at > RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) > (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to > DEPLOYING > 01/24/2016 16:24:36 DataSource (at > RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) > (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to > RUNNING > 01/24/2016 16:24:36 RangePartition: LocalSample(1/1) switched to SCHEDULED > 01/24/2016 16:24:36 RangePartition: LocalSample(1/1) switched to DEPLOYING > 01/24/2016 16:24:36 DataSource (at > RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) > (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to > FINISHED > 01/24/2016 16:24:36 RangePartition: PreparePartition(1/1) switched to > SCHEDULED > 01/24/2016 16:24:36 RangePartition: PreparePartition(1/1) switched to > DEPLOYING > 01/24/2016 16:24:36 
RangePartition: LocalSample(1/1) switched to RUNNING > 01/24/2016 16:24:36 RangePartition: PreparePartition(1/1) switched to > RUNNING > 01/24/2016 16:24:36 RangePartition: GlobalSample(1/1) switched to SCHEDULED > 01/24/2016 16:24:36 RangePartition: GlobalSample(1/1) switched to DEPLOYING > 01/24/2016 16:24:36 RangePartition: LocalSample(1/1) switched to FINISHED > 01/24/2016 16:24:36 RangePartition: GlobalSample(1/1) switched to RUNNING > 01/24/2016 16:24:36 RangePartition: Histogram(1/1) switched to SCHEDULED > 01/24/2016 16:24:36 RangePartition: Histogram(1/1) switched to DEPLOYING > 01/24/2016 16:24:36 RangePartition: GlobalSample(1/1) switched to FINISHED > 01/24/2016 16:24:36 RangePartition: Histogram(1/1) switched to RUNNING > 01/24/2016 16:24:37 RangePartition: Histogram(1/1) switched to FAILED > java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:653) > at java.util.ArrayList.get(ArrayList.java:429) > at > org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder.mapPartition(RangeBoundaryBuilder.java:66) > at > org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:561) > at java.lang.Thread.run(Thread.java:745) > 01/24/2016 16:24:37 Job execution switched to status FAILING. 
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:653) > at java.util.ArrayList.get(ArrayList.java:429) > at > org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder.mapPartition(RangeBoundaryBuilder.java:66) > at > org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:561) > at java.lang.Thread.run(Thread.java:745) > 01/24/2016 16:24:37 RangePartition: PreparePartition(1/1) switched to > CANCELING > 01/24/2016 16:24:37 RangePartition: Partition(1/4) switched to CANCELED > 01/24/2016 16:24:37 RangePartition: Partition(2/4) switched to CANCELED > 01/24/2016 16:24:37 RangePartition: Partition(3/4) switched to CANCELED > 01/24/2016 16:24:37 RangePartition: Partition(4/4) switched to CANCELED > 01/24/2016 16:24:37 CHAIN
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15119050#comment-15119050 ] Chengxiang Li commented on FLINK-3226: -- [~fhueske] and [~twalthr], this task will be blocked on me for a while, sorry about that. The task is more complicated and involves more work than I expected: it actually contains the physical plan generation part of Timo's prototype plus the whole previous Table API translation part. To move this forward faster, as Fabian suggested, I agree that we can split it into multiple sub-tasks, such as expression code generation, and even by Calcite RelNode; for example, we can work on RelNodes with no dependencies in parallel, such as Sort, Join, and Aggregate. I would make this task very small, containing only the Project translator without expression code generation, and have it ready during this week. Besides, while trying to test my code, I found that {{PlannerImpl}} checks its state during each step of the query planning process, and there is no way to set the state manually. For the Table API we actually skip several steps, such as parse and validate, so it fails when we go directly to the optimize step due to the unmatched state. Do you guys have any idea about this? > Translate optimized logical Table API plans into physical plans representing > DataSet programs > - > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Fabian Hueske >Assignee: Chengxiang Li > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. > - All (join and grouping) keys of Flink operators are correctly specified. 
> - The expressions which are to be executed in user-code are identified. > - All fields are referenced with their physical execution-time index. > - Flink type information is available. > - Optional: Add physical execution hints for joins > The translation should be the final part of Calcite's optimization process. > For this task we need to: > - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one > Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all > relevant operator information (keys, user-code expression, strategy hints, > parallelism). > - implement rules to translate optimized Calcite RelNodes into Flink > RelNodes. We start with a straight-forward mapping and later add rules that > merge several relational operators into a single Flink operator, e.g., merge > a join followed by a filter. Timo implemented some rules for the first SQL > implementation which can be used as a starting point. > - Integrate the translation rules into the Calcite optimization process -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3282) Add FlinkRelNode interface.
[ https://issues.apache.org/jira/browse/FLINK-3282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li resolved FLINK-3282. -- Resolution: Fixed > Add FlinkRelNode interface. > --- > > Key: FLINK-3282 > URL: https://issues.apache.org/jira/browse/FLINK-3282 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > > Add FlinkRelNode interface for physical plan translator and Flink program > generator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-3282) Add FlinkRelNode interface.
[ https://issues.apache.org/jira/browse/FLINK-3282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li closed FLINK-3282. > Add FlinkRelNode interface. > --- > > Key: FLINK-3282 > URL: https://issues.apache.org/jira/browse/FLINK-3282 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > > Add FlinkRelNode interface for physical plan translator and Flink program > generator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-2871) Add OuterJoin strategy with HashTable on outer side
[ https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li resolved FLINK-2871. -- Resolution: Fixed Fix Version/s: 1.0.0 > Add OuterJoin strategy with HashTable on outer side > --- > > Key: FLINK-2871 > URL: https://issues.apache.org/jira/browse/FLINK-2871 > Project: Flink > Issue Type: New Feature > Components: Local Runtime, Optimizer >Affects Versions: 0.10.0 >Reporter: Fabian Hueske >Assignee: Chengxiang Li >Priority: Minor > Fix For: 1.0.0 > > > Outer joins are currently supported with two local execution strategies: > - sort-merge join > - hash join where the hash table is built on the inner side. Hence, this > strategy is only supported for left and right outer joins. > In order to support hash-tables on the outer side, we need a special hash > table implementation that gives access to all records which have not been > accessed during the probe phase. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3282) Add FlinkRelNode interface.
Chengxiang Li created FLINK-3282: Summary: Add FlinkRelNode interface. Key: FLINK-3282 URL: https://issues.apache.org/jira/browse/FLINK-3282 Project: Flink Issue Type: New Feature Components: Table API Reporter: Chengxiang Li Assignee: Chengxiang Li Add FlinkRelNode interface for physical plan translator and Flink program generator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115034#comment-15115034 ] Chengxiang Li commented on FLINK-3226: -- Yes, that makes sense. I have created a PR (https://github.com/apache/flink/pull/1544) for the FlinkRelNode interface. [~twalthr] and [~fhueske], please let me know if you have any suggestions. > Translate optimized logical Table API plans into physical plans representing > DataSet programs > - > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Fabian Hueske >Assignee: Chengxiang Li > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. > - All (join and grouping) keys of Flink operators are correctly specified. > - The expressions which are to be executed in user-code are identified. > - All fields are referenced with their physical execution-time index. > - Flink type information is available. > - Optional: Add physical execution hints for joins > The translation should be the final part of Calcite's optimization process. > For this task we need to: > - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one > Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all > relevant operator information (keys, user-code expression, strategy hints, > parallelism). > - implement rules to translate optimized Calcite RelNodes into Flink > RelNodes. We start with a straight-forward mapping and later add rules that > merge several relational operators into a single Flink operator, e.g., merge > a join followed by a filter. 
Timo implemented some rules for the first SQL > implementation which can be used as a starting point. > - Integrate the translation rules into the Calcite optimization process -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3281) IndexOutOfBoundsException when range-partitioning empty DataSet
[ https://issues.apache.org/jira/browse/FLINK-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114659#comment-15114659 ] Chengxiang Li commented on FLINK-3281: -- [~fsander], thanks for finding this, I will work on it. > IndexOutOfBoundsException when range-partitioning empty DataSet > > > Key: FLINK-3281 > URL: https://issues.apache.org/jira/browse/FLINK-3281 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime, Local Runtime >Reporter: Fridtjof Sander > > Code: > {code} > import org.apache.flink.api.scala._ > object RangePartitionOnEmptyDataSet { > def main(args:Array[String]) = { > val env = ExecutionEnvironment.getExecutionEnvironment > env > .fromCollection(Seq[Tuple1[String]]()) > .partitionByRange(0) > .collect() > } > } > {code} > Output: > {noformat} > 01/24/2016 16:24:36 Job execution switched to status RUNNING. > 01/24/2016 16:24:36 DataSource (at > RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) > (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to > SCHEDULED > 01/24/2016 16:24:36 DataSource (at > RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) > (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to > DEPLOYING > 01/24/2016 16:24:36 DataSource (at > RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) > (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to > RUNNING > 01/24/2016 16:24:36 RangePartition: LocalSample(1/1) switched to SCHEDULED > 01/24/2016 16:24:36 RangePartition: LocalSample(1/1) switched to DEPLOYING > 01/24/2016 16:24:36 DataSource (at > RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) > (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to > FINISHED > 01/24/2016 16:24:36 RangePartition: PreparePartition(1/1) switched to > SCHEDULED > 01/24/2016 16:24:36 RangePartition: PreparePartition(1/1) switched to > DEPLOYING > 01/24/2016 
16:24:36 RangePartition: LocalSample(1/1) switched to RUNNING > 01/24/2016 16:24:36 RangePartition: PreparePartition(1/1) switched to > RUNNING > 01/24/2016 16:24:36 RangePartition: GlobalSample(1/1) switched to SCHEDULED > 01/24/2016 16:24:36 RangePartition: GlobalSample(1/1) switched to DEPLOYING > 01/24/2016 16:24:36 RangePartition: LocalSample(1/1) switched to FINISHED > 01/24/2016 16:24:36 RangePartition: GlobalSample(1/1) switched to RUNNING > 01/24/2016 16:24:36 RangePartition: Histogram(1/1) switched to SCHEDULED > 01/24/2016 16:24:36 RangePartition: Histogram(1/1) switched to DEPLOYING > 01/24/2016 16:24:36 RangePartition: GlobalSample(1/1) switched to FINISHED > 01/24/2016 16:24:36 RangePartition: Histogram(1/1) switched to RUNNING > 01/24/2016 16:24:37 RangePartition: Histogram(1/1) switched to FAILED > java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:653) > at java.util.ArrayList.get(ArrayList.java:429) > at > org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder.mapPartition(RangeBoundaryBuilder.java:66) > at > org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:561) > at java.lang.Thread.run(Thread.java:745) > 01/24/2016 16:24:37 Job execution switched to status FAILING. 
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:653) > at java.util.ArrayList.get(ArrayList.java:429) > at > org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder.mapPartition(RangeBoundaryBuilder.java:66) > at > org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:561) > at java.lang.Thread.run(Thread.java:745) > 01/24/2016 16:24:37 RangePartition: PreparePartition(1/1) switched to > CANCELING > 01/24/2016 16:24:37 RangePartition: Partition(1/4) switched to CANCELED > 01/24/2016 16:24:37 RangePartition: Partition(2/4) switched to CANCELED > 01/24/2016 16:24:37 RangePartition: Partition(3/4) switched to CANCELED > 01/24/2016 16:24:37 RangePartition: Partition(4/4) switched to CANCELED > 01/24/2016 16:24:37 CHAIN Partition -> FlatMap
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106292#comment-15106292 ] Chengxiang Li commented on FLINK-3226: -- I have listed the mapping between Calcite RelNodes, Flink RelNodes, and Flink DataSet operators [here|https://docs.google.com/document/d/12wf4EEmnwA-SMr-oEwZ0IUZqH1ofK1fUWZJfeOBrVpc/edit?usp=sharing]. [~fhueske] and [~twalthr], please let me know if you have any suggestions. > Translate optimized logical Table API plans into physical plans representing > DataSet programs > - > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Fabian Hueske >Assignee: Chengxiang Li > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. > - All (join and grouping) keys of Flink operators are correctly specified. > - The expressions which are to be executed in user-code are identified. > - All fields are referenced with their physical execution-time index. > - Flink type information is available. > - Optional: Add physical execution hints for joins > The translation should be the final part of Calcite's optimization process. > For this task we need to: > - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one > Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all > relevant operator information (keys, user-code expression, strategy hints, > parallelism). > - implement rules to translate optimized Calcite RelNodes into Flink > RelNodes. 
We start with a straight-forward mapping and later add rules that > merge several relational operators into a single Flink operator, e.g., merge > a join followed by a filter. Timo implemented some rules for the first SQL > implementation which can be used as a starting point. > - Integrate the translation rules into the Calcite optimization process -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095914#comment-15095914 ] Chengxiang Li commented on FLINK-3226: -- Cool, [~fhueske]. Before the actual work, I will upload a document containing the mapping of Calcite RelNodes, Flink RelNodes, and Flink DataSet operators, which should help coordinate with FLINK-3227. > Translate optimized logical Table API plans into physical plans representing > DataSet programs > - > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Fabian Hueske >Assignee: Chengxiang Li > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. > - All (join and grouping) keys of Flink operators are correctly specified. > - The expressions which are to be executed in user-code are identified. > - All fields are referenced with their physical execution-time index. > - Flink type information is available. > - Optional: Add physical execution hints for joins > The translation should be the final part of Calcite's optimization process. > For this task we need to: > - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one > Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all > relevant operator information (keys, user-code expression, strategy hints, > parallelism). > - implement rules to translate optimized Calcite RelNodes into Flink > RelNodes. We start with a straight-forward mapping and later add rules that > merge several relational operators into a single Flink operator, e.g., merge > a join followed by a filter. 
Timo implemented some rules for the first SQL > implementation which can be used as a starting point. > - Integrate the translation rules into the Calcite optimization process -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095470#comment-15095470 ] Chengxiang Li commented on FLINK-3226: -- [~fhueske], I would like to contribute to this task. > Translate optimized logical Table API plans into physical plans representing > DataSet programs > - > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Fabian Hueske > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. > - All (join and grouping) keys of Flink operators are correctly specified. > - The expressions which are to be executed in user-code are identified. > - All fields are referenced with their physical execution-time index. > - Flink type information is available. > - Optional: Add physical execution hints for joins > The translation should be the final part of Calcite's optimization process. > For this task we need to: > - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one > Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all > relevant operator information (keys, user-code expression, strategy hints, > parallelism). > - implement rules to translate optimized Calcite RelNodes into Flink > RelNodes. We start with a straight-forward mapping and later add rules that > merge several relational operators into a single Flink operator, e.g., merge > a join followed by a filter. Timo implemented some rules for the first SQL > implementation which can be used as a starting point. > - Integrate the translation rules into the Calcite optimization process -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2871) Add OuterJoin strategy with HashTable on outer side
[ https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15061322#comment-15061322 ] Chengxiang Li commented on FLINK-2871: -- I would like to contribute to this. There are 4 reserved bytes left in the bucket header of {{MutableHashTable}}. As there are only 9 elements in each bucket, we could use 2 of those bytes as a BitSet that marks whether each element in the bucket has been probed during the probe phase, and return the elements that have not been probed at the end. > Add OuterJoin strategy with HashTable on outer side > --- > > Key: FLINK-2871 > URL: https://issues.apache.org/jira/browse/FLINK-2871 > Project: Flink > Issue Type: New Feature > Components: Local Runtime, Optimizer >Affects Versions: 0.10.0 >Reporter: Fabian Hueske >Priority: Minor > > Outer joins are currently supported with two local execution strategies: > - sort-merge join > - hash join where the hash table is built on the inner side. Hence, this > strategy is only supported for left and right outer joins. > In order to support hash-tables on the outer side, we need a special hash > table implementation that gives access to all records which have not been > accessed during the probe phase. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
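The bitmask idea in the comment above (2 reserved header bytes marking which of a bucket's 9 slots were hit during the probe phase) can be sketched as follows. The bucket layout here is purely illustrative and does not mirror {{MutableHashTable}}'s actual memory format:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical 9-slot hash bucket: a 16-bit mask records which slots
// were matched during the probe phase. The unset bits identify the
// build-side records an outer join must still emit at the end.
class ProbedBucket {
    static final int SLOTS = 9;
    short probedMask; // fits in the 2 reserved header bytes, 9 bits used

    // Called when a probe-side record matches the record in this slot.
    void markProbed(int slot) {
        probedMask |= (short) (1 << slot);
    }

    boolean wasProbed(int slot) {
        return (probedMask & (1 << slot)) != 0;
    }

    // Slots holding records that no probe-side record ever matched.
    List<Integer> unprobedSlots(int occupiedSlots) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < occupiedSlots; i++) {
            if (!wasProbed(i)) {
                result.add(i);
            }
        }
        return result;
    }
}
```

Scanning the unset bits after the probe phase is what gives the hash table "access to all records which have not been accessed during the probe phase", as the issue description requires.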
[jira] [Assigned] (FLINK-2871) Add OuterJoin strategy with HashTable on outer side
[ https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li reassigned FLINK-2871: Assignee: Chengxiang Li > Add OuterJoin strategy with HashTable on outer side > --- > > Key: FLINK-2871 > URL: https://issues.apache.org/jira/browse/FLINK-2871 > Project: Flink > Issue Type: New Feature > Components: Local Runtime, Optimizer >Affects Versions: 0.10.0 >Reporter: Fabian Hueske >Assignee: Chengxiang Li >Priority: Minor > > Outer joins are currently supported with two local execution strategies: > - sort-merge join > - hash join where the hash table is built on the inner side. Hence, this > strategy is only supported for left and right outer joins. > In order to support hash-tables on the outer side, we need a special hash > table implementation that gives access to all records which have not been > accessed during the probe phase. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3139) NULL values handling in Table API
Chengxiang Li created FLINK-3139: Summary: NULL values handling in Table API Key: FLINK-3139 URL: https://issues.apache.org/jira/browse/FLINK-3139 Project: Flink Issue Type: Task Components: Table API Reporter: Chengxiang Li This is an umbrella task for NULL value handling in the Table API. As the logical API for queries, the Table API should support handling NULL values; NULL values are quite common in logical queries, for example: # A data source may miss a column value in many cases ("value missing", "value unknown", "value not applicable", etc.). # Some operators generate NULL values on their own, like Outer Join/Cube/Rollup/Grouping Set, and so on. NULL value handling in the Table API is a prerequisite of these features. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3141) Design of NULL values handling in operation
Chengxiang Li created FLINK-3141: Summary: Design of NULL values handling in operation Key: FLINK-3141 URL: https://issues.apache.org/jira/browse/FLINK-3141 Project: Flink Issue Type: Sub-task Reporter: Chengxiang Li Here we discuss and finalize how NULL values are handled in specific cases. This is the first proposal: # NULL comparison. In ascending order, NULL is smaller than any other value, and NULL == NULL returns false. # NULL exists in a GroupBy key: all NULL values are grouped into a single group. # NULL exists in aggregate columns: ignore NULL in the aggregation function. # NULL exists in both sides' join key: per item 1, NULL == NULL returns false, so there is no output for NULL join keys. # NULL in a scalar expression: an expression involving NULL (e.g. 1 + NULL) returns NULL. # NULL in a Boolean expression: add an extra result, UNKNOWN; more on the semantics of Boolean expressions in reference #1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
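The proposal above amounts to SQL-like three-valued NULL semantics: NULL sorts below every value, NULL never equals NULL (so NULL join keys never match), and NULL propagates through arithmetic. A small sketch of those rules using boxed Long values; this is illustrative only, since the actual Table API semantics were still under discussion in this ticket:

```java
// Illustrative NULL semantics from the FLINK-3141 proposal,
// modeled on boxed Long (null plays the role of SQL NULL).
class NullSemantics {
    // Rule 1: in ascending order, NULL is smaller than any value.
    // (compare(null, null) == 0 is needed for a total sort order,
    // even though NULL == NULL is false as an equality predicate.)
    static int compare(Long a, Long b) {
        if (a == null && b == null) return 0;
        if (a == null) return -1;
        if (b == null) return 1;
        return Long.compare(a, b);
    }

    // Rule 4: NULL join keys never match, not even each other.
    static boolean joinKeyMatches(Long a, Long b) {
        return a != null && b != null && a.longValue() == b.longValue();
    }

    // Rule 5: an expression involving NULL yields NULL (1 + NULL = NULL).
    static Long plus(Long a, Long b) {
        return (a == null || b == null) ? null : a + b;
    }
}
```

Rule 3 (ignoring NULL in aggregation) then falls out naturally: an aggregate simply skips inputs for which the column value is null.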
[jira] [Created] (FLINK-3140) NULL value data layout in Row Serializer/Comparator
Chengxiang Li created FLINK-3140: Summary: NULL value data layout in Row Serializer/Comparator Key: FLINK-3140 URL: https://issues.apache.org/jira/browse/FLINK-3140 Project: Flink Issue Type: Sub-task Components: Table API Reporter: Chengxiang Li To store/materialize NULL values in Row objects, we need a new Row Serializer/Comparator that is aware of NULL-valued fields. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
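One common way to make a row serializer NULL-aware is to prefix each row with a null mask (one flag per field) and write the payload only for non-null fields. The sketch below uses a Long-typed row and one flag byte per field for simplicity; the layout is hypothetical and not the format eventually adopted in Flink:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical NULL-aware row layout:
//   arity | one null-flag byte per field | payload of non-null fields
class NullAwareRow {

    static byte[] serialize(Long[] fields) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(fields.length);
            for (Long f : fields) out.writeBoolean(f == null); // null mask
            for (Long f : fields) if (f != null) out.writeLong(f);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e); // in-memory streams don't fail
        }
    }

    static Long[] deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int arity = in.readInt();
            boolean[] isNull = new boolean[arity];
            for (int i = 0; i < arity; i++) isNull[i] = in.readBoolean();
            Long[] fields = new Long[arity];
            for (int i = 0; i < arity; i++) {
                if (!isNull[i]) fields[i] = in.readLong(); // null fields stay null
            }
            return fields;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

A comparator over this layout would consult the mask first and apply the "NULL is smaller than any value" ordering from FLINK-3141 before comparing payloads.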
[jira] [Resolved] (FLINK-3087) Table API do not support multi count in aggregation.
[ https://issues.apache.org/jira/browse/FLINK-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li resolved FLINK-3087. -- Resolution: Fixed Fix Version/s: 1.0.0 > Table API do not support multi count in aggregation. > > > Key: FLINK-3087 > URL: https://issues.apache.org/jira/browse/FLINK-3087 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 0.10.0 >Reporter: Chengxiang Li >Assignee: Chengxiang Li > Fix For: 1.0.0 > > > Multi {{count}} in aggregation is not supported, for example: > {code:java} > table.select("a.count", "b.count") > {code} > It is grammatically valid; besides, {{a.count}} and {{b.count}} may actually have > different values if NULL value handling is enabled. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3098) Cast from Date to Long throw compile error.
Chengxiang Li created FLINK-3098: Summary: Cast from Date to Long throw compile error. Key: FLINK-3098 URL: https://issues.apache.org/jira/browse/FLINK-3098 Project: Flink Issue Type: Bug Components: Table API Affects Versions: 0.10.1 Reporter: Chengxiang Li Assignee: Chengxiang Li While casting Date to Long in the Table API, the generated code throws a compile error that looks like: {code} Caused by: org.codehaus.commons.compiler.CompileException: Line 33, Column 23: Expression "result$5" is not an rvalue at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10062) at org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:5960) at org.codehaus.janino.UnitCompiler.compileContext2(UnitCompiler.java:3172) at org.codehaus.janino.UnitCompiler.access$5400(UnitCompiler.java:182) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3087) Table API do not support multi count in aggregation.
Chengxiang Li created FLINK-3087: Summary: Table API do not support multi count in aggregation. Key: FLINK-3087 URL: https://issues.apache.org/jira/browse/FLINK-3087 Project: Flink Issue Type: Bug Components: Table API Affects Versions: 0.10.0 Reporter: Chengxiang Li Assignee: Chengxiang Li Multi {{count}} in aggregation is not supported, for example: {code:java} table.select("a.count", "b.count") {code} It is grammatically valid; besides, {{a.count}} and {{b.count}} may actually have different values if NULL value handling is enabled. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2115) TableAPI throws ExpressionException for "Dangling GroupBy operation"
[ https://issues.apache.org/jira/browse/FLINK-2115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li reassigned FLINK-2115: Assignee: Chengxiang Li > TableAPI throws ExpressionException for "Dangling GroupBy operation" > > > Key: FLINK-2115 > URL: https://issues.apache.org/jira/browse/FLINK-2115 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Chengxiang Li > > The following program throws an ExpressionException due to a "Dangling > GroupBy operation". > However, I think the program is semantically correct and should execute. > {code} > public static void main(String[] args) throws Exception { > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > DataSet<Integer> data = env.fromElements(1,2,2,3,3,3,4,4,4,4); > DataSet<Tuple2<Integer, Integer>> tuples = data > .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() { > @Override > public Tuple2<Integer, Integer> map(Integer i) throws Exception { > return new Tuple2<Integer, Integer>(i, i*2); > } > }); > TableEnvironment tEnv = new TableEnvironment(); > Table t = tEnv.toTable(tuples).as("i, i2") > .groupBy("i, i2").select("i, i2") > .groupBy("i").select("i, i.count as cnt"); > tEnv.toSet(t, Row.class).print(); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
[ https://issues.apache.org/jira/browse/FLINK-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15010576#comment-15010576 ] Chengxiang Li commented on FLINK-2980: -- This depends on support for null values in the Table API, which has been discussed before but is not enabled yet. Will revisit this feature after null value support is enabled in the Table API. > Add CUBE/ROLLUP/GROUPING SETS operator in Table API. > > > Key: FLINK-2980 > URL: https://issues.apache.org/jira/browse/FLINK-2980 > Project: Flink > Issue Type: Bug > Components: Documentation, Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > Attachments: Cube-Rollup-GroupSet design doc in Flink.pdf > > > Computing aggregates over a cube/rollup/grouping sets of several dimensions > is a common operation in data warehousing. It would be nice to have them in > Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
[ https://issues.apache.org/jira/browse/FLINK-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li reassigned FLINK-2980: Assignee: Chengxiang Li > Add CUBE/ROLLUP/GROUPING SETS operator in Table API. > > > Key: FLINK-2980 > URL: https://issues.apache.org/jira/browse/FLINK-2980 > Project: Flink > Issue Type: Bug > Components: Documentation, Table API >Reporter: Chengxiang Li >Assignee: Chengxiang Li > Attachments: Cube-Rollup-GroupSet design doc in Flink.pdf > > > Computing aggregates over a cube/rollup/grouping sets of several dimensions > is a common operation in data warehousing. It would be nice to have them in > Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2998) Support range partition comparison for multi input nodes.
Chengxiang Li created FLINK-2998: Summary: Support range partition comparison for multi input nodes. Key: FLINK-2998 URL: https://issues.apache.org/jira/browse/FLINK-2998 Project: Flink Issue Type: New Feature Components: Optimizer Reporter: Chengxiang Li Priority: Minor The optimizer may have an opportunity to optimize the DAG when it finds that two input range partitions are equivalent; we do not support this comparison yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2997) Support range partition with user customized data distribution.
Chengxiang Li created FLINK-2997: Summary: Support range partition with user customized data distribution. Key: FLINK-2997 URL: https://issues.apache.org/jira/browse/FLINK-2997 Project: Flink Issue Type: New Feature Reporter: Chengxiang Li This is follow-up work for FLINK-7. Sometimes users have better knowledge of the source data, and they can build a customized data distribution to do range partitioning more efficiently. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
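To illustrate what a user-supplied data distribution buys: when the user already knows good bucket boundaries, each key can be routed to its range partition with a binary search, and the sampling pass over the source data becomes unnecessary. A minimal sketch with hypothetical names (this is not Flink's actual `DataDistribution` interface):

```java
import java.util.Arrays;

/** Hypothetical user-supplied distribution for range partitioning:
 *  boundaries[i] is the exclusive upper bound of partition i. */
public class BoundaryDistribution {
    private final int[] boundaries;

    public BoundaryDistribution(int[] boundaries) {
        this.boundaries = boundaries.clone(); // assumed sorted ascending
    }

    /** Returns the index of the partition the key belongs to. */
    public int partition(int key) {
        int pos = Arrays.binarySearch(boundaries, key);
        // binarySearch returns (-(insertionPoint) - 1) when the key is absent;
        // a key equal to a boundary belongs to the next partition (exclusive bound).
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public int numPartitions() {
        return boundaries.length + 1;
    }
}
```

With boundaries {10, 20} this yields three partitions: (-inf, 10), [10, 20), and [20, +inf).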
[jira] [Assigned] (FLINK-2955) Add operations introduction in Table API page.
[ https://issues.apache.org/jira/browse/FLINK-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li reassigned FLINK-2955: Assignee: Chengxiang Li > Add operations introduction in Table API page. > -- > > Key: FLINK-2955 > URL: https://issues.apache.org/jira/browse/FLINK-2955 > Project: Flink > Issue Type: New Feature > Components: Documentation >Reporter: Chengxiang Li >Assignee: Chengxiang Li >Priority: Minor > > On the Table API page, there is no formal introduction of the currently supported > operations; it would be nice to have one. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
[ https://issues.apache.org/jira/browse/FLINK-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-2980: - Attachment: Cube-Rollup-GroupSet design doc in Flink.pdf > Add CUBE/ROLLUP/GROUPING SETS operator in Table API. > > > Key: FLINK-2980 > URL: https://issues.apache.org/jira/browse/FLINK-2980 > Project: Flink > Issue Type: Bug > Components: Documentation, Table API >Reporter: Chengxiang Li > Attachments: Cube-Rollup-GroupSet design doc in Flink.pdf > > > Computing aggregates over a cube/rollup/grouping sets of several dimensions > is a common operation in data warehousing. It would be nice to have them in > Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
Chengxiang Li created FLINK-2980: Summary: Add CUBE/ROLLUP/GROUPING SETS operator in Table API. Key: FLINK-2980 URL: https://issues.apache.org/jira/browse/FLINK-2980 Project: Flink Issue Type: Bug Components: Documentation, Table API Reporter: Chengxiang Li Computing aggregates over a cube/rollup/grouping sets of several dimensions is a common operation in data warehousing. It would be nice to have them in Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2956) Migrate integration tests for Table API
Chengxiang Li created FLINK-2956: Summary: Migrate integration tests for Table API Key: FLINK-2956 URL: https://issues.apache.org/jira/browse/FLINK-2956 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Chengxiang Li Priority: Minor Migrate the integration tests of the Table API from temp files to collect() as described in the umbrella JIRA. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2951) Add Union operator to Table API.
Chengxiang Li created FLINK-2951: Summary: Add Union operator to Table API. Key: FLINK-2951 URL: https://issues.apache.org/jira/browse/FLINK-2951 Project: Flink Issue Type: New Feature Components: Documentation, Table API Reporter: Chengxiang Li Assignee: Chengxiang Li Priority: Minor Currently, the union operation is supported by the DataSet/DataStream API but is not available in the Table API. To union two tables, the user has to transform one input to a DataSet/DataStream, for example: {code:java} val unionDs = left.union(right.toDataSet[Row]) {code} It would be more API-friendly to users to add a union operation to the Table API, like: {code:java} val unionDs = left.union(right) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2848) Refactor Flink benchmarks with JMH and move to flink-benchmark module
Chengxiang Li created FLINK-2848: Summary: Refactor Flink benchmarks with JMH and move to flink-benchmark module Key: FLINK-2848 URL: https://issues.apache.org/jira/browse/FLINK-2848 Project: Flink Issue Type: Test Components: Tests Reporter: Chengxiang Li Assignee: Chengxiang Li Priority: Minor There are many Flink internal micro benchmarks in different modules, which are coarsely measured (by System.nanoTime(), etc.) with no warmup and no multi-iteration runs. This is an umbrella JIRA to refactor these micro benchmarks and move them to the flink-benchmark module for central management. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-7) [GitHub] Enable Range Partitioner
[ https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933125#comment-14933125 ] Chengxiang Li commented on FLINK-7: --- The sample operator has been enabled in [FLINK-1901|https://issues.apache.org/jira/browse/FLINK-1901], which means we can generate the sample histogram automatically now. I think it's time to continue this work; I would like to contribute to it if no one is working on it. > [GitHub] Enable Range Partitioner > - > > Key: FLINK-7 > URL: https://issues.apache.org/jira/browse/FLINK-7 > Project: Flink > Issue Type: Sub-task >Reporter: GitHub Import > Labels: github-import > Fix For: pre-apache > > > The range partitioner is currently disabled. We need to implement the > following aspects: > 1) Distribution information, if available, must be propagated back together > with the ordering property. > 2) A generic bucket lookup structure (currently specific to PactRecord). > Tests to re-enable after fixing this issue: > - TeraSortITCase > - GlobalSortingITCase > - GlobalSortingMixedOrderITCase > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/7 > Created by: [StephanEwen|https://github.com/StephanEwen] > Labels: core, enhancement, optimizer, > Milestone: Release 0.4 > Assignee: [fhueske|https://github.com/fhueske] > Created at: Fri Apr 26 13:48:24 CEST 2013 > State: open -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2745) Create a new module for all Flink micro benchmark.
Chengxiang Li created FLINK-2745: Summary: Create a new module for all Flink micro benchmark. Key: FLINK-2745 URL: https://issues.apache.org/jira/browse/FLINK-2745 Project: Flink Issue Type: New Feature Components: Tests Reporter: Chengxiang Li Assignee: Chengxiang Li Priority: Minor Currently in Flink there are many micro benchmarks spread across different modules; these benchmarks are run manually, triggered by JUnit tests, with no warmup and no multi-iteration runs. Moving all benchmarks to a single module and adopting [JMH|http://openjdk.java.net/projects/code-tools/jmh/] as the backing benchmark framework may help Flink developers build benchmarks more accurately and easily. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2743) Add new RNG based on XORShift algorithm
Chengxiang Li created FLINK-2743: Summary: Add new RNG based on XORShift algorithm Key: FLINK-2743 URL: https://issues.apache.org/jira/browse/FLINK-2743 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Assignee: Chengxiang Li Priority: Minor The [XORShift algorithm|https://en.wikipedia.org/wiki/Xorshift] is an optimized random number generation algorithm; implementing an RNG based on it would help improve the performance of operations where RNGs are heavily used. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
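The core of XORShift is just three shift-and-xor steps per draw. A minimal sketch of Marsaglia's 64-bit variant with the common 13/7/17 shift triple (class name illustrative; not the class the issue eventually added to Flink):

```java
/** Minimal XORShift64 sketch; the state must never be zero, or it stays zero forever. */
public class XorShift64 {
    private long state;

    public XorShift64(long seed) {
        // Remap a zero seed to an arbitrary non-zero constant to avoid the fixed point.
        this.state = (seed == 0) ? 0x9E3779B97F4A7C15L : seed;
    }

    public long nextLong() {
        long x = state;
        x ^= x << 13;
        x ^= x >>> 7; // logical shift: the sign bit must not propagate
        x ^= x << 17;
        state = x;
        return x;
    }

    /** Uniform double in [0, 1), as samplers typically consume. */
    public double nextDouble() {
        return (nextLong() >>> 11) * 0x1.0p-53; // top 53 bits scaled by 2^-53
    }
}
```

Because the state transition is a bijection on non-zero 64-bit values, the generator has period 2^64 - 1 for any non-zero seed, and the same seed always reproduces the same sequence.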
[jira] [Created] (FLINK-2754) FixedLengthRecordSorter can not write to output cross MemorySegments.
Chengxiang Li created FLINK-2754: Summary: FixedLengthRecordSorter can not write to output cross MemorySegments. Key: FLINK-2754 URL: https://issues.apache.org/jira/browse/FLINK-2754 Project: Flink Issue Type: Bug Components: Distributed Runtime Reporter: Chengxiang Li Assignee: Chengxiang Li FixedLengthRecordSorter cannot write output across MemorySegments; it has worked so far because it was previously only called to write a single record. We should fix it and add more unit tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2533) Gap based random sample optimization
[ https://issues.apache.org/jira/browse/FLINK-2533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-2533: - Assignee: GaoLun > Gap based random sample optimization > > > Key: FLINK-2533 > URL: https://issues.apache.org/jira/browse/FLINK-2533 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Chengxiang Li >Assignee: GaoLun >Priority: Minor > > For random samplers with a fraction, like BernoulliSampler and PoissonSampler, > a gap-based random sampler could exploit an O(k) sample implementation instead of > the previous O\(n\) implementation; it should perform better when the sample > fraction is very small. [This > blog|http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/] > describes gap-based random sampling in more detail. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2535) Fixed size sample algorithm optimization
[ https://issues.apache.org/jira/browse/FLINK-2535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li closed FLINK-2535. Resolution: Won't Fix > Fixed size sample algorithm optimization > > > Key: FLINK-2535 > URL: https://issues.apache.org/jira/browse/FLINK-2535 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Chengxiang Li >Priority: Minor > Attachments: sampling.png > > > The fixed size sample algorithm is known to be less efficient than fraction-based > sample algorithms, but sometimes it's necessary. Some optimizations > could significantly reduce the storage size and computation cost, such as the > algorithm described in [this > paper|http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2535) Fixed size sample algorithm optimization
[ https://issues.apache.org/jira/browse/FLINK-2535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14732155#comment-14732155 ] Chengxiang Li commented on FLINK-2535: -- Thanks for your research, [~gallenvara_bg]; since there is no obvious performance improvement, I will just close this issue. > Fixed size sample algorithm optimization > > > Key: FLINK-2535 > URL: https://issues.apache.org/jira/browse/FLINK-2535 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Chengxiang Li >Priority: Minor > Attachments: sampling.png > > > The fixed size sample algorithm is known to be less efficient than fraction-based > sample algorithms, but sometimes it's necessary. Some optimizations > could significantly reduce the storage size and computation cost, such as the > algorithm described in [this > paper|http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2596) Failing Test: RandomSamplerTest
[ https://issues.apache.org/jira/browse/FLINK-2596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li reassigned FLINK-2596: Assignee: Chengxiang Li > Failing Test: RandomSamplerTest > --- > > Key: FLINK-2596 > URL: https://issues.apache.org/jira/browse/FLINK-2596 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Matthias J. Sax >Assignee: Chengxiang Li >Priority: Critical > Labels: test-stability > > {noformat} > Tests run: 17, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 14.925 sec > <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest > testReservoirSamplerWithMultiSourcePartitions2(org.apache.flink.api.java.sampling.RandomSamplerTest) > Time elapsed: 0.444 sec <<< ERROR! > java.lang.IllegalArgumentException: Comparison method violates its general > contract! > at java.util.TimSort.mergeLo(TimSort.java:747) > at java.util.TimSort.mergeAt(TimSort.java:483) > at java.util.TimSort.mergeCollapse(TimSort.java:410) > at java.util.TimSort.sort(TimSort.java:214) > at java.util.TimSort.sort(TimSort.java:173) > at java.util.Arrays.sort(Arrays.java:659) > at java.util.Collections.sort(Collections.java:217) > at > org.apache.flink.api.java.sampling.RandomSamplerTest.transferFromListToArrayWithOrder(RandomSamplerTest.java:375) > at > org.apache.flink.api.java.sampling.RandomSamplerTest.getSampledOutput(RandomSamplerTest.java:367) > at > org.apache.flink.api.java.sampling.RandomSamplerTest.verifyKSTest(RandomSamplerTest.java:338) > at > org.apache.flink.api.java.sampling.RandomSamplerTest.verifyRandomSamplerWithSampleSize(RandomSamplerTest.java:330) > at > org.apache.flink.api.java.sampling.RandomSamplerTest.verifyReservoirSamplerWithReplacement(RandomSamplerTest.java:290) > at > org.apache.flink.api.java.sampling.RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2(RandomSamplerTest.java:212) > Results : > Tests in error: > 
RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2:212->verifyReservoirSamplerWithReplacement:290->verifyRandomSamplerWithSampleSize:330->verifyKSTest:338->getSampledOutput:367->transferFromListToArrayWithOrder:375 > » IllegalArgument > {noformat} > https://travis-ci.org/apache/flink/jobs/77750329 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters
[ https://issues.apache.org/jira/browse/FLINK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14716152#comment-14716152 ] Chengxiang Li commented on FLINK-2545: -- Hi, [~greghogan], do you mind sharing how to reproduce this issue? The exception is caused by a negative array size, which represents the count of bucket members. It should be very simple to add a check on the count value to fix this issue. But the count should never be negative according to the hash table implementation, so if possible, I want to reproduce this issue to check whether there are other hidden issues behind it. NegativeArraySizeException while creating hash table bloom filters -- Key: FLINK-2545 URL: https://issues.apache.org/jira/browse/FLINK-2545 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: master Reporter: Greg Hogan Assignee: Chengxiang Li The following exception occurred a second time when I immediately re-ran my application, though after recompiling and restarting Flink the subsequent execution ran without error. java.lang.Exception: The data preparation for task '...' 
, caused an error: null at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NegativeArraySizeException at org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143) at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692) at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455) at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93) at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459) ... 3 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters
[ https://issues.apache.org/jira/browse/FLINK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li reassigned FLINK-2545: Assignee: Chengxiang Li NegativeArraySizeException while creating hash table bloom filters -- Key: FLINK-2545 URL: https://issues.apache.org/jira/browse/FLINK-2545 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: master Reporter: Greg Hogan Assignee: Chengxiang Li The following exception occurred a second time when I immediately re-ran my application, though after recompiling and restarting Flink the subsequent execution ran without error. java.lang.Exception: The data preparation for task '...' , caused an error: null at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NegativeArraySizeException at org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143) at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692) at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455) at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93) at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195) at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459) ... 3 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2564) Failing Test: RandomSamplerTest
[ https://issues.apache.org/jira/browse/FLINK-2564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li reassigned FLINK-2564: Assignee: Chengxiang Li Failing Test: RandomSamplerTest --- Key: FLINK-2564 URL: https://issues.apache.org/jira/browse/FLINK-2564 Project: Flink Issue Type: Bug Reporter: Matthias J. Sax Assignee: Chengxiang Li Labels: test-stability {noformat} Tests run: 17, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 15.943 sec <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest testPoissonSamplerFraction(org.apache.flink.api.java.sampling.RandomSamplerTest) Time elapsed: 0.017 sec <<< FAILURE! java.lang.AssertionError: expected fraction: 0.01, result fraction: 0.011300 at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at org.apache.flink.api.java.sampling.RandomSamplerTest.verifySamplerFraction(RandomSamplerTest.java:249) at org.apache.flink.api.java.sampling.RandomSamplerTest.testPoissonSamplerFraction(RandomSamplerTest.java:116) Results : Failed tests: Successfully installed excon-0.33.0 RandomSamplerTest.testPoissonSamplerFraction:116->verifySamplerFraction:249 expected fraction: 0.01, result fraction: 0.011300 {noformat} Full log: https://travis-ci.org/apache/flink/jobs/76720572 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2564) Failing Test: RandomSamplerTest
[ https://issues.apache.org/jira/browse/FLINK-2564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709020#comment-14709020 ] Chengxiang Li commented on FLINK-2564: -- As discussed [here|https://github.com/apache/flink/pull/949#issuecomment-133990256], it's a balance between accurate verification and false-positive results. As this failure and the KS test failure happen more often than expected, we should expand the verification boundary, which would reduce the false-positive cases to an acceptable level. Failing Test: RandomSamplerTest --- Key: FLINK-2564 URL: https://issues.apache.org/jira/browse/FLINK-2564 Project: Flink Issue Type: Bug Reporter: Matthias J. Sax Assignee: Chengxiang Li Labels: test-stability {noformat} Tests run: 17, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 15.943 sec <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest testPoissonSamplerFraction(org.apache.flink.api.java.sampling.RandomSamplerTest) Time elapsed: 0.017 sec <<< FAILURE! java.lang.AssertionError: expected fraction: 0.01, result fraction: 0.011300 at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at org.apache.flink.api.java.sampling.RandomSamplerTest.verifySamplerFraction(RandomSamplerTest.java:249) at org.apache.flink.api.java.sampling.RandomSamplerTest.testPoissonSamplerFraction(RandomSamplerTest.java:116) Results : Failed tests: Successfully installed excon-0.33.0 RandomSamplerTest.testPoissonSamplerFraction:116->verifySamplerFraction:249 expected fraction: 0.01, result fraction: 0.011300 {noformat} Full log: https://travis-ci.org/apache/flink/jobs/76720572 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2549) Add topK operator for DataSet
Chengxiang Li created FLINK-2549: Summary: Add topK operator for DataSet Key: FLINK-2549 URL: https://issues.apache.org/jira/browse/FLINK-2549 Project: Flink Issue Type: New Feature Components: Core, Java API, Scala API Reporter: Chengxiang Li Assignee: Chengxiang Li Priority: Minor topK is a common operation for users; it would be great to have it in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2549) Add topK operator for DataSet
[ https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704244#comment-14704244 ] Chengxiang Li commented on FLINK-2549: -- The basic idea of the implementation is as follows: # In the map stage, sort and pick the top K elements in each partition. # A single reduce task handles all map outputs, sorts them, and picks the top K elements as the final result. To fully manage the memory used by this operator, we may need a customized PriorityQueue built upon Flink's MemoryManager to sort elements of unpredictable size within fixed-size memory, as discussed [here|https://github.com/apache/flink/pull/949#issuecomment-132692640]. Add topK operator for DataSet - Key: FLINK-2549 URL: https://issues.apache.org/jira/browse/FLINK-2549 Project: Flink Issue Type: New Feature Components: Core, Java API, Scala API Reporter: Chengxiang Li Assignee: Chengxiang Li Priority: Minor topK is a common operation for users; it would be great to have it in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
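The two-stage plan in the comment above can be sketched with plain JDK collections: each partition keeps its local top K in a bounded min-heap, and a single reducer merges the partial results. Names are illustrative, and this ignores the managed-memory concern the comment raises:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Sketch of two-stage topK: per-partition bounded min-heaps, then one merging reduce. */
public class TopK {
    // Map stage: a min-heap of size <= k keeps the K largest elements seen so far.
    public static PriorityQueue<Integer> partialTopK(List<Integer> partition, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(); // natural order = min-heap
        for (int v : partition) {
            heap.offer(v);
            if (heap.size() > k) {
                heap.poll(); // evict the current minimum
            }
        }
        return heap;
    }

    // Reduce stage: merge all partial heaps and emit the final top K, descending.
    public static List<Integer> mergeTopK(List<PriorityQueue<Integer>> partials, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (PriorityQueue<Integer> p : partials) {
            for (int v : p) {
                heap.offer(v);
                if (heap.size() > k) {
                    heap.poll();
                }
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }
}
```

Each partition ships at most K elements to the reducer, which is what makes the single reduce task feasible for large inputs.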
[jira] [Created] (FLINK-2533) Gap based random sample optimization
Chengxiang Li created FLINK-2533: Summary: Gap based random sample optimization Key: FLINK-2533 URL: https://issues.apache.org/jira/browse/FLINK-2533 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Priority: Minor For random samplers with a fraction, like BernoulliSampler and PoissonSampler, a gap-based random sampler could exploit an O(k) sample implementation instead of the previous O\(n\) implementation; it should perform better when the sample fraction is very small. [This blog|http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/] describes gap-based random sampling in more detail. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
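The gap-based idea for the Bernoulli case: instead of flipping a coin per element, draw the geometrically distributed gap to the next accepted element, so the RNG is consulted only O(k) times for k accepted samples. A minimal sketch with illustrative names (the formulation `floor(log(1-u)/log(1-p))` for the geometric gap is an assumption based on the linked blog, not Flink's eventual implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Sketch of gap-based Bernoulli sampling: one geometric draw per accepted element. */
public class GapSampler {
    public static List<Integer> sample(List<Integer> input, double fraction, long seed) {
        Random rng = new Random(seed);
        List<Integer> out = new ArrayList<>();
        double lnQ = Math.log(1.0 - fraction); // log of the per-element rejection probability
        int i = -1;
        while (true) {
            // Geometric gap: how many elements to skip before the next acceptance.
            double u = rng.nextDouble();
            int gap = (int) (Math.log(1.0 - u) / lnQ);
            i += gap + 1;
            if (i >= input.size()) {
                break;
            }
            out.add(input.get(i));
        }
        return out;
    }
}
```

Every accepted index costs one `nextDouble()` call, versus one per input element in the naive coin-flip sampler, which is why the win grows as the fraction shrinks.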
[jira] [Created] (FLINK-2535) Fixed size sample algorithm optimization
Chengxiang Li created FLINK-2535: Summary: Fixed size sample algorithm optimization Key: FLINK-2535 URL: https://issues.apache.org/jira/browse/FLINK-2535 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Priority: Minor The fixed size sample algorithm is known to be less efficient than fraction-based sample algorithms, but sometimes it's necessary. Some optimizations could significantly reduce the storage size and computation cost, such as the algorithm described in [this paper|http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638272#comment-14638272 ] Chengxiang Li commented on FLINK-1901: -- Thanks, [~till.rohrmann], I got it now. This looks more like an iteration optimization issue to me: it assumes that the output of a static code path is always the same, so it caches the output for a potential performance improvement, but this assumption is not always true, for example for a static code path with a random sampling operator, a data source reading from HBase, and so on. I think we could open a separate JIRA to address it in a uniform way instead of treating random sampling as a special case in this JIRA. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2396) Review the datasets of dynamic path and static path in iteration.
Chengxiang Li created FLINK-2396: Summary: Review the datasets of dynamic path and static path in iteration. Key: FLINK-2396 URL: https://issues.apache.org/jira/browse/FLINK-2396 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Priority: Minor Currently Flink caches datasets on the static path, as it assumes they stay the same during the iteration, but this assumption is not always true. Take sampling for example: the iteration dataset is something like the weight vector of a model, and there is a separate training dataset from which a small sample is taken to update the weight vector in each iteration (e.g. Stochastic Gradient Descent). We expect the sampled dataset to be different in each iteration, but Flink caches it because it is on the static path. We should review how Flink identifies the dynamic path and the static path, and support adding the sampled dataset in the above example to the dynamic path.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638617#comment-14638617 ] Chengxiang Li commented on FLINK-1901: -- Thanks for the analysis, [~trohrm...@apache.org], I've created FLINK-2396 as follow-up work to support sampled and other similar datasets in iterations.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636124#comment-14636124 ] Chengxiang Li commented on FLINK-1901: -- Sampling where every point is selected with probability 1/N is just one of the sampling cases (sampling with a fraction, without replacement). There are three other kinds of sampling that are commonly used as well: sampling with a fraction, with replacement; sampling with a fixed size, without replacement; and sampling with a fixed size, with replacement. We should support all of them when exposing a sampling operator to the user.
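The two fraction-based cases above can be illustrated with a small sketch (plain Python under my own assumptions, not the Flink operator; function names are hypothetical). A Bernoulli trial per item gives fraction sampling without replacement, while drawing a per-item Poisson-distributed count gives fraction sampling with replacement:

```python
import math
import random

def bernoulli_sample(stream, fraction, seed=None):
    """Sampling with a fraction, without replacement: each item is kept
    independently with the given probability (a Bernoulli trial)."""
    rng = random.Random(seed)
    return [x for x in stream if rng.random() < fraction]

def poisson_sample(stream, fraction, seed=None):
    """Sampling with a fraction, with replacement: each item is emitted
    a Poisson(fraction)-distributed number of times, so items can repeat."""
    rng = random.Random(seed)
    threshold = math.exp(-fraction)
    out = []
    for x in stream:
        # Knuth's method for drawing a Poisson-distributed count.
        count, p = 0, 1.0
        while True:
            p *= rng.random()
            if p <= threshold:
                break
            count += 1
        out.extend([x] * count)
    return out
```

In both cases the expected output size is `fraction * n`, but only the Poisson variant can emit an item more than once.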
[jira] [Comment Edited] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634622#comment-14634622 ] Chengxiang Li edited comment on FLINK-1901 at 7/22/15 2:05 AM: --- To randomly choose a sample from a DataSet S, there are basically two kinds of sampling requirements: sampling with a fraction (such as randomly choosing 5% of the items in S) and sampling with a fixed size (such as randomly choosing 100 items from S). Besides, we do not know the size of S unless we pay the extra cost of computing it through DataSet::count(). # Sampling with a fraction #* With replacement: the sample size follows a [Poisson Distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, so Poisson Sampling can be used to choose the sample items. #* Without replacement: during sampling, we can treat the selection of each item in the iterator as a [Bernoulli Trial|https://en.wikipedia.org/wiki/Bernoulli_trial]. # Sampling with a fixed size #* Use DataSet::count() to get the dataset size; with the fixed size, we can then turn this into sampling with a fraction. #* [Reservoir Sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is another commonly used algorithm to randomly choose a sample of k items from a list S containing n items, where n is either very large or unknown, and there are different reservoir sampling algorithms that support both sampling with replacement and sampling without replacement.
was (Author: chengxiang li): To randomly choose a sample from a DataSet S, there are basically two kinds of sampling requirements: sampling with a factor (such as randomly choosing 5% of the items in S) and sampling with a fixed size (such as randomly choosing 100 items from S). Besides, we do not know the size of S unless we pay the extra cost of computing it through DataSet::count(). # Sampling with a factor #* With replacement: the sample size follows a [Poisson Distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, so Poisson Sampling can be used to choose the sample items. #* Without replacement: during sampling, we can treat the selection of each item in the iterator as a [Bernoulli Trial|https://en.wikipedia.org/wiki/Bernoulli_trial]. # Sampling with a fixed size #* Use DataSet::count() to get the dataset size; with the fixed size, we can then turn this into sampling with a factor. #* [Reservoir Sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is another commonly used algorithm to randomly choose a sample of k items from a list S containing n items, where n is either very large or unknown, and there are different reservoir sampling algorithms that support both sampling with replacement and sampling without replacement.
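The reservoir approach mentioned above, in its simplest without-replacement form (Vitter's Algorithm R), can be sketched like this (an illustrative Python sketch, not Flink code; the function name is hypothetical):

```python
import random

def reservoir_sample(stream, k, seed=None):
    """Algorithm R: one pass keeps a uniform random sample of k items
    without replacement, without knowing the stream's size in advance."""
    rng = random.Random(seed)
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            # Fill the reservoir with the first k items.
            reservoir.append(item)
        else:
            # Item i replaces a random reservoir slot with probability k/(i+1).
            j = rng.randrange(i + 1)
            if j < k:
                reservoir[j] = item
    return reservoir
```

The reservoir needs only O(k) memory, which is why it suits fixed-size sampling over a DataSet whose cardinality is unknown.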
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636332#comment-14636332 ] Chengxiang Li commented on FLINK-1901: -- I wrote a simple example of a sampling operator [here|https://github.com/ChengXiangLi/flink/blob/FLINK-1901/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/TestSample.java]; it works just as expected inside or outside of an iteration (for example, with 1000 items and a sample fraction of 0.5, after 3 iterations the output contains around 125 items). [~trohrm...@apache.org], I'm not sure whether I understood you correctly about sampling inside iterations; it looks the same to me if it matches the case in the example.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634622#comment-14634622 ] Chengxiang Li commented on FLINK-1901: -- To randomly choose a sample from a DataSet S, there are basically two kinds of sampling requirements: sampling with a factor (such as randomly choosing 5% of the items in S) and sampling with a fixed size (such as randomly choosing 100 items from S). Besides, we do not know the size of S unless we pay the extra cost of computing it through DataSet::count(). # Sampling with a factor #* With replacement: the sample size follows a [Poisson Distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, so Poisson Sampling can be used to choose the sample items. #* Without replacement: during sampling, we can treat the selection of each item in the iterator as a [Bernoulli Trial|https://en.wikipedia.org/wiki/Bernoulli_trial]. # Sampling with a fixed size #* Use DataSet::count() to get the dataset size; with the fixed size, we can then turn this into sampling with a factor. #* [Reservoir Sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is another commonly used algorithm to randomly choose a sample of k items from a list S containing n items, where n is either very large or unknown, and there are different reservoir sampling algorithms that support both sampling with replacement and sampling without replacement.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634822#comment-14634822 ] Chengxiang Li commented on FLINK-1901: -- Hi [~sachingoel0101], I didn't find any sampling-related class while searching the project for that keyword; is the PR you mentioned still in progress? Besides ML algorithms, there should be other use cases that depend on a sampling operation, such as range partitioning, and I believe the sample operation itself is a common operation that users may want to use directly.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634628#comment-14634628 ] Chengxiang Li commented on FLINK-1901: -- Hi [~tvas], I would like to contribute to this issue if no one else is working on it now.
[jira] [Created] (FLINK-2241) Use BloomFilter to minimize build side records which are spilled to disk in Hybrid-Hash-Join
Chengxiang Li created FLINK-2241: Summary: Use BloomFilter to minimize build side records which are spilled to disk in Hybrid-Hash-Join Key: FLINK-2241 URL: https://issues.apache.org/jira/browse/FLINK-2241 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Priority: Minor In Hybrid-Hash-Join, when the small table does not fit into memory, part of the small-table data is spilled to disk, and the corresponding partitions of the big-table data are spilled to disk in the probe phase as well. If we build a BloomFilter while spilling the small table to disk during the build phase, and use it to filter the big-table records that would otherwise be spilled to disk, we may greatly reduce the size of the spilled big-table files and save the disk IO cost of writing and later reading them.
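As a rough sketch of the idea (illustrative Python under my own assumptions, not Flink's actual hash-join code; the class and its parameters are hypothetical): keys of spilled build-side records are added to a Bloom filter, and probe-side records whose keys definitely cannot match are dropped instead of spilled.

```python
import hashlib

class BloomFilter:
    """A minimal Bloom filter: no false negatives, tunable false positives."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions from one digest via double hashing.
        digest = hashlib.sha256(key.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big")
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # False means "definitely not present"; True means "possibly present".
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))

# Build phase: add the join keys of spilled build-side records.
# Probe phase: records with might_contain(key) == False cannot join
# any spilled build-side record, so they need not be spilled at all.
```

Because a Bloom filter never reports a false negative, dropping probe-side records it rejects is safe; the only cost of false positives is some unnecessary spilling.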
[jira] [Updated] (FLINK-2240) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-2240: - Summary: Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join (was: Use BloomFilter to minmize build side records which spilled to disk in Hybrid-Hash-Join)
[jira] [Commented] (FLINK-2241) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14591552#comment-14591552 ] Chengxiang Li commented on FLINK-2241: -- Oh, thanks, [~till.rohrmann], I must have double-clicked my mouse.
[jira] [Created] (FLINK-2240) Use BloomFilter to minimize build side records which are spilled to disk in Hybrid-Hash-Join
Chengxiang Li created FLINK-2240: Summary: Use BloomFilter to minimize build side records which are spilled to disk in Hybrid-Hash-Join Key: FLINK-2240 URL: https://issues.apache.org/jira/browse/FLINK-2240 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Priority: Minor
[jira] [Updated] (FLINK-2241) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-2241: - Summary: Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join (was: Use BloomFilter to minmize build side records which spilled to disk in Hybrid-Hash-Join)