[jira] [Commented] (FLINK-3586) Risk of data overflow while use sum/count to calculate AVG value

2016-05-22 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295804#comment-15295804
 ] 

Chengxiang Li commented on FLINK-3586:
--

Not at all, feel free to take it over, Fabian.

> Risk of data overflow while use sum/count to calculate AVG value
> 
>
> Key: FLINK-3586
> URL: https://issues.apache.org/jira/browse/FLINK-3586
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Fabian Hueske
>Priority: Minor
>
> Currently, we use {{(sum: Long, count: Long)}} to store the AVG partial aggregate 
> data, which carries a risk of data overflow. We should use an unbounded data type 
> (such as BigInteger) to store these values for the data types that need it.
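To make the risk concrete, here is a minimal, self-contained Scala sketch (illustrative only, not the Table API's actual accumulator code): a {{Long}}-based partial sum silently wraps around on overflow, while a {{BigInteger}}-based one stays exact.

{code}
import java.math.BigInteger

// Hypothetical sketch of the overflow risk in a (sum, count) partial aggregate.
object AvgOverflowSketch {
  def main(args: Array[String]): Unit = {
    val sum  = Long.MaxValue
    val next = 1L
    // Long-based partial sum: wraps around to a large negative value.
    println(sum + next)  // -9223372036854775808
    // BigInteger-based partial sum: unbounded, so the result is exact.
    println(BigInteger.valueOf(sum).add(BigInteger.valueOf(next)))  // 9223372036854775808
  }
}
{code}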





[jira] [Assigned] (FLINK-3586) Risk of data overflow while use sum/count to calculate AVG value

2016-03-19 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-3586:


Assignee: Chengxiang Li

> Risk of data overflow while use sum/count to calculate AVG value
> 
>
> Key: FLINK-3586
> URL: https://issues.apache.org/jira/browse/FLINK-3586
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> Currently, we use {{(sum: Long, count: Long)}} to store the AVG partial aggregate 
> data, which carries a risk of data overflow. We should use an unbounded data type 
> (such as BigInteger) to store these values for the data types that need it.





[jira] [Assigned] (FLINK-3475) DISTINCT aggregate function support

2016-03-19 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-3475:


Assignee: Chengxiang Li

> DISTINCT aggregate function support
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The DISTINCT aggregate function may be able to reuse the regular aggregate 
> function instead of a separate implementation, letting the Flink runtime take 
> care of duplicate records.
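For intuition, the reuse idea in plain Scala collections (an analogy, not the Flink runtime): once duplicates are removed, the unchanged aggregate produces the DISTINCT result.

{code}
// Sketch: DISTINCT aggregation as deduplication followed by the regular aggregate.
val values = Seq(1, 2, 2, 3, 3, 3)
println(values.sum)           // 14: plain SUM
println(values.distinct.sum)  // 6:  SUM(DISTINCT), reusing the same sum
{code}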





[jira] [Created] (FLINK-3586) Risk of data overflow while use sum/count to calculate AVG value

2016-03-07 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3586:


 Summary: Risk of data overflow while use sum/count to calculate 
AVG value
 Key: FLINK-3586
 URL: https://issues.apache.org/jira/browse/FLINK-3586
 Project: Flink
  Issue Type: Sub-task
  Components: Table API
Reporter: Chengxiang Li
Priority: Minor


Currently, we use {{(sum: Long, count: Long)}} to store the AVG partial aggregate 
data, which carries a risk of data overflow. We should use an unbounded data type 
(such as BigInteger) to store these values for the data types that need it.





[jira] [Updated] (FLINK-3473) Add partial aggregate support in Flink

2016-03-02 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3473:
-
Attachment: PartialAggregateinFlink_v2.pdf

> Add partial aggregate support in Flink
> --
>
> Key: FLINK-3473
> URL: https://issues.apache.org/jira/browse/FLINK-3473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
> Attachments: PartialAggregateinFlink_v1.pdf, 
> PartialAggregateinFlink_v2.pdf
>
>
> For decomposable aggregate functions, partial aggregation is more efficient, as 
> it significantly reduces the network traffic during the shuffle and parallelizes 
> part of the aggregate calculation. This is an umbrella task for partial 
> aggregation.
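A plain-Scala sketch of the underlying property (simulated partitions, not actual Flink operators): for a decomposable aggregate such as SUM, per-partition partials can be merged without changing the result, so only one value per partition crosses the shuffle.

{code}
val data = Seq(1, 2, 3, 4, 5, 6)
val partitions = data.grouped(2).toSeq  // simulate parallel partitions
val partials = partitions.map(_.sum)    // combine locally before the shuffle
assert(partials.sum == data.sum)        // merging the partials is exact
{code}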





[jira] [Commented] (FLINK-3508) Add more test cases to verify the rules of logical plan optimization

2016-02-25 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15167301#comment-15167301
 ] 

Chengxiang Li commented on FLINK-3508:
--

Merged to tableOnCalcite branch at 72686231fd8f9fa6a1c05df48f6f29eaa3ca4f2b.

> Add more test cases to verify the rules of logical plan optimization
> 
>
> Key: FLINK-3508
> URL: https://issues.apache.org/jira/browse/FLINK-3508
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> We have enabled many rules in the logical plan optimization phase; more 
> complicated test cases should be added to verify whether these rules actually 
> work.





[jira] [Closed] (FLINK-3508) Add more test cases to verify the rules of logical plan optimization

2016-02-25 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li closed FLINK-3508.


> Add more test cases to verify the rules of logical plan optimization
> 
>
> Key: FLINK-3508
> URL: https://issues.apache.org/jira/browse/FLINK-3508
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> We have enabled many rules in the logical plan optimization phase; more 
> complicated test cases should be added to verify whether these rules actually 
> work.





[jira] [Resolved] (FLINK-3508) Add more test cases to verify the rules of logical plan optimization

2016-02-25 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li resolved FLINK-3508.
--
Resolution: Fixed

> Add more test cases to verify the rules of logical plan optimization
> 
>
> Key: FLINK-3508
> URL: https://issues.apache.org/jira/browse/FLINK-3508
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> We have enabled many rules in the logical plan optimization phase; more 
> complicated test cases should be added to verify whether these rules actually 
> work.





[jira] [Created] (FLINK-3508) Add more test cases to verify the rules of logical plan optimization

2016-02-25 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3508:


 Summary: Add more test cases to verify the rules of logical plan 
optimization
 Key: FLINK-3508
 URL: https://issues.apache.org/jira/browse/FLINK-3508
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor


We have enabled many rules in the logical plan optimization phase; more complicated 
test cases should be added to verify whether these rules actually work.





[jira] [Created] (FLINK-3507) PruneEmptyRules does not prune empty node as expected.

2016-02-24 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3507:


 Summary: PruneEmptyRules does not prune empty node as expected. 
 Key: FLINK-3507
 URL: https://issues.apache.org/jira/browse/FLINK-3507
 Project: Flink
  Issue Type: Improvement
Reporter: Chengxiang Li
Priority: Minor


{noformat}
val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
  .where(Literal(false))
  .groupBy('b)
  .select('a.sum)
{noformat}
With PruneEmptyRules instances enabled, we expect the empty Aggregate and 
Project to be removed, but they are not removed after logical optimization.
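For intuition, a plain-Scala analogue of the expected pruning (not Calcite itself): an always-false predicate makes the input empty, so the downstream Aggregate and Project can only ever produce an empty result and should be pruned.

{code}
// Everything downstream of an always-false filter is provably empty.
val t = Seq((1, 10L, "a"), (2, 20L, "b"))
val filtered = t.filter(_ => false)  // empty input
val aggregated = filtered.groupBy(_._2).map { case (_, g) => g.map(_._1).sum }
assert(aggregated.isEmpty)
{code}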





[jira] [Created] (FLINK-3506) ReduceExpressionsRule does not remove duplicate expression in Filter

2016-02-24 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3506:


 Summary: ReduceExpressionsRule does not remove duplicate 
expression in Filter
 Key: FLINK-3506
 URL: https://issues.apache.org/jira/browse/FLINK-3506
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Reporter: Chengxiang Li
Priority: Minor


{noformat}
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0).filter('a % 2 !== 0)
{noformat}
According to the ReduceExpressionsRule definition, we expect the duplicated 
filter expression to be removed, but it is not after logical optimization.
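A plain-Scala sketch of why dropping the duplicate is safe (deterministic predicates are idempotent under repeated filtering):

{code}
val ds = Seq((1, 2L), (2, 2L), (3, 4L))
val once  = ds.filter(t => t._1 % 2 != 0)
val twice = ds.filter(t => t._1 % 2 != 0).filter(t => t._1 % 2 != 0)
assert(once == twice)  // the second, duplicated filter changes nothing
{code}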





[jira] [Updated] (FLINK-3486) Use Project to rename all record fields would fail following Project.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3486:
-
Summary: Use Project to rename all record fields would fail following 
Project.  (was: [SQL] Use Project to rename all record fields would fail 
following Project.)

> Use Project to rename all record fields would fail following Project.
> -
>
> Key: FLINK-3486
> URL: https://issues.apache.org/jira/browse/FLINK-3486
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Chengxiang Li
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).toTable
>   .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
>   .select('a, 'b)
> {noformat}
> would throw an exception like:
> {noformat}
> java.lang.IllegalArgumentException: field [a] not found; input fields are: 
> [_1, _2, _3]
>   at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:290)
>   at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:275)
>   at 
> org.apache.flink.api.table.plan.RexNodeTranslator$.toRexNode(RexNodeTranslator.scala:80)
>   at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
>   at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98) 
> {noformat}
> The new alias names are lost.





[jira] [Updated] (FLINK-3502) AggregateProjectPullUpConstantsRule fails on certain group keys order.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3502:
-
Summary: AggregateProjectPullUpConstantsRule fails on certain group keys 
order.  (was: [SQL] AggregateProjectPullUpConstantsRule fails on certain group 
keys order.)

> AggregateProjectPullUpConstantsRule fails on certain group keys order.
> --
>
> Key: FLINK-3502
> URL: https://issues.apache.org/jira/browse/FLINK-3502
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Chengxiang Li
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
>   .select('b, 4 as 'four, 'a)
>   .groupBy('b, 'four)
>   .select('four, 'a.sum)
> {noformat}
> This query would throw an exception like:
> {noformat}
> java.lang.AssertionError: Internal error: Error while applying rule 
> AggregateProjectPullUpConstantsRule, args 
> [rel#7:LogicalAggregate.NONE.[](input=rel#6:Subset#1.NONE.[],group={1, 
> 2},TMP_1=SUM($0)), 
> rel#5:LogicalProject.NONE.[](input=rel#4:Subset#0.NONE.[],a=$0,four=4,b=$1)]
>   at org.apache.calcite.util.Util.newInternal(Util.java:792)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:251)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:826)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:304)
>   at 
> org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
>   at 
> org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:51)
>   at 
> org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:35)
>   at 
> org.apache.flink.api.scala.table.test.GroupedAggregationsITCase.testGroupedByExpression(GroupedAggregationsITCase.scala:133)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runners.Suite.runChild(Suite.java:127)
>   at org.junit.runners.Suite.runChild(Suite.java:26)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.lang.AssertionError: Internal error: Error occurred 

[jira] [Updated] (FLINK-3486) [SQL] Use Project to rename all record fields would fail following Project.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3486:
-
Component/s: Table API

> [SQL] Use Project to rename all record fields would fail following Project.
> ---
>
> Key: FLINK-3486
> URL: https://issues.apache.org/jira/browse/FLINK-3486
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Chengxiang Li
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).toTable
>   .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
>   .select('a, 'b)
> {noformat}
> would throw an exception like:
> {noformat}
> java.lang.IllegalArgumentException: field [a] not found; input fields are: 
> [_1, _2, _3]
>   at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:290)
>   at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:275)
>   at 
> org.apache.flink.api.table.plan.RexNodeTranslator$.toRexNode(RexNodeTranslator.scala:80)
>   at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
>   at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98) 
> {noformat}
> The new alias names are lost.





[jira] [Updated] (FLINK-3487) [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3487:
-
Component/s: Table API

> [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.
> --
>
> Key: FLINK-3487
> URL: https://issues.apache.org/jira/browse/FLINK-3487
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Priority: Minor
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
>   .groupBy('a)
>   .select('a, 'b.avg as 'value)
>   .filter('a === 1)
> {noformat}
> For this query, the filter is expected to be pushed down below the aggregate by 
> FilterAggregateTransposeRule, but currently the optimized logical plan is not 
> what we expect.
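The equivalence the rule relies on, sketched with plain Scala collections (not the optimizer itself): when the predicate references only grouping keys, filtering before or after the aggregate yields the same result, and filtering first aggregates less data.

{code}
val rows = Seq((1, 10), (1, 20), (2, 30))
def avgByKey(xs: Seq[(Int, Int)]): Map[Int, Int] =
  xs.groupBy(_._1).map { case (k, g) => k -> g.map(_._2).sum / g.size }
val filterAfter  = avgByKey(rows).filter { case (k, _) => k == 1 }
val filterBefore = avgByKey(rows.filter(_._1 == 1))  // pushed-down variant
assert(filterAfter == filterBefore)
{code}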





[jira] [Updated] (FLINK-3503) ProjectJoinTransposeRule fails to push down project.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3503:
-
Summary: ProjectJoinTransposeRule fails to push down project.  (was: [SQL] 
ProjectJoinTransposeRule fails to push down project.)

> ProjectJoinTransposeRule fails to push down project.
> 
>
> Key: FLINK-3503
> URL: https://issues.apache.org/jira/browse/FLINK-3503
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Priority: Minor
>
> {noformat}
> val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
> val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
> {noformat}
> For this query, ProjectJoinTransposeRule should push a Project past a Join 
> by splitting the projection into a projection on top of each child of the 
> join.
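A plain-Scala sketch of the intended rewrite (shapes assumed for illustration, not actual rule output): each join input is first projected down to its join key plus the fields the final projection needs, so less data crosses the join.

{code}
val ds1 = Seq((1, 2L, "c1"), (2, 3L, "c2"))                  // ('a, 'b, 'c)
val ds2 = Seq((10, 2L, 0, "g1", 0L), (11, 9L, 0, "g2", 0L))  // ('d, 'e, 'f, 'g, 'h)
val left  = ds1.map(t => (t._2, t._3))  // project to ('b, 'c) before the join
val right = ds2.map(t => (t._2, t._4))  // project to ('e, 'g) before the join
val joinT = for { (b, c) <- left; (e, g) <- right if b == e } yield (c, g)
println(joinT)  // List((c1,g1))
{code}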





[jira] [Updated] (FLINK-3487) FilterAggregateTransposeRule does not transform logical plan as desired.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3487:
-
Summary: FilterAggregateTransposeRule does not transform logical plan as 
desired.  (was: [SQL] FilterAggregateTransposeRule does not transform logical 
plan as desired.)

> FilterAggregateTransposeRule does not transform logical plan as desired.
> 
>
> Key: FLINK-3487
> URL: https://issues.apache.org/jira/browse/FLINK-3487
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Priority: Minor
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
>   .groupBy('a)
>   .select('a, 'b.avg as 'value)
>   .filter('a === 1)
> {noformat}
> For this query, the filter is expected to be pushed down below the aggregate by 
> FilterAggregateTransposeRule, but currently the optimized logical plan is not 
> what we expect.





[jira] [Created] (FLINK-3505) JoinUnionTransposeRule fails to push Join past Union.

2016-02-24 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3505:


 Summary: JoinUnionTransposeRule fails to push Join past Union.
 Key: FLINK-3505
 URL: https://issues.apache.org/jira/browse/FLINK-3505
 Project: Flink
  Issue Type: Bug
  Components: Table API
Reporter: Chengxiang Li


{noformat}
val unionDs = ds1
  .unionAll(ds2.select('a, 'b, 'c))
  .join(ds3)
  .where('a === 'k)
  .select('a, 'b, 'l)
{noformat}
For this query, JoinUnionTransposeRule fails to push the Join past the Union. 
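For reference, the algebraic identity the rule implements, as a plain-Scala sketch (hypothetical data, not Flink operators): a join distributes over a union of its inputs.

{code}
// join(ds1 union ds2, ds3) == join(ds1, ds3) union join(ds2, ds3)
def join(l: Seq[(Int, String)], r: Seq[(Int, String)]) =
  for { (a, b) <- l; (k, v) <- r if a == k } yield (a, b, v)
val ds1 = Seq((1, "x"))
val ds2 = Seq((2, "y"))
val ds3 = Seq((1, "z"), (2, "w"))
assert(join(ds1 ++ ds2, ds3) == join(ds1, ds3) ++ join(ds2, ds3))
{code}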





[jira] [Updated] (FLINK-3503) [SQL] ProjectJoinTransposeRule fails to push down project.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3503:
-
Component/s: Table API

> [SQL] ProjectJoinTransposeRule fails to push down project.
> --
>
> Key: FLINK-3503
> URL: https://issues.apache.org/jira/browse/FLINK-3503
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Priority: Minor
>
> {noformat}
> val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
> val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
> {noformat}
> For this query, ProjectJoinTransposeRule should push a Project past a Join 
> by splitting the projection into a projection on top of each child of the 
> join.





[jira] [Updated] (FLINK-3504) Join fails while have expression inside join condition.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3504:
-
Summary: Join fails while have expression inside join condition.  (was: 
[SQL] Join fails while have expression inside join condition.)

> Join fails while have expression inside join condition.
> ---
>
> Key: FLINK-3504
> URL: https://issues.apache.org/jira/browse/FLINK-3504
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Chengxiang Li
>
> {noformat}
> val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
> val joinT = ds1.join(ds2).filter('a + 3 === 'd).select('c, 'g)
> {noformat}
> This query would throw an exception:
> {noformat}
> Caused by: org.apache.flink.api.table.TableException: Joins should have at 
> least one equality condition
>   at 
> org.apache.flink.api.table.plan.rules.dataset.DataSetJoinRule.convert(DataSetJoinRule.scala:57)
>   at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:116)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:228)
>   ... 44 more
> {noformat}
> There are two issues here:
> # DataSetJoinRule does not support expressions inside the join condition.
> # JoinPushExpressionsRule would add a Project to calculate the expression value 
> before the Join, so the join condition no longer includes the expression; 
> however, that plan is not returned after the logical optimization (see the 
> sketch below).
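The rewrite behind the second point, sketched in plain Scala (a hypothetical rewrite, not the rule's actual output): evaluating {{'a + 3}} in a projection first turns the condition into a plain key equality that an equi-join can handle.

{code}
val ds1 = Seq((1, 2L, "c1"), (4, 5L, "c2"))     // ('a, 'b, 'c)
val ds2 = Seq((4, "g1"), (7, "g2"))             // ('d, 'g)
val projected = ds1.map(t => (t._1 + 3, t._3))  // compute derived key 'a + 3
val joinT = for { (k, c) <- projected; (d, g) <- ds2 if k == d } yield (c, g)
println(joinT)  // List((c1,g1), (c2,g2))
{code}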





[jira] [Created] (FLINK-3504) [SQL] Join fails while have expression inside join condition.

2016-02-24 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3504:


 Summary: [SQL] Join fails while have expression inside join 
condition.
 Key: FLINK-3504
 URL: https://issues.apache.org/jira/browse/FLINK-3504
 Project: Flink
  Issue Type: Bug
  Components: Table API
Reporter: Chengxiang Li


{noformat}
val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).filter('a + 3 === 'd).select('c, 'g)
{noformat}
This query would throw an exception:
{noformat}
Caused by: org.apache.flink.api.table.TableException: Joins should have at 
least one equality condition
at 
org.apache.flink.api.table.plan.rules.dataset.DataSetJoinRule.convert(DataSetJoinRule.scala:57)
at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:116)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:228)
... 44 more
{noformat}
There are two issues here:
# DataSetJoinRule does not support expressions inside the join condition.
# JoinPushExpressionsRule would add a Project to calculate the expression value 
before the Join, so the join condition no longer includes the expression; 
however, that plan is not returned after the logical optimization.





[jira] [Updated] (FLINK-3487) [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.

2016-02-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3487:
-
Issue Type: Improvement  (was: Bug)

> [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.
> --
>
> Key: FLINK-3487
> URL: https://issues.apache.org/jira/browse/FLINK-3487
> Project: Flink
>  Issue Type: Improvement
>Reporter: Chengxiang Li
>Priority: Minor
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
>   .groupBy('a)
>   .select('a, 'b.avg as 'value)
>   .filter('a === 1)
> {noformat}
> For this query, the filter is expected to be pushed down below the aggregate by 
> FilterAggregateTransposeRule, but currently the optimized logical plan is not 
> what we expect.





[jira] [Created] (FLINK-3503) [SQL] ProjectJoinTransposeRule fails to push down project.

2016-02-24 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3503:


 Summary: [SQL] ProjectJoinTransposeRule fails to push down project.
 Key: FLINK-3503
 URL: https://issues.apache.org/jira/browse/FLINK-3503
 Project: Flink
  Issue Type: Improvement
Reporter: Chengxiang Li
Priority: Minor


{noformat}
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
{noformat}
For this query, ProjectJoinTransposeRule should push a Project past a Join by 
splitting the projection into a projection on top of each child of the join.





[jira] [Created] (FLINK-3502) [SQL] AggregateProjectPullUpConstantsRule fails on certain group keys order.

2016-02-24 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3502:


 Summary: [SQL] AggregateProjectPullUpConstantsRule fails on 
certain group keys order.
 Key: FLINK-3502
 URL: https://issues.apache.org/jira/browse/FLINK-3502
 Project: Flink
  Issue Type: Bug
  Components: Table API
Reporter: Chengxiang Li


{noformat}
val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
  .select('b, 4 as 'four, 'a)
  .groupBy('b, 'four)
  .select('four, 'a.sum)
{noformat}
This query would throw an exception like:
{noformat}
java.lang.AssertionError: Internal error: Error while applying rule 
AggregateProjectPullUpConstantsRule, args 
[rel#7:LogicalAggregate.NONE.[](input=rel#6:Subset#1.NONE.[],group={1, 
2},TMP_1=SUM($0)), 
rel#5:LogicalProject.NONE.[](input=rel#4:Subset#0.NONE.[],a=$0,four=4,b=$1)]
at org.apache.calcite.util.Util.newInternal(Util.java:792)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:251)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:826)
at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:304)
at 
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at 
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:51)
at 
org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:35)
at 
org.apache.flink.api.scala.table.test.GroupedAggregationsITCase.testGroupedByExpression(GroupedAggregationsITCase.scala:133)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runners.Suite.runChild(Suite.java:127)
at org.junit.runners.Suite.runChild(Suite.java:26)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.AssertionError: Internal error: Error occurred while 
applying rule AggregateProjectPullUpConstantsRule
at org.apache.calcite.util.Util.newInternal(Util.java:792)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:150)
at 
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:213)
at 

[jira] [Created] (FLINK-3487) [SQL] FilterAggregateTransposeRule does not transform logical plan as desired.

2016-02-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3487:


 Summary: [SQL] FilterAggregateTransposeRule does not transform 
logical plan as desired.
 Key: FLINK-3487
 URL: https://issues.apache.org/jira/browse/FLINK-3487
 Project: Flink
  Issue Type: Bug
Reporter: Chengxiang Li
Priority: Minor


{noformat}
val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
  .groupBy('a)
  .select('a, 'b.avg as 'value)
  .filter('a === 1)
{noformat}
For this query, the filter is expected to be pushed down below the aggregate by 
FilterAggregateTransposeRule, but currently the optimized logical plan is not what 
we expect.





[jira] [Created] (FLINK-3486) [SQL] Use Project to rename all record fields would fail following Project.

2016-02-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3486:


 Summary: [SQL] Use Project to rename all record fields would fail 
following Project.
 Key: FLINK-3486
 URL: https://issues.apache.org/jira/browse/FLINK-3486
 Project: Flink
  Issue Type: Bug
Reporter: Chengxiang Li


{noformat}
val t = CollectionDataSets.get3TupleDataSet(env).toTable
  .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
  .select('a, 'b)
{noformat}
would throw an exception like:
{noformat}
java.lang.IllegalArgumentException: field [a] not found; input fields are: [_1, 
_2, _3]
at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:290)
at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:275)
at 
org.apache.flink.api.table.plan.RexNodeTranslator$.toRexNode(RexNodeTranslator.scala:80)
at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98) 
{noformat}
The new alias names are lost.





[jira] [Created] (FLINK-3476) Support hash-based partial aggregate

2016-02-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3476:


 Summary: Support hash-based partial aggregate
 Key: FLINK-3476
 URL: https://issues.apache.org/jira/browse/FLINK-3476
 Project: Flink
  Issue Type: Sub-task
  Components: Table API
Reporter: Chengxiang Li


As described in the design doc, we should be able to enable the hash-based partial 
aggregate once the hash-based combiner (#1517) is supported.
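A minimal sketch of what a hash-based combiner does, in plain Scala (the real combiner in #1517 works on managed memory, not a {{HashMap}}): it accumulates one partial aggregate per key without sorting the input first.

{code}
import scala.collection.mutable

val records = Seq((1, 10L), (2, 5L), (1, 7L))
val partial = mutable.HashMap.empty[Int, Long]
for ((k, v) <- records)
  partial(k) = partial.getOrElse(k, 0L) + v  // one partial sum per key
println(partial)  // HashMap(1 -> 17, 2 -> 5); only these pairs are shipped on
{code}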





[jira] [Updated] (FLINK-3473) Add partial aggregate support in Flink

2016-02-23 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3473:
-
Attachment: PartialAggregateinFlink_v1.pdf

> Add partial aggregate support in Flink
> --
>
> Key: FLINK-3473
> URL: https://issues.apache.org/jira/browse/FLINK-3473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
> Attachments: PartialAggregateinFlink_v1.pdf
>
>
> For decomposable aggregate functions, partial aggregation is more efficient, as 
> it significantly reduces the network traffic during the shuffle and parallelizes 
> part of the aggregate calculation. This is an umbrella task for partial 
> aggregation.





[jira] [Updated] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-02-23 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3474:
-
Attachment: (was: PartialAggregateinFlink_v1.pdf)

> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The scope of this sub-task includes:
> # The partial aggregate interface (see the sketch below).
> # Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX.
> # A DataSetAggregateRule which translates the logical Calcite aggregate node to 
> Flink user functions. As the hash-based combiner is not available yet (see PR 
> #1517), we would use sort-based combine as the default.
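A hedged sketch of what such an interface could look like (the trait and method names are assumptions for illustration, not the committed flink-table API), shown with an AVG that keeps {{(sum, count)}} as its partial state:

{code}
trait Aggregate[T, ACC, R] {
  def initiate(): ACC                      // create the partial state
  def accumulate(acc: ACC, value: T): ACC  // fold one input record in
  def merge(a: ACC, b: ACC): ACC           // combine two partial states
  def evaluate(acc: ACC): R                // produce the final result
}

object LongAvg extends Aggregate[Long, (Long, Long), Long] {
  def initiate() = (0L, 0L)
  def accumulate(acc: (Long, Long), v: Long) = (acc._1 + v, acc._2 + 1)
  def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2)
  def evaluate(acc: (Long, Long)) = acc._1 / acc._2
}

// e.g. LongAvg.evaluate(LongAvg.merge((10L, 2L), (20L, 3L)))  // == 6
{code}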





[jira] [Updated] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-02-23 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3474:
-
Attachment: PartialAggregateinFlink_v1.pdf

> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
> Attachments: PartialAggregateinFlink_v1.pdf
>
>
> The scope of this sub-task includes:
> # The partial aggregate interface.
> # Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX.
> # A DataSetAggregateRule which translates the logical Calcite aggregate node to 
> Flink user functions. As the hash-based combiner is not available yet (see PR 
> #1517), we would use sort-based combine as the default.





[jira] [Created] (FLINK-3475) DISTINCT aggregate function support

2016-02-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3475:


 Summary: DISTINCT aggregate function support
 Key: FLINK-3475
 URL: https://issues.apache.org/jira/browse/FLINK-3475
 Project: Flink
  Issue Type: Sub-task
  Components: Table API
Reporter: Chengxiang Li


The DISTINCT aggregate function may be able to reuse the regular aggregate function 
instead of a separate implementation, letting the Flink runtime take care of 
duplicate records.





[jira] [Created] (FLINK-3473) Add partial aggregate support in Flink

2016-02-22 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3473:


 Summary: Add partial aggregate support in Flink
 Key: FLINK-3473
 URL: https://issues.apache.org/jira/browse/FLINK-3473
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Reporter: Chengxiang Li
Assignee: Chengxiang Li


For decomposable aggregate functions, partial aggregation is more efficient, as it 
significantly reduces the network traffic during the shuffle and parallelizes part of 
the aggregate calculation. This is an umbrella task for partial aggregation.





[jira] [Resolved] (FLINK-3281) IndexOutOfBoundsException when range-partitioning empty DataSet

2016-02-01 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li resolved FLINK-3281.
--
   Resolution: Fixed
Fix Version/s: 1.0.0

> IndexOutOfBoundsException when range-partitioning empty DataSet 
> 
>
> Key: FLINK-3281
> URL: https://issues.apache.org/jira/browse/FLINK-3281
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime, Local Runtime
>Reporter: Fridtjof Sander
>Assignee: Chengxiang Li
> Fix For: 1.0.0
>
>
> Code:
> {code}
> import org.apache.flink.api.scala._
> object RangePartitionOnEmptyDataSet {
>   def main(args: Array[String]) = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     env
>       .fromCollection(Seq[Tuple1[String]]())
>       .partitionByRange(0)
>       .collect()
>   }
> }
> {code}
> Output:
> {noformat}
> 01/24/2016 16:24:36   Job execution switched to status RUNNING.
> 01/24/2016 16:24:36   DataSource (at 
> RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> SCHEDULED 
> 01/24/2016 16:24:36   DataSource (at 
> RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> DEPLOYING 
> 01/24/2016 16:24:36   DataSource (at 
> RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> RUNNING 
> 01/24/2016 16:24:36   RangePartition: LocalSample(1/1) switched to SCHEDULED 
> 01/24/2016 16:24:36   RangePartition: LocalSample(1/1) switched to DEPLOYING 
> 01/24/2016 16:24:36   DataSource (at 
> RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> FINISHED 
> 01/24/2016 16:24:36   RangePartition: PreparePartition(1/1) switched to 
> SCHEDULED 
> 01/24/2016 16:24:36   RangePartition: PreparePartition(1/1) switched to 
> DEPLOYING 
> 01/24/2016 16:24:36   RangePartition: LocalSample(1/1) switched to RUNNING 
> 01/24/2016 16:24:36   RangePartition: PreparePartition(1/1) switched to 
> RUNNING 
> 01/24/2016 16:24:36   RangePartition: GlobalSample(1/1) switched to SCHEDULED 
> 01/24/2016 16:24:36   RangePartition: GlobalSample(1/1) switched to DEPLOYING 
> 01/24/2016 16:24:36   RangePartition: LocalSample(1/1) switched to FINISHED 
> 01/24/2016 16:24:36   RangePartition: GlobalSample(1/1) switched to RUNNING 
> 01/24/2016 16:24:36   RangePartition: Histogram(1/1) switched to SCHEDULED 
> 01/24/2016 16:24:36   RangePartition: Histogram(1/1) switched to DEPLOYING 
> 01/24/2016 16:24:36   RangePartition: GlobalSample(1/1) switched to FINISHED 
> 01/24/2016 16:24:36   RangePartition: Histogram(1/1) switched to RUNNING 
> 01/24/2016 16:24:37   RangePartition: Histogram(1/1) switched to FAILED 
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at 
> org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder.mapPartition(RangeBoundaryBuilder.java:66)
>   at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:561)
>   at java.lang.Thread.run(Thread.java:745)
> 01/24/2016 16:24:37   Job execution switched to status FAILING.
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at 
> org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder.mapPartition(RangeBoundaryBuilder.java:66)
>   at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:561)
>   at java.lang.Thread.run(Thread.java:745)
> 01/24/2016 16:24:37   RangePartition: PreparePartition(1/1) switched to 
> CANCELING 
> 01/24/2016 16:24:37   RangePartition: Partition(1/4) switched to CANCELED 
> 01/24/2016 16:24:37   RangePartition: Partition(2/4) switched to CANCELED 
> 01/24/2016 16:24:37   RangePartition: Partition(3/4) switched to CANCELED 
> 01/24/2016 16:24:37   RangePartition: Partition(4/4) switched to CANCELED 
> 01/24/2016 16:24:37   CHAIN 

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-01-27 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15119050#comment-15119050
 ] 

Chengxiang Li commented on FLINK-3226:
--

[~fhueske] and [~twalthr], this task will be blocked on me for a while, sorry about 
that. The task is more complicated and involves more work than I expected: it actually 
covers the physical plan generation part of Timo's prototype plus the whole previous 
Table API translation part. To move this forward faster, as Fabian suggested, I agree 
that we can split it into multiple sub-tasks, such as expression code generation, and 
even split by the Calcite RelNodes; for example, we can work in parallel on RelNodes 
with no dependencies, such as Sort, Join, and Aggregate. I would make this task very 
small, so that it only contains the Project translator without expression code 
generation, and make it ready during this week.
Besides, while trying to test my code, I found that {{PlannerImpl}} checks its state 
during each step of the query planning process, and there is no way to set the state 
manually. For the Table API we actually skip several steps, such as parse and 
validate, so it fails when we go directly to the optimize step due to the unmatched 
state. Do you guys have any idea about this?

> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process
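For intuition only, a toy plain-Scala model of the 1-to-1 node-to-operator idea (all types and names here are invented for illustration; the real translation targets Flink DataSet operators):

{code}
// Each physical node maps to exactly one operator; translate() walks the tree.
sealed trait PhysNode { def translate(): Seq[Int] }
case class Scan(data: Seq[Int]) extends PhysNode {
  def translate() = data                      // source operator
}
case class Filter(in: PhysNode, p: Int => Boolean) extends PhysNode {
  def translate() = in.translate().filter(p)  // filter operator
}
case class Project(in: PhysNode, f: Int => Int) extends PhysNode {
  def translate() = in.translate().map(f)     // map operator
}

val plan = Project(Filter(Scan(Seq(1, 2, 3, 4)), _ % 2 == 0), _ * 10)
println(plan.translate())  // List(20, 40)
{code}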





[jira] [Resolved] (FLINK-3282) Add FlinkRelNode interface.

2016-01-27 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li resolved FLINK-3282.
--
Resolution: Fixed

> Add FlinkRelNode interface.
> ---
>
> Key: FLINK-3282
> URL: https://issues.apache.org/jira/browse/FLINK-3282
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> Add FlinkRelNode interface for physical plan translator and Flink program 
> generator.





[jira] [Closed] (FLINK-3282) Add FlinkRelNode interface.

2016-01-27 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li closed FLINK-3282.


> Add FlinkRelNode interface.
> ---
>
> Key: FLINK-3282
> URL: https://issues.apache.org/jira/browse/FLINK-3282
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> Add FlinkRelNode interface for physical plan translator and Flink program 
> generator.





[jira] [Resolved] (FLINK-2871) Add OuterJoin strategy with HashTable on outer side

2016-01-26 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li resolved FLINK-2871.
--
   Resolution: Fixed
Fix Version/s: 1.0.0

> Add OuterJoin strategy with HashTable on outer side
> ---
>
> Key: FLINK-2871
> URL: https://issues.apache.org/jira/browse/FLINK-2871
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime, Optimizer
>Affects Versions: 0.10.0
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>Priority: Minor
> Fix For: 1.0.0
>
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this 
> strategy is only supported for left and right outer joins.
> In order to support hash-tables on the outer side, we need a special hash 
> table implementation that gives access to all records which have not been 
> accessed during the probe phase.
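A toy plain-Scala sketch of the idea (hypothetical code; Flink's actual table is a managed-memory structure): build the hash table on the outer side, mark entries that are touched during the probe phase, and afterwards emit the untouched ones padded with nulls.

{code}
import scala.collection.mutable

val outer = Seq((1, "a"), (2, "b"), (3, "c"))  // build side (outer)
val inner = Seq((2, "x"), (2, "y"))            // probe side
val table = mutable.Map(outer.map { case (k, v) => k -> (v, false) }: _*)
val matched = inner.flatMap { case (k, w) =>
  table.get(k).map { case (v, _) =>
    table(k) = (v, true)                       // remember: this entry was probed
    (k, v, w)
  }
}
val unmatched = table.collect { case (k, (v, false)) => (k, v, null) }
println(matched ++ unmatched)  // matched rows plus null-padded outer rows
{code}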





[jira] [Created] (FLINK-3282) Add FlinkRelNode interface.

2016-01-25 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3282:


 Summary: Add FlinkRelNode interface.
 Key: FLINK-3282
 URL: https://issues.apache.org/jira/browse/FLINK-3282
 Project: Flink
  Issue Type: New Feature
  Components: Table API
Reporter: Chengxiang Li
Assignee: Chengxiang Li


Add FlinkRelNode interface for physical plan translator and Flink program 
generator.





[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-01-25 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115034#comment-15115034
 ] 

Chengxiang Li commented on FLINK-3226:
--

Yes, that makes sense. I have created a 
PR (https://github.com/apache/flink/pull/1544) for the FlinkRelNode interface; 
[~twalthr] and [~fhueske], please let me know if you have any suggestions.

> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process





[jira] [Commented] (FLINK-3281) IndexOutOfBoundsException when range-partitioning empty DataSet

2016-01-24 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114659#comment-15114659
 ] 

Chengxiang Li commented on FLINK-3281:
--

[~fsander], thanks for finding this, I will work on it.

> IndexOutOfBoundsException when range-partitioning empty DataSet 
> 
>
> Key: FLINK-3281
> URL: https://issues.apache.org/jira/browse/FLINK-3281
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime, Local Runtime
>Reporter: Fridtjof Sander
>
> Code:
> {code}
> import org.apache.flink.api.scala._
> object RangePartitionOnEmptyDataSet {
>   def main(args: Array[String]) = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     env
>       .fromCollection(Seq[Tuple1[String]]())
>       .partitionByRange(0)
>       .collect()
>   }
> }
> {code}
> Output:
> {noformat}
> 01/24/2016 16:24:36   Job execution switched to status RUNNING.
> 01/24/2016 16:24:36   DataSource (at 
> RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> SCHEDULED 
> 01/24/2016 16:24:36   DataSource (at 
> RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> DEPLOYING 
> 01/24/2016 16:24:36   DataSource (at 
> RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> RUNNING 
> 01/24/2016 16:24:36   RangePartition: LocalSample(1/1) switched to SCHEDULED 
> 01/24/2016 16:24:36   RangePartition: LocalSample(1/1) switched to DEPLOYING 
> 01/24/2016 16:24:36   DataSource (at 
> RangePartitionOnEmptyDataSet$.main(RangePartitionOnEmptyDataSet.scala:9) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> FINISHED 
> 01/24/2016 16:24:36   RangePartition: PreparePartition(1/1) switched to 
> SCHEDULED 
> 01/24/2016 16:24:36   RangePartition: PreparePartition(1/1) switched to 
> DEPLOYING 
> 01/24/2016 16:24:36   RangePartition: LocalSample(1/1) switched to RUNNING 
> 01/24/2016 16:24:36   RangePartition: PreparePartition(1/1) switched to 
> RUNNING 
> 01/24/2016 16:24:36   RangePartition: GlobalSample(1/1) switched to SCHEDULED 
> 01/24/2016 16:24:36   RangePartition: GlobalSample(1/1) switched to DEPLOYING 
> 01/24/2016 16:24:36   RangePartition: LocalSample(1/1) switched to FINISHED 
> 01/24/2016 16:24:36   RangePartition: GlobalSample(1/1) switched to RUNNING 
> 01/24/2016 16:24:36   RangePartition: Histogram(1/1) switched to SCHEDULED 
> 01/24/2016 16:24:36   RangePartition: Histogram(1/1) switched to DEPLOYING 
> 01/24/2016 16:24:36   RangePartition: GlobalSample(1/1) switched to FINISHED 
> 01/24/2016 16:24:36   RangePartition: Histogram(1/1) switched to RUNNING 
> 01/24/2016 16:24:37   RangePartition: Histogram(1/1) switched to FAILED 
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at 
> org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder.mapPartition(RangeBoundaryBuilder.java:66)
>   at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:561)
>   at java.lang.Thread.run(Thread.java:745)
> 01/24/2016 16:24:37   Job execution switched to status FAILING.
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at 
> org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder.mapPartition(RangeBoundaryBuilder.java:66)
>   at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:561)
>   at java.lang.Thread.run(Thread.java:745)
> 01/24/2016 16:24:37   RangePartition: PreparePartition(1/1) switched to 
> CANCELING 
> 01/24/2016 16:24:37   RangePartition: Partition(1/4) switched to CANCELED 
> 01/24/2016 16:24:37   RangePartition: Partition(2/4) switched to CANCELED 
> 01/24/2016 16:24:37   RangePartition: Partition(3/4) switched to CANCELED 
> 01/24/2016 16:24:37   RangePartition: Partition(4/4) switched to CANCELED 
> 01/24/2016 16:24:37   CHAIN Partition -> FlatMap 

[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-01-18 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106292#comment-15106292
 ] 

Chengxiang Li commented on FLINK-3226:
--

I have listed the mapping of Calcite RelNodes, Flink RelNodes, and Flink DataSet 
operators 
[here|https://docs.google.com/document/d/12wf4EEmnwA-SMr-oEwZ0IUZqH1ofK1fUWZJfeOBrVpc/edit?usp=sharing].
 [~fhueske] and [~twalthr], please let me know if you have any suggestions. 

> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-01-13 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095914#comment-15095914
 ] 

Chengxiang Li commented on FLINK-3226:
--

Cool, [~fhueske]. Before starting the actual work, I will upload a document 
containing the mapping of Calcite RelNodes, Flink RelNodes, and Flink DataSet 
operators, which should help coordinate with FLINK-3227. 

> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-01-12 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095470#comment-15095470
 ] 

Chengxiang Li commented on FLINK-3226:
--

[~fhueske], I would like to contribute to this task.

> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2871) Add OuterJoin strategy with HashTable on outer side

2015-12-16 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15061322#comment-15061322
 ] 

Chengxiang Li commented on FLINK-2871:
--

I would like to contribute to this. There are 4 reserved bytes left in the 
bucket header of {{MutableHashTable}}. As there are at most 9 elements in each 
bucket, we could use 2 of those bytes as a BitSet that marks whether each 
element in the bucket has been probed during the probe phase, and then return 
the elements that have not been probed at the end.
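
A minimal sketch of that bookkeeping, with a hypothetical header offset and a 
ByteBuffer standing in for Flink's MemorySegment:

{code:java}
import java.nio.ByteBuffer;

public final class ProbedFlags {
    // Hypothetical offset of the 2 reserved bytes inside the bucket header;
    // the real offset depends on the MutableHashTable bucket layout.
    static final int PROBED_FLAGS_OFFSET = 10;

    /** Mark the element at the given slot (0..8) of the bucket as probed. */
    static void markProbed(ByteBuffer bucket, int slot) {
        int flags = bucket.getShort(PROBED_FLAGS_OFFSET) & 0xFFFF;
        flags |= 1 << slot;                         // set the bit for this slot
        bucket.putShort(PROBED_FLAGS_OFFSET, (short) flags);
    }

    /** True if the element at the given slot was never probed. */
    static boolean isUnprobed(ByteBuffer bucket, int slot) {
        int flags = bucket.getShort(PROBED_FLAGS_OFFSET) & 0xFFFF;
        return (flags & (1 << slot)) == 0;          // bit not set -> not probed
    }
}
{code}

At the end of the probe phase, a scan over all buckets returns exactly the 
build-side records whose slot bit is still unset.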

> Add OuterJoin strategy with HashTable on outer side
> ---
>
> Key: FLINK-2871
> URL: https://issues.apache.org/jira/browse/FLINK-2871
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime, Optimizer
>Affects Versions: 0.10.0
>Reporter: Fabian Hueske
>Priority: Minor
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this 
> strategy is only supported for left and right outer joins.
> In order to support hash-tables on the outer side, we need a special hash 
> table implementation that gives access to all records which have not been 
> accessed during the probe phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2871) Add OuterJoin strategy with HashTable on outer side

2015-12-16 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-2871:


Assignee: Chengxiang Li

> Add OuterJoin strategy with HashTable on outer side
> ---
>
> Key: FLINK-2871
> URL: https://issues.apache.org/jira/browse/FLINK-2871
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime, Optimizer
>Affects Versions: 0.10.0
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>Priority: Minor
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this 
> strategy is only supported for left and right outer joins.
> In order to support hash-tables on the outer side, we need a special hash 
> table implementation that gives access to all records which have not been 
> accessed during the probe phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3139) NULL values handling in Table API

2015-12-07 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3139:


 Summary: NULL values handling in Table API
 Key: FLINK-3139
 URL: https://issues.apache.org/jira/browse/FLINK-3139
 Project: Flink
  Issue Type: Task
  Components: Table API
Reporter: Chengxiang Li


This is an umbrella task for NULL value handling in the Table API. 
As the logical API for queries, the Table API should support handling NULL 
values. NULL values are quite common in logical queries, for example:
# A data source may miss column values in many cases ("value missing", "value 
unknown", "value not applicable", etc.).
# Some operators generate NULL values on their own, like Outer 
Join/Cube/Rollup/Grouping Set, and so on. NULL value handling in the Table API 
is a prerequisite for these features.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3141) Design of NULL values handling in operation

2015-12-07 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3141:


 Summary: Design of NULL values handling in operation
 Key: FLINK-3141
 URL: https://issues.apache.org/jira/browse/FLINK-3141
 Project: Flink
  Issue Type: Sub-task
Reporter: Chengxiang Li


We discuss and finalize here how NULL values are handled in specific cases. 
This is the first proposal (a small illustrative sketch follows the list):
# NULL comparison: in ascending order, NULL is smaller than any other value, 
and NULL == NULL returns false. 
# NULL exists in a GroupBy key: all NULL values are grouped as a single group.
# NULL exists in aggregate columns: ignore NULL in the aggregation function.
# NULL exists in the join keys of both sides: per #1, NULL == NULL returns 
false, so there is no output for a NULL join key.
# NULL in a scalar expression: an expression containing NULL (e.g. 1 + NULL) 
returns NULL. 
# NULL in a Boolean expression: add an extra result, UNKNOWN; see reference #1 
for the detailed Boolean expression semantics.
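
The sketch below (illustrative only, not Flink code) makes rules #1, #5, and 
#6 concrete; the comparator groups NULLs together for sorting/grouping, while 
join equality on NULL keys is rejected separately per rule #4:

{code:java}
import java.util.Comparator;

public final class NullSemanticsSketch {
    /** Rule #1: in ascending order, NULL sorts before any other value. */
    static <T extends Comparable<T>> Comparator<T> nullsFirst() {
        return (a, b) -> {
            if (a == null && b == null) return 0;  // grouped together (rule #2)
            if (a == null) return -1;              // NULL smaller than anything
            if (b == null) return 1;
            return a.compareTo(b);
        };
    }

    /** Rule #5: a scalar expression containing NULL yields NULL. */
    static Integer plus(Integer a, Integer b) {
        return (a == null || b == null) ? null : a + b;
    }

    /** Rule #6: three-valued AND, where null stands for UNKNOWN. */
    static Boolean and3(Boolean a, Boolean b) {
        if (Boolean.FALSE.equals(a) || Boolean.FALSE.equals(b)) return false;
        if (a == null || b == null) return null;   // UNKNOWN
        return true;
    }
}
{code}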




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3140) NULL value data layout in Row Serializer/Comparator

2015-12-07 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3140:


 Summary: NULL value data layout in Row Serializer/Comparator
 Key: FLINK-3140
 URL: https://issues.apache.org/jira/browse/FLINK-3140
 Project: Flink
  Issue Type: Sub-task
  Components: Table API
Reporter: Chengxiang Li


To store/materialize NULL values in Row objects, we need a new Row 
Serializer/Comparator that is aware of NULL value fields.
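
One common layout, sketched below under the assumption of a leading null 
bitmap (illustrative only, not the actual serializer): write one bit per field 
up front, then serialize only the non-null fields.

{code:java}
import java.io.DataOutput;
import java.io.IOException;
import java.util.BitSet;

public final class NullAwareRowSketch {
    interface FieldWriter {
        void write(DataOutput out, int pos, Object value) throws IOException;
    }

    static void writeRow(DataOutput out, Object[] fields, FieldWriter writer)
            throws IOException {
        BitSet nullMask = new BitSet(fields.length);
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) nullMask.set(i);
        }
        byte[] mask = nullMask.toByteArray();    // at most ceil(arity / 8) bytes
        out.writeInt(mask.length);
        out.write(mask);
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] != null) {
                writer.write(out, i, fields[i]); // null fields take no space
            }
        }
    }
}
{code}

The comparator would read the same bitmap first, so two rows can be ordered on 
a field where one side is NULL without deserializing a missing value.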



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3087) Table API do not support multi count in aggregation.

2015-12-02 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li resolved FLINK-3087.
--
   Resolution: Fixed
Fix Version/s: 1.0.0

> Table API do not support multi count in aggregation.
> 
>
> Key: FLINK-3087
> URL: https://issues.apache.org/jira/browse/FLINK-3087
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 0.10.0
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
> Fix For: 1.0.0
>
>
> Multiple {{count}} aggregations are not supported, for example:
> {code:java}
> table.select("a.count", "b.count")
> {code}
> This is grammatically valid; besides, {{a.count}} and {{b.count}} may 
> actually have different values if NULL value handling is enabled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3098) Cast from Date to Long throw compile error.

2015-12-01 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3098:


 Summary: Cast from Date to Long throw compile error.
 Key: FLINK-3098
 URL: https://issues.apache.org/jira/browse/FLINK-3098
 Project: Flink
  Issue Type: Bug
  Components: Table API
Affects Versions: 0.10.1
Reporter: Chengxiang Li
Assignee: Chengxiang Li


When casting Date to Long in the Table API, the generated code throws a 
compile error that looks like:
{code}
Caused by: org.codehaus.commons.compiler.CompileException: Line 33, Column 23: 
Expression "result$5" is not an rvalue
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10062)
at 
org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:5960)
at 
org.codehaus.janino.UnitCompiler.compileContext2(UnitCompiler.java:3172)
at org.codehaus.janino.UnitCompiler.access$5400(UnitCompiler.java:182)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3087) Table API do not support multi count in aggregation.

2015-11-26 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-3087:


 Summary: Table API do not support multi count in aggregation.
 Key: FLINK-3087
 URL: https://issues.apache.org/jira/browse/FLINK-3087
 Project: Flink
  Issue Type: Bug
  Components: Table API
Affects Versions: 0.10.0
Reporter: Chengxiang Li
Assignee: Chengxiang Li


Multiple {{count}} aggregations are not supported, for example:
{code:java}
table.select("a.count", "b.count")
{code}
This is grammatically valid; besides, {{a.count}} and {{b.count}} may actually 
have different values if NULL value handling is enabled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2115) TableAPI throws ExpressionException for "Dangling GroupBy operation"

2015-11-18 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-2115:


Assignee: Chengxiang Li

> TableAPI throws ExpressionException for "Dangling GroupBy operation"
> 
>
> Key: FLINK-2115
> URL: https://issues.apache.org/jira/browse/FLINK-2115
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> The program below throws an ExpressionException due to a "Dangling GroupBy 
> operation".
> However, I think the program is semantically correct and should execute.
> {code}
> public static void main(String[] args) throws Exception {
>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>   DataSet<Integer> data = env.fromElements(1,2,2,3,3,3,4,4,4,4);
>   DataSet<Tuple2<Integer, Integer>> tuples = data
>   .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
> @Override
> public Tuple2<Integer, Integer> map(Integer i) throws Exception {
>   return new Tuple2<Integer, Integer>(i, i*2);
> }
>   });
>   TableEnvironment tEnv = new TableEnvironment();
>   Table t = tEnv.toTable(tuples).as("i, i2")
>   .groupBy("i, i2").select("i, i2")
>   .groupBy("i").select("i, i.count as cnt");
>   tEnv.toSet(t, Row.class).print();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.

2015-11-18 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15010576#comment-15010576
 ] 

Chengxiang Li commented on FLINK-2980:
--

This depends on support for NULL values in the Table API, which has been 
discussed before but is not enabled yet. I will revisit this feature after 
NULL value support is enabled in the Table API.

> Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
> 
>
> Key: FLINK-2980
> URL: https://issues.apache.org/jira/browse/FLINK-2980
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
> Attachments: Cube-Rollup-GroupSet design doc in Flink.pdf
>
>
> Computing aggregates over a cube/rollup/grouping sets of several dimensions 
> is a common operation in data warehousing. It would be nice to have them in 
> Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.

2015-11-11 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-2980:


Assignee: Chengxiang Li

> Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
> 
>
> Key: FLINK-2980
> URL: https://issues.apache.org/jira/browse/FLINK-2980
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
> Attachments: Cube-Rollup-GroupSet design doc in Flink.pdf
>
>
> Computing aggregates over a cube/rollup/grouping sets of several dimensions 
> is a common operation in data warehousing. It would be nice to have them in 
> Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2998) Support range partition comparison for multi input nodes.

2015-11-10 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2998:


 Summary: Support range partition comparison for multi input nodes.
 Key: FLINK-2998
 URL: https://issues.apache.org/jira/browse/FLINK-2998
 Project: Flink
  Issue Type: New Feature
  Components: Optimizer
Reporter: Chengxiang Li
Priority: Minor


The optimizer may have an opportunity to optimize the DAG when it finds that 
the range partitions of two inputs are equivalent, but we do not support this 
comparison yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2997) Support range partition with user customized data distribution.

2015-11-10 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2997:


 Summary: Support range partition with user customized data 
distribution.
 Key: FLINK-2997
 URL: https://issues.apache.org/jira/browse/FLINK-2997
 Project: Flink
  Issue Type: New Feature
Reporter: Chengxiang Li


This is follow-up work for FLINK-7. Sometimes users have better knowledge of 
the source data and can provide a customized data distribution to perform 
range partitioning more efficiently.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2955) Add operations introduction in Table API page.

2015-11-05 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-2955:


Assignee: Chengxiang Li

> Add operations introduction in Table API page.
> --
>
> Key: FLINK-2955
> URL: https://issues.apache.org/jira/browse/FLINK-2955
> Project: Flink
>  Issue Type: New Feature
>  Components: Documentation
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> On the Table API page, there is no formal introduction of the currently 
> supported operations; it would be nice to have one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.

2015-11-05 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-2980:
-
Attachment: Cube-Rollup-GroupSet design doc in Flink.pdf

> Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
> 
>
> Key: FLINK-2980
> URL: https://issues.apache.org/jira/browse/FLINK-2980
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table API
>Reporter: Chengxiang Li
> Attachments: Cube-Rollup-GroupSet design doc in Flink.pdf
>
>
> Computing aggregates over a cube/rollup/grouping sets of several dimensions 
> is a common operation in data warehousing. It would be nice to have them in 
> Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.

2015-11-05 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2980:


 Summary: Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
 Key: FLINK-2980
 URL: https://issues.apache.org/jira/browse/FLINK-2980
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table API
Reporter: Chengxiang Li


Computing aggregates over a cube/rollup/grouping sets of several dimensions is 
a common operation in data warehousing. It would be nice to have them in Table 
API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2956) Migrate integration tests for Table API

2015-11-02 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2956:


 Summary: Migrate integration tests for Table API
 Key: FLINK-2956
 URL: https://issues.apache.org/jira/browse/FLINK-2956
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Chengxiang Li
Priority: Minor


Migrate the integration tests of the Table API from temp files to collect(), 
as described in the umbrella JIRA.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2951) Add Union operator to Table API.

2015-11-01 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2951:


 Summary: Add Union operator to Table API.
 Key: FLINK-2951
 URL: https://issues.apache.org/jira/browse/FLINK-2951
 Project: Flink
  Issue Type: New Feature
  Components: Documentation, Table API
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor


Currently, the union operation is supported by the DataSet/DataStream API but 
is not available in the Table API. To union two tables, the user has to 
transform one input to a DataSet/DataStream, for example:
{code:java}
val unionDs = left.union(right.toDataSet[Row])
{code}
It would be friendlier to users if the Table API offered a union operation, 
like:
{code:java}
val unionDs = left.union(right)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2848) Refactor Flink benchmarks with JMH and move to flink-benchmark module

2015-10-09 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2848:


 Summary: Refactor Flink benchmarks with JMH and move to 
flink-benchmark module
 Key: FLINK-2848
 URL: https://issues.apache.org/jira/browse/FLINK-2848
 Project: Flink
  Issue Type: Test
  Components: Tests
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor


There are many Flink-internal micro benchmarks in different modules; they are 
coarsely measured (via System.nanoTime()), with no warmup and no 
multi-iteration runs. This is an umbrella JIRA to refactor these micro 
benchmarks and move them to the flink-benchmark module for central management.
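
For reference, a hypothetical JMH version of such a micro benchmark (all names 
are illustrative); JMH handles warmup, multiple iterations, forking, and 
dead-code elimination for us:

{code:java}
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
@Fork(1)
public class SumBenchmark {
    private long[] data;

    @Setup
    public void setup() {
        data = new long[1024];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
    }

    @Benchmark
    public long sum() {
        long sum = 0;
        for (long v : data) {
            sum += v;
        }
        return sum;  // returning the result keeps JMH from eliminating the loop
    }
}
{code}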



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-7) [GitHub] Enable Range Partitioner

2015-09-28 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14933125#comment-14933125
 ] 

Chengxiang Li commented on FLINK-7:
---

The sample operator was enabled in 
[FLINK-1901|https://issues.apache.org/jira/browse/FLINK-1901], which means we 
can now generate the sample histogram automatically. I think it's time to 
continue this work; I would like to contribute to it if no one else is working 
on it.

> [GitHub] Enable Range Partitioner
> -
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
>  Issue Type: Sub-task
>Reporter: GitHub Import
>  Labels: github-import
> Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the 
> following aspects:
> 1) Distribution information, if available, must be propagated back together 
> with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
>  - TeraSortITCase
>  - GlobalSortingITCase
>  - GlobalSortingMixedOrderITCase
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer, 
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2745) Create a new module for all Flink micro benchmark.

2015-09-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2745:


 Summary: Create a new module for all Flink micro benchmark.
 Key: FLINK-2745
 URL: https://issues.apache.org/jira/browse/FLINK-2745
 Project: Flink
  Issue Type: New Feature
  Components: Tests
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor


Currently in Flink, there are many micro benchmarks spread across different 
modules. These benchmarks are measured manually, triggered by JUnit tests, 
with no warmup and no multi-iteration runs. Moving all benchmarks to a single 
module and adopting [JMH|http://openjdk.java.net/projects/code-tools/jmh/] as 
the backing benchmark framework would help Flink developers build benchmarks 
more accurately and easily.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2743) Add new RNG based on XORShift algorithm

2015-09-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2743:


 Summary: Add new RNG based on XORShift algorithm
 Key: FLINK-2743
 URL: https://issues.apache.org/jira/browse/FLINK-2743
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor


The [XORShift algorithm|https://en.wikipedia.org/wiki/Xorshift] is an 
optimized random number generation algorithm; implementing an RNG based on it 
would help improve the performance of operations where RNGs are heavily used.
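
A minimal xorshift64 sketch (shift constants from Marsaglia's paper; this is a 
standalone illustration, not the proposed class, which would likely extend 
java.util.Random):

{code:java}
public final class XorShift64 {
    private long state;

    public XorShift64(long seed) {
        // the xorshift state must never be zero
        this.state = (seed == 0) ? 0x9E3779B97F4A7C15L : seed;
    }

    public long nextLong() {
        long x = state;
        x ^= x << 13;
        x ^= x >>> 7;
        x ^= x << 17;
        state = x;
        return x;
    }

    public double nextDouble() {
        // use the top 53 bits to form a double in [0, 1)
        return (nextLong() >>> 11) * 0x1.0p-53;
    }
}
{code}

Each call is just a handful of shifts and XORs, which is considerably cheaper 
than the linear-congruential step of java.util.Random.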



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2754) FixedLengthRecordSorter can not write to output cross MemorySegments.

2015-09-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2754:


 Summary: FixedLengthRecordSorter can not write to output cross 
MemorySegments.
 Key: FLINK-2754
 URL: https://issues.apache.org/jira/browse/FLINK-2754
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Reporter: Chengxiang Li
Assignee: Chengxiang Li


FixedLengthRecordSorter cannot write its output across MemorySegments; it 
worked before only because it was used to write a single record at a time. We 
should fix this and add more unit tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2533) Gap based random sample optimization

2015-09-08 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-2533:
-
Assignee: GaoLun

> Gap based random sample optimization
> 
>
> Key: FLINK-2533
> URL: https://issues.apache.org/jira/browse/FLINK-2533
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Chengxiang Li
>Assignee: GaoLun
>Priority: Minor
>
> For random samplers with fraction, like BernoulliSampler and PoissonSampler, 
> a gap-based random sampler can use an O(k) sampling implementation instead 
> of the previous O\(n\) implementation; it should perform better when the 
> sample fraction is very small. [This 
> blog|http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/]
>  describes gap-based random sampling in more detail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2535) Fixed size sample algorithm optimization

2015-09-05 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li closed FLINK-2535.

Resolution: Won't Fix

> Fixed size sample algorithm optimization
> 
>
> Key: FLINK-2535
> URL: https://issues.apache.org/jira/browse/FLINK-2535
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Chengxiang Li
>Priority: Minor
> Attachments: sampling.png
>
>
> Fixed size sample algorithms are known to be less efficient than sampling 
> with a fraction, but sometimes they are necessary. Some optimizations 
> could significantly reduce the storage size and computation cost, such as the 
> algorithm described in [this 
> paper|http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2535) Fixed size sample algorithm optimization

2015-09-05 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14732155#comment-14732155
 ] 

Chengxiang Li commented on FLINK-2535:
--

Thanks for your research, [~gallenvara_bg]. Since there is no obvious 
performance improvement, I will just close this issue.

> Fixed size sample algorithm optimization
> 
>
> Key: FLINK-2535
> URL: https://issues.apache.org/jira/browse/FLINK-2535
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Chengxiang Li
>Priority: Minor
> Attachments: sampling.png
>
>
> Fixed size sample algorithms are known to be less efficient than sampling 
> with a fraction, but sometimes they are necessary. Some optimizations 
> could significantly reduce the storage size and computation cost, such as the 
> algorithm described in [this 
> paper|http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2596) Failing Test: RandomSamplerTest

2015-08-30 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-2596:


Assignee: Chengxiang Li

> Failing Test: RandomSamplerTest
> ---
>
> Key: FLINK-2596
> URL: https://issues.apache.org/jira/browse/FLINK-2596
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Assignee: Chengxiang Li
>Priority: Critical
>  Labels: test-stability
>
> {noformat}
> Tests run: 17, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 14.925 sec 
> <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest
> testReservoirSamplerWithMultiSourcePartitions2(org.apache.flink.api.java.sampling.RandomSamplerTest)
>  Time elapsed: 0.444 sec <<< ERROR!
> java.lang.IllegalArgumentException: Comparison method violates its general 
> contract!
> at java.util.TimSort.mergeLo(TimSort.java:747)
> at java.util.TimSort.mergeAt(TimSort.java:483)
> at java.util.TimSort.mergeCollapse(TimSort.java:410)
> at java.util.TimSort.sort(TimSort.java:214)
> at java.util.TimSort.sort(TimSort.java:173)
> at java.util.Arrays.sort(Arrays.java:659)
> at java.util.Collections.sort(Collections.java:217)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.transferFromListToArrayWithOrder(RandomSamplerTest.java:375)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.getSampledOutput(RandomSamplerTest.java:367)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyKSTest(RandomSamplerTest.java:338)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyRandomSamplerWithSampleSize(RandomSamplerTest.java:330)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyReservoirSamplerWithReplacement(RandomSamplerTest.java:290)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2(RandomSamplerTest.java:212)
> Results :
> Tests in error:
> RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2:212->verifyReservoirSamplerWithReplacement:290->verifyRandomSamplerWithSampleSize:330->verifyKSTest:338->getSampledOutput:367->transferFromListToArrayWithOrder:375
>  » IllegalArgument
> {noformat}
> https://travis-ci.org/apache/flink/jobs/77750329



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters

2015-08-27 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716152#comment-14716152
 ] 

Chengxiang Li commented on FLINK-2545:
--

Hi, [~greghogan], do you mind sharing how to reproduce this issue? The 
exception is caused by a negative array size, which represents the count of 
bucket members. It would be very simple to add a check on the count value to 
fix this issue, but the count should never be negative according to the hash 
table implementation, so, if possible, I want to reproduce the issue to check 
whether there are other hidden issues behind it.
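
The simple defensive fix would look roughly like this (hypothetical names; in 
the real {{MutableHashTable}} the count is read from the bucket header):

{code:java}
// Sketch of the guard inside buildBloomFilterForBucket (illustrative only).
static void buildBloomFilterForBucket(int countInBucket, int[] bucketHashes) {
    if (countInBucket <= 0) {
        // empty (or corrupt) bucket: nothing to add to the bloom filter,
        // and this avoids allocating an array with a negative size
        return;
    }
    int[] hashCodes = new int[countInBucket];
    System.arraycopy(bucketHashes, 0, hashCodes, 0, countInBucket);
    // ... feed hashCodes into the bloom filter ...
}
{code}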

 NegativeArraySizeException while creating hash table bloom filters
 --

 Key: FLINK-2545
 URL: https://issues.apache.org/jira/browse/FLINK-2545
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: master
Reporter: Greg Hogan
Assignee: Chengxiang Li

 The following exception occurred a second time when I immediately re-ran my 
 application, though after recompiling and restarting Flink the subsequent 
 execution ran without error.
 java.lang.Exception: The data preparation for task '...' , caused an error: 
 null
   at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
   at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.NegativeArraySizeException
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455)
   at 
 org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93)
   at 
 org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459)
   ... 3 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters

2015-08-26 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-2545:


Assignee: Chengxiang Li

 NegativeArraySizeException while creating hash table bloom filters
 --

 Key: FLINK-2545
 URL: https://issues.apache.org/jira/browse/FLINK-2545
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: master
Reporter: Greg Hogan
Assignee: Chengxiang Li

 The following exception occurred a second time when I immediately re-ran my 
 application, though after recompiling and restarting Flink the subsequent 
 execution ran without error.
 java.lang.Exception: The data preparation for task '...' , caused an error: 
 null
   at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
   at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.NegativeArraySizeException
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455)
   at 
 org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93)
   at 
 org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459)
   ... 3 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2564) Failing Test: RandomSamplerTest

2015-08-24 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li reassigned FLINK-2564:


Assignee: Chengxiang Li

 Failing Test: RandomSamplerTest
 ---

 Key: FLINK-2564
 URL: https://issues.apache.org/jira/browse/FLINK-2564
 Project: Flink
  Issue Type: Bug
Reporter: Matthias J. Sax
Assignee: Chengxiang Li
  Labels: test-stability

 {noformat}
 Tests run: 17, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 15.943 sec 
 <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest
 testPoissonSamplerFraction(org.apache.flink.api.java.sampling.RandomSamplerTest)
  Time elapsed: 0.017 sec <<< FAILURE!
 java.lang.AssertionError: expected fraction: 0.01, result fraction: 
 0.011300
 at org.junit.Assert.fail(Assert.java:88)
 at org.junit.Assert.assertTrue(Assert.java:41)
 at 
 org.apache.flink.api.java.sampling.RandomSamplerTest.verifySamplerFraction(RandomSamplerTest.java:249)
 at 
 org.apache.flink.api.java.sampling.RandomSamplerTest.testPoissonSamplerFraction(RandomSamplerTest.java:116)
 Results :
 Failed tests:
 Successfully installed excon-0.33.0
 RandomSamplerTest.testPoissonSamplerFraction:116->verifySamplerFraction:249 
 expected fraction: 0.01, result fraction: 0.011300
 {noformat}
 Full log: https://travis-ci.org/apache/flink/jobs/76720572



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2564) Failing Test: RandomSamplerTest

2015-08-24 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709020#comment-14709020
 ] 

Chengxiang Li commented on FLINK-2564:
--

As discussed 
[here|https://github.com/apache/flink/pull/949#issuecomment-133990256], it's a 
balance between accurate verification and false-positive results. As this 
failure and the KS-test failure happen more often than expected, we should 
widen the verification boundary, which would reduce the false positives to an 
acceptable level.

 Failing Test: RandomSamplerTest
 ---

 Key: FLINK-2564
 URL: https://issues.apache.org/jira/browse/FLINK-2564
 Project: Flink
  Issue Type: Bug
Reporter: Matthias J. Sax
Assignee: Chengxiang Li
  Labels: test-stability

 {noformat}
 Tests run: 17, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 15.943 sec 
 <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest
 testPoissonSamplerFraction(org.apache.flink.api.java.sampling.RandomSamplerTest)
  Time elapsed: 0.017 sec <<< FAILURE!
 java.lang.AssertionError: expected fraction: 0.01, result fraction: 
 0.011300
 at org.junit.Assert.fail(Assert.java:88)
 at org.junit.Assert.assertTrue(Assert.java:41)
 at 
 org.apache.flink.api.java.sampling.RandomSamplerTest.verifySamplerFraction(RandomSamplerTest.java:249)
 at 
 org.apache.flink.api.java.sampling.RandomSamplerTest.testPoissonSamplerFraction(RandomSamplerTest.java:116)
 Results :
 Failed tests:
 Successfully installed excon-0.33.0
 RandomSamplerTest.testPoissonSamplerFraction:116->verifySamplerFraction:249 
 expected fraction: 0.01, result fraction: 0.011300
 {noformat}
 Full log: https://travis-ci.org/apache/flink/jobs/76720572



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2549) Add topK operator for DataSet

2015-08-19 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2549:


 Summary: Add topK operator for DataSet
 Key: FLINK-2549
 URL: https://issues.apache.org/jira/browse/FLINK-2549
 Project: Flink
  Issue Type: New Feature
  Components: Core, Java API, Scala API
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor


topK is a common operation for users; it would be great to have it in Flink. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-08-19 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704244#comment-14704244
 ] 

Chengxiang Li commented on FLINK-2549:
--

The basic idea of the implementation is as follows (step 1 is sketched below):
# In the map stage, sort and pick the top K elements of each partition.
# A single reduce task handles all map output, sorts it, and picks the top K 
elements as the final result.

To fully manage the memory used by this operator, we may need a customized 
PriorityQueue built upon Flink's MemoryManager, so that elements of 
unpredictable size can be sorted within a fixed amount of memory, as discussed 
[here|https://github.com/apache/flink/pull/949#issuecomment-132692640].
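
A minimal sketch of the per-partition step with a bounded min-heap 
(illustrative only; it assumes "top" means largest and comparable elements, 
and the real operator would use managed memory):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public final class LocalTopK {
    /** Keep the k largest elements of one partition; O(n log k) time, O(k) space. */
    static List<Long> topK(Iterable<Long> partition, int k) {
        PriorityQueue<Long> heap = new PriorityQueue<>(k); // min-heap of current top k
        for (long value : partition) {
            if (heap.size() < k) {
                heap.add(value);
            } else if (value > heap.peek()) {
                heap.poll();          // evict the smallest of the current top k
                heap.add(value);
            }
        }
        return new ArrayList<>(heap); // unsorted; the reducer sorts the union
    }
}
{code}

The single reduce task then applies the same routine to the union of all 
partial results.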

 Add topK operator for DataSet
 -

 Key: FLINK-2549
 URL: https://issues.apache.org/jira/browse/FLINK-2549
 Project: Flink
  Issue Type: New Feature
  Components: Core, Java API, Scala API
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor

 topK is a common operation for users; it would be great to have it in Flink. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2533) Gap based random sample optimization

2015-08-16 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2533:


 Summary: Gap based random sample optimization
 Key: FLINK-2533
 URL: https://issues.apache.org/jira/browse/FLINK-2533
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor


For random samplers with fraction, like BernoulliSampler and PoissonSampler, a 
gap-based random sampler can use an O(k) sampling implementation instead of 
the previous O\(n\) implementation; it should perform better when the sample 
fraction is very small. [This 
blog|http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/]
 describes gap-based random sampling in more detail.
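
The core trick, sketched below for the Bernoulli case (illustrative only): 
instead of one coin flip per element, draw the gap to the next sampled element 
from a geometric distribution, so the work is proportional to the number of 
sampled elements rather than to the input size.

{code:java}
import java.util.Iterator;
import java.util.List;
import java.util.Random;

public final class GapBernoulliSampler {
    /** Sample each element independently with the given fraction in (0, 1). */
    static <T> void sample(Iterator<T> input, double fraction, Random rnd, List<T> out) {
        double logQ = Math.log(1 - fraction);   // log of the skip probability
        int skip = nextGap(rnd, logQ);
        while (input.hasNext()) {
            T element = input.next();
            if (skip > 0) {
                skip--;                         // skipped without a coin flip
            } else {
                out.add(element);               // sampled
                skip = nextGap(rnd, logQ);
            }
        }
    }

    private static int nextGap(Random rnd, double logQ) {
        double u = 1.0 - rnd.nextDouble();      // u in (0, 1], avoids log(0)
        return (int) (Math.log(u) / logQ);      // geometrically distributed gap
    }
}
{code}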



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2535) Fixed size sample algorithm optimization

2015-08-16 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2535:


 Summary: Fixed size sample algorithm optimization
 Key: FLINK-2535
 URL: https://issues.apache.org/jira/browse/FLINK-2535
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor


Fixed size sample algorithms are known to be less efficient than sampling 
with a fraction, but sometimes they are necessary. Some optimizations could 
significantly reduce the storage size and computation cost, such as the 
algorithm described in [this 
paper|http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638272#comment-14638272
 ] 

Chengxiang Li commented on FLINK-1901:
--

Thanks, [~till.rohrmann], I got it now. This looks more like an iteration 
optimization issue to me: it assumes that the output of the static code path 
is always the same, so it caches the output for a potential performance 
improvement. But this assumption is not always true, for example for a static 
code path with a random sampling operator, a data source reading from HBase, 
and so on. I think we could open a separate JIRA to address this in a uniform 
way instead of treating random sampling as a special case in this JIRA.

 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2396) Review the datasets of dynamic path and static path in iteration.

2015-07-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2396:


 Summary: Review the datasets of dynamic path and static path in 
iteration.
 Key: FLINK-2396
 URL: https://issues.apache.org/jira/browse/FLINK-2396
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor


Currently Flink caches datasets on the static path, as it assumes that such a 
dataset stays the same during the iteration, but this assumption is not always 
true. Take sampling for example: the iteration dataset is something like the 
weight vector of a model, and there is another training dataset from which a 
small sample is taken in each iteration to update the weight vector (e.g. 
Stochastic Gradient Descent). We expect the sampled dataset to be different in 
each iteration, but Flink caches the sampled dataset because it is on the 
static path. 
We should review how Flink identifies the dynamic path and the static path, 
and support adding the sampled dataset of the above example to the dynamic 
path.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638617#comment-14638617
 ] 

Chengxiang Li commented on FLINK-1901:
--

Thanks for the analysis, [~trohrm...@apache.org]. I've created FLINK-2396 as 
follow-up work to support sampling and other similar datasets in iterations.

 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636124#comment-14636124
 ] 

Chengxiang Li commented on FLINK-1901:
--

"Every point is sampled with probability 1/N" is one of the sampling cases 
(sampling with fraction, without replacement); there are 3 other kinds of 
sampling cases which are commonly used as well: sampling with fraction, with 
replacement; sampling with fixed size, without replacement; and sampling with 
fixed size, with replacement. We should support all of them when exposing a 
sampling operator to users. 

 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634622#comment-14634622
 ] 

Chengxiang Li edited comment on FLINK-1901 at 7/22/15 2:05 AM:
---

To randomly choose a sample from a DataSet S, there exist basically two kinds 
of sampling requirements: sampling with fraction (such as randomly choosing 5% 
of the items in S) and sampling with fixed size (such as randomly choosing 100 
items from S). Besides, we do not know the size of S unless we pay the extra 
cost to compute it through DataSet::count().
# Sampling with fraction
#* With replacement: the expected sample size follows a [Poisson 
Distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, 
so Poisson sampling can be used to choose the sample items.
#* Without replacement: during sampling, we can treat the sampling of each 
item in the iterator as a [Bernoulli 
Trial|https://en.wikipedia.org/wiki/Bernoulli_trial].
# Sampling with fixed size
#* Use DataSet::count() to get the dataset size; with the fixed size, we can 
turn this into sampling with fraction.
#* [Reservoir Sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is 
another commonly used algorithm to randomly choose a sample of k items from a 
list S containing n items, where n is either very large or unknown, and there 
are different reservoir sampling algorithms that support both sampling with 
replacement and sampling without replacement.



was (Author: chengxiang li):
To randomly choose a sample from a DataSet S, basically, there exist two kinds 
of sample requirement: sampling with factor (such as randomly choose 5% of the 
items in S) and sampling with fixed size (such as randomly choose 100 items 
from S). Besides, we do not know the size of S, unless we take the extra cost 
to compute it through DataSet::count().
# Sampling with factor
#* With replacement: the expected sample size follows a [Poisson 
Distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, 
so Poisson sampling can be used to choose the sample items.
#* Without replacement: during sampling, we can take the sampling of each item 
in the iterator as a [Bernoulli 
Trial|https://en.wikipedia.org/wiki/Bernoulli_trial].
# Sampling with fixed size
#* Use DataSet::count() to get the dataset size; with the fixed size, we can 
turn this into sampling with factor.
#* [Reservoir Sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is 
another commonly used algorithm to randomly choose a sample of k items from a 
list S containing n items, where n is either very large or unknown, and there 
are different reservoir sampling algorithms that support both sampling with 
replacement and sampling without replacement.


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636332#comment-14636332
 ] 

Chengxiang Li commented on FLINK-1901:
--

I wrote a simple example of the sampling operator 
[here|https://github.com/ChengXiangLi/flink/blob/FLINK-1901/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/TestSample.java];
 it works just as expected inside or outside of an iteration (for example, 
with 1000 items and a sample fraction of 0.5, after 3 iterations the output 
contains around 125 items). [~trohrm...@apache.org], I'm not sure whether I 
understand you correctly about sampling inside an iteration; it looks the same 
to me if it's the case shown in the example. 

 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634622#comment-14634622
 ] 

Chengxiang Li commented on FLINK-1901:
--

To randomly choose a sample from a DataSet S, basically, there exist two kinds 
of sample requirement: sampling with factor (such as randomly choose 5% of the 
items in S) and sampling with fixed size (such as randomly choose 100 items 
from S). Besides, we do not know the size of S, unless we take the extra cost 
to compute it through DataSet::count(). A rough sketch of the two with-factor 
samplers follows the list.
# Sampling with factor
#* With replacement: the expected sample size follows a [Poisson 
Distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, 
so Poisson sampling can be used to choose the sample items.
#* Without replacement: during sampling, we can take the sampling of each item 
in the iterator as a [Bernoulli 
Trial|https://en.wikipedia.org/wiki/Bernoulli_trial].
# Sampling with fixed size
#* Use DataSet::count() to get the dataset size; with the fixed size, we can 
turn this into sampling with factor.
#* [Reservoir Sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is 
another commonly used algorithm to randomly choose a sample of k items from a 
list S containing n items, where n is either very large or unknown, and there 
are different reservoir sampling algorithms that support both sampling with 
replacement and sampling without replacement.
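
A rough sketch of those two with-factor samplers (illustrative only, not the 
eventual Flink API):

{code:java}
import java.util.Iterator;
import java.util.Random;
import java.util.function.Consumer;

public final class FractionSamplers {
    /** Without replacement: one Bernoulli trial per element. */
    static <T> void bernoulli(Iterator<T> in, double fraction, Random rnd, Consumer<T> out) {
        while (in.hasNext()) {
            T element = in.next();
            if (rnd.nextDouble() < fraction) {
                out.accept(element);
            }
        }
    }

    /** With replacement: each element is emitted Poisson(fraction) times. */
    static <T> void poisson(Iterator<T> in, double fraction, Random rnd, Consumer<T> out) {
        while (in.hasNext()) {
            T element = in.next();
            for (int i = poissonSample(fraction, rnd); i > 0; i--) {
                out.accept(element);
            }
        }
    }

    // Knuth's algorithm; fine for the small means used as sample fractions.
    private static int poissonSample(double mean, Random rnd) {
        double l = Math.exp(-mean);
        double p = 1.0;
        int k = 0;
        do {
            k++;
            p *= rnd.nextDouble();
        } while (p > l);
        return k - 1;
    }
}
{code}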


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634822#comment-14634822
 ] 

Chengxiang Li commented on FLINK-1901:
--

Hi, [~sachingoel0101], I didn't find any sampling-related class while 
searching the project for the keyword; is the PR you mentioned still ongoing? 
Besides ML algorithms, there should be other use cases that depend on the 
sampling operation, such as range partitioning, and I believe sampling itself 
is a common operation that may be used directly by users.

 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634628#comment-14634628
 ] 

Chengxiang Li commented on FLINK-1901:
--

Hi, [~tvas], I would like to contribute to this issue if no one else is working 
on it now.

 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2241) Use BloomFilter to minimize build side records which spilled to disk in Hybrid-Hash-Join

2015-06-18 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2241:


 Summary: Use BloomFilter to minimize build side records which 
spilled to disk in Hybrid-Hash-Join
 Key: FLINK-2241
 URL: https://issues.apache.org/jira/browse/FLINK-2241
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor


In Hybrid-Hash-Join, when the small table does not fit into memory, part of the 
small table's data is spilled to disk, and the corresponding partitions of the 
big table are spilled to disk during the probe phase as well. If we build a 
BloomFilter while spilling the small table to disk during the build phase, and 
use it to filter the big table records that would otherwise be spilled to disk, 
we may greatly reduce the size of the spilled big table files and save the disk 
I/O cost of writing and later reading them.
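
To make the mechanism concrete, here is a minimal, hypothetical sketch of the 
idea (plain Java with a toy two-hash Bloom filter; class and method names are 
illustrative, not Flink's actual hash-join code):

{code:java}
import java.util.BitSet;

// Toy Bloom filter for spilled build-side join keys. During the build phase,
// every key of a spilled build-side record is added; during the probe phase,
// a probe-side record whose key is definitely absent (mightContain == false)
// can be dropped instead of being spilled to disk.
public class SpillBloomFilter {
    private final BitSet bits;
    private final int size;

    public SpillBloomFilter(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    // Two simple seeded hash positions; illustrative only.
    private int index(Object key, int seed) {
        int h = key.hashCode() * 31 + seed;
        h ^= (h >>> 16);
        return Math.floorMod(h, size);
    }

    // Build phase: record the key of a spilled build-side record.
    public void add(Object key) {
        bits.set(index(key, 0x9E3779B9));
        bits.set(index(key, 0x85EBCA6B));
    }

    // Probe phase: false means the key is definitely not in any spilled
    // build-side partition, so the record need not be spilled at all.
    public boolean mightContain(Object key) {
        return bits.get(index(key, 0x9E3779B9))
            && bits.get(index(key, 0x85EBCA6B));
    }
}
{code}

A real implementation would size the bit array and the number of hash 
functions from the expected number of spilled records and a target 
false-positive rate.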



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2240) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join

2015-06-18 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-2240:
-
Summary: Use BloomFilter to minimize probe side records which are spilled 
to disk in Hybrid-Hash-Join  (was: Use BloomFilter to minimize build side 
records which spilled to disk in Hybrid-Hash-Join)

 Use BloomFilter to minimize probe side records which are spilled to disk in 
 Hybrid-Hash-Join
 

 Key: FLINK-2240
 URL: https://issues.apache.org/jira/browse/FLINK-2240
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor

 In Hybrid-Hash-Join, when the small table does not fit into memory, part of 
 the small table's data is spilled to disk, and the corresponding partitions 
 of the big table are spilled to disk during the probe phase as well. If we 
 build a BloomFilter while spilling the small table to disk during the build 
 phase, and use it to filter the big table records that would otherwise be 
 spilled to disk, we may greatly reduce the size of the spilled big table 
 files and save the disk I/O cost of writing and later reading them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2241) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join

2015-06-18 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591552#comment-14591552
 ] 

Chengxiang Li commented on FLINK-2241:
--

Oh, thanks, [~till.rohrmann], I must have accidentally double-clicked my mouse.

 Use BloomFilter to minimize probe side records which are spilled to disk in 
 Hybrid-Hash-Join
 ---

 Key: FLINK-2241
 URL: https://issues.apache.org/jira/browse/FLINK-2241
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor

 In Hybrid-Hash-Join, when the small table does not fit into memory, part of 
 the small table's data is spilled to disk, and the corresponding partitions 
 of the big table are spilled to disk during the probe phase as well. If we 
 build a BloomFilter while spilling the small table to disk during the build 
 phase, and use it to filter the big table records that would otherwise be 
 spilled to disk, we may greatly reduce the size of the spilled big table 
 files and save the disk I/O cost of writing and later reading them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2240) Use BloomFilter to minimize build side records which spilled to disk in Hybrid-Hash-Join

2015-06-18 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2240:


 Summary: Use BloomFilter to minimize build side records which 
spilled to disk in Hybrid-Hash-Join
 Key: FLINK-2240
 URL: https://issues.apache.org/jira/browse/FLINK-2240
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor


In Hybrid-Hash-Join, when the small table does not fit into memory, part of the 
small table's data is spilled to disk, and the corresponding partitions of the 
big table are spilled to disk during the probe phase as well. If we build a 
BloomFilter while spilling the small table to disk during the build phase, and 
use it to filter the big table records that would otherwise be spilled to disk, 
we may greatly reduce the size of the spilled big table files and save the disk 
I/O cost of writing and later reading them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2241) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join

2015-06-18 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-2241:
-
Summary: Use BloomFilter to minimize probe side records which are spilled to 
disk in Hybrid-Hash-Join  (was: Use BloomFilter to minimize build side records 
which spilled to disk in Hybrid-Hash-Join)

 Use BloomFilter to minimize probe side records which are spilled to disk in 
 Hybrid-Hash-Join
 ---

 Key: FLINK-2241
 URL: https://issues.apache.org/jira/browse/FLINK-2241
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor

 In Hybrid-Hash-Join, when the small table does not fit into memory, part of 
 the small table's data is spilled to disk, and the corresponding partitions 
 of the big table are spilled to disk during the probe phase as well. If we 
 build a BloomFilter while spilling the small table to disk during the build 
 phase, and use it to filter the big table records that would otherwise be 
 spilled to disk, we may greatly reduce the size of the spilled big table 
 files and save the disk I/O cost of writing and later reading them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)