[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1838




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212608296
  
Merging this PR




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212413005
  
Thanks for the fast update @gallenvara! 
PR looks good and can be merged if tests pass.





[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212409952
  
@fhueske Thanks a lot for the explanation; the relevant code has been modified.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60395945
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -301,6 +309,51 @@ public void 
testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
}
 
@Test
+   public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws 
Exception {
+   /*
+* UDF Join on tuples with multiple key field positions and 
same customized distribution
+*/
+
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet> ds1 = 
CollectionDataSets.get5TupleDataSet(env)
+   .map(new MapFunction, Tuple5>() {
+   @Override
+   public Tuple5 map(Tuple5 value) throws 
Exception {
+   return new Tuple5<>(value.f0, 
value.f1, value.f2, value.f3, value.f4.intValue());
+   }
+   });
+
+   DataSet> ds2 = 
CollectionDataSets.get3TupleDataSet(env)
+   .map(new MapFunction, Tuple3>() {
+   @Override
+   public Tuple3 
map(Tuple3 value) throws Exception {
+   return new Tuple3<>(value.f0, 
value.f1.intValue(), value.f2);
+   }
+   });
+   
+   env.setParallelism(4);
+   TestDistribution testDis = new TestDistribution();
+   DataSet> coGrouped =
+   DataSetUtils.partitionByRange(ds1, testDis, 0, 
4)
+   .coGroup(DataSetUtils.partitionByRange(ds2, 
testDis, 0, 1))
+   .where(0, 4)
+   .equalTo(0, 1)
+   .with(new Tuple5Tuple3CoGroup2());
--- End diff --

Please use the same `CoGroupFunction` as in the original test.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60395706
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -797,4 +873,46 @@ public void coGroup(Iterable first, 
Iterable

[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60395647
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -797,4 +873,46 @@ public void coGroup(Iterable first, 
Iterable

[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60395484
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -301,6 +309,51 @@ public void 
testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
}
 
@Test
+   public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws 
Exception {
+   /*
+* UDF Join on tuples with multiple key field positions and 
same customized distribution
+*/
+
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet> ds1 = 
CollectionDataSets.get5TupleDataSet(env)
+   .map(new MapFunction, Tuple5>() {
--- End diff --

Can you keep the original data type, i.e., not convert the last field to Integer?
1) It will check if the range partitioning handles different types.
2) It will be more concise (no map functions + no additional CoGroupFunction).




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-20 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212300584
  
@fhueske PR updated.
I was a little confused while writing the tests. The original dataset is passed through a `map` operator to ensure that the type of the partition key is the same as the boundary type in the supplied distribution. In the `DataDistribution` interface, the return type of the `getBucketBoundary` method is `Object[]`. My question is whether this could be changed to a `Tuple` type, i.e., range partitioning on one field would return a `Tuple1` and on two fields a `Tuple2`. Also, in the `OutputEmitter`, the key type could be changed from `Object[]` to `Tuple`, and the key compared against the boundary with a `Tuple` comparator. If this were possible, the boundaries in the distribution for the rangePartition test would be:
`Tuple2[] boundaries = new Tuple2[]{
    new Tuple2(1, 1L),
    new Tuple2(3, 2L),
}`
This would make the test more succinct and direct.
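For contrast, here is a minimal fragment (my illustration, not code from the PR) of how the same two boundaries are expressed under the current `Object[]`-based contract, assuming the usual `getBucketBoundary(int bucketNum, int totalNumBuckets)` signature of `DataDistribution`:
```
// Fields/method of a hypothetical DataDistribution implementation; one Object[]
// per bucket boundary, one entry per flat key field (here: Integer, Long).
private final Object[][] boundaries = new Object[][] {
    new Object[] {1, 1L},   // upper boundary of bucket 0
    new Object[] {3, 2L}    // upper boundary of bucket 1
};

@Override
public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    return boundaries[bucketNum];
}
```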
Another point of confusion is why partitionByHash and partitionByRange do not support KeySelectors that return a Tuple type, such as:
```
public static class KeySelector3 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> in) {
        return new Tuple2<>(in.f0, in.f1);
    }
}
```
so that the following code cannot be run:
```
DataSet<Tuple3<Integer, Long, String>> dataSet = ...;
dataSet.partitionByRange(new KeySelector3());
```
Can you explain it to me? Thanks!




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-19 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-211923989
  
Hi @gallenvara, thanks for the update! I tried it locally and it worked as 
expected. 

I would like two more test methods though, to make sure that the feature works end-to-end.
Could you add one test method to `JoinITCase` which basically extends `testUDFJoinOnTuplesWithMultipleKeyFieldPositions()` and uses range partitioning? For that you should provide a DataDistribution and set the parallelism to 4 on the environment.
Please do the same with `CoGroupITCase.testCoGroupWithMultipleKeyFieldsWithFieldSelector()`.

After that we can merge the PR. Thanks, Fabian
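For illustration, a rough sketch of what such a method in `JoinITCase` could look like; the distribution class (`TestDataDist`), the key fields, and the join function are placeholders of mine, not the PR's actual test code, and the ITCase's class-level imports are assumed:
```
@Test
public void testUDFJoinOnTuplesWithMultipleKeyFieldPositionsAndSameDistribution() throws Exception {

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
    DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);

    // Both inputs are range partitioned with the *same* data distribution, so the
    // optimizer should treat the partitionings as compatible and avoid a re-partition.
    TestDataDist dist = new TestDataDist();
    DataSet<Tuple2<String, String>> joined =
        DataSetUtils.partitionByRange(ds1, dist, 0, 1)
            .join(DataSetUtils.partitionByRange(ds2, dist, 0, 1))
            .where(0, 1)
            .equalTo(0, 1)
            .with(new JoinFunction<Tuple5<Integer, Long, Integer, String, Long>,
                    Tuple3<Integer, Long, String>, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> join(Tuple5<Integer, Long, Integer, String, Long> first,
                        Tuple3<Integer, Long, String> second) {
                    return new Tuple2<>(first.f3, second.f2);
                }
            });

    List<Tuple2<String, String>> result = joined.collect();
    // compare 'result' against the expected output of the corresponding non-partitioned test
}
```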




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60229754
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
+   
Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), 
Arrays.copyOfRange(distribution.getKeyTypes(), 0, 
pKeys.getNumberOfKeyFields())), "The type array of the partition key should be 
prefix of the type array of the distribution.");
--- End diff --

Oh, maybe I should improve my English skills. The message should read like: 
`"The types of the flat key fields must be equal to the types of the fields of 
the distribution."`
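Put together, a sketch of how the two checks in `PartitionOperator` could then read, with the long lines split and both suggested messages applied (my formatting; the merged commit may differ):
```
if (distribution != null) {
    Preconditions.checkArgument(
        pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(),
        "The distribution must provide at least as many fields as flat key fields are specified.");

    Preconditions.checkArgument(
        Arrays.equals(
            pKeys.getKeyFieldTypes(),
            Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())),
        "The types of the flat key fields must be equal to the types of the fields of the distribution.");
}
```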




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-210255818
  
@fhueske PR updated.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread gallenvara
Github user gallenvara commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59819652
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
+   
Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), 
Arrays.copyOfRange(distribution.getKeyTypes(), 0, 
pKeys.getNumberOfKeyFields())), "The type array of the partition key should be 
prefix of the type array of the distribution.");
--- End diff --

:) i will improve my poor English skill.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-210049036
  
Thanks for the update @gallenvara. Aside from a few comments the PR looks 
good. No worries about the `GlobalProperties`. The internals of the optimizer 
are not straightforward ;-). 
Thanks for adding the tests for the distribution / key length!





[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59752679
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
+   
Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), 
Arrays.copyOfRange(distribution.getKeyTypes(), 0, 
pKeys.getNumberOfKeyFields())), "The type array of the partition key should be 
prefix of the type array of the distribution.");
--- End diff --

Please split this line and update the message to "The types of the flat key 
fields must be a equal of the types of the fields of the distribution."




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59752212
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
--- End diff --

Update the message to "The distribution must provide at least as many 
fields as flat key fields are specified."




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59751421
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
 ---
@@ -175,6 +175,77 @@ else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) 
{
env.execute();
}
 
+   @Test
+   public void testPartitionKeyLessDistribution() throws Exception{
+   /*
+* Test the number of keys less than the number of distribution 
fields
+*/
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+
+   DataSet> input1 = 
CollectionDataSets.get3TupleDataSet(env);
+   final TestDataDist2 dist = new TestDataDist2();
+
+   env.setParallelism(dist.getParallelism());
+
+   DataSet result = DataSetUtils
+   .partitionByRange(input1, dist, 0)
+   .mapPartition(new 
RichMapPartitionFunction, Boolean>() {
+
+ @Override
--- End diff --

Can you adjust the indention to be as in the other test methods?




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59750088
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
 ---
@@ -142,9 +142,9 @@ public boolean areCompatible(RequestedGlobalProperties 
requested1, RequestedGlob
else if(produced1.getPartitioning() == 
PartitioningProperty.RANGE_PARTITIONED &&
produced2.getPartitioning() == 
PartitioningProperty.RANGE_PARTITIONED) {
--- End diff --

Add checks that both distributions are not `null`.
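One way this could look (a sketch only; the getter name `getDataDistribution()` on `GlobalProperties` is my assumption and is not confirmed in this thread):
```
// Inside areCompatible(...), in the branch where both produced properties are
// RANGE_PARTITIONED: only treat the partitionings as matching if both sides
// actually carry a custom data distribution and the two distributions are equal.
DataDistribution dist1 = produced1.getDataDistribution();  // assumed getter name
DataDistribution dist2 = produced2.getDataDistribution();

if (dist1 == null || dist2 == null || !dist1.equals(dist2)) {
    return false;
}
// ... continue with the existing checks on key fields and sort orders ...
```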




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59750107
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
 ---
@@ -151,10 +151,10 @@ public boolean 
areCompatible(RequestedGlobalProperties requested1, RequestedGlob
else if(produced1.getPartitioning() == 
PartitioningProperty.RANGE_PARTITIONED &&
produced2.getPartitioning() == 
PartitioningProperty.RANGE_PARTITIONED) {
--- End diff --

Add checks that both distributions are not `null`.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59749772
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
 ---
@@ -95,6 +97,32 @@ public int partition(Object key, int numPartitions) {

assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
}
+
+   TestDistribution dist1 = new TestDistribution(1);
+   TestDistribution dist2 = new TestDistribution(1);
+   
+   // test compatible range partitioning
+   {
--- End diff --

Can you add a check with two keys, one DESC and one ASC?
The orders on both sides should be the same, to check that the partitioning is correctly identified as compatible.
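A sketch of such a block (mirroring the existing test blocks quoted elsewhere in this thread; `keysLeft = (1, 4)`, `keysRight = (3, 1)`, `dist1`/`dist2`, and the descriptor `descr` come from the surrounding test, and the `GlobalProperties.setRangePartitioned(ordering, distribution)` variant is assumed):
```
// two keys per side, first DESC and second ASC, same order on both sides
// -> should be identified as compatible
{
    Ordering ordering1 = new Ordering();
    ordering1.appendOrdering(1, null, Order.DESCENDING);
    ordering1.appendOrdering(4, null, Order.ASCENDING);

    Ordering ordering2 = new Ordering();
    ordering2.appendOrdering(3, null, Order.DESCENDING);
    ordering2.appendOrdering(1, null, Order.ASCENDING);

    RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
    reqLeft.setRangePartitioned(ordering1, dist1);
    RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
    reqRight.setRangePartitioned(ordering2, dist2);

    GlobalProperties propsLeft = new GlobalProperties();
    propsLeft.setRangePartitioned(ordering1, dist1);
    GlobalProperties propsRight = new GlobalProperties();
    propsRight.setRangePartitioned(ordering2, dist2);

    assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
}
```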




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-208712697
  
@fhueske Thanks a lot for your advice. PR updated. Please forgive my limited understanding of the logic of `GlobalProperties`. I added tests to `CustomDistributionITCase` for the cases where the number of partition keys is less than or larger than the number of fields in the distribution.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59233583
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
 ---
@@ -93,7 +95,34 @@ public int partition(Object key, int numPartitions) {
GlobalProperties propsRight = new 
GlobalProperties();
propsRight.setCustomPartitioned(keysRight, 
part);

-   assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+   assertTrue(descr1.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+   }
+
+   TestDistribution dist1 = new TestDistribution(1);
+   TestDistribution dist2 = new TestDistribution(1);
+   CoGroupDescriptor descr2 = new 
CoGroupDescriptor(keysLeft, keysRight);
+   
+   // test compatible range partitioning
--- End diff --

We also need tests for range partitioning with two (or more) keys and tests for range partitioning with different orders.
The tests should check for compatible and incompatible partitionings.
The same applies to the `JoinGlobalPropertiesCompatibilityTest`.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59233074
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
 ---
@@ -35,7 +37,7 @@ public void checkCompatiblePartitionings() {
final FieldList keysLeft = new FieldList(1, 4);
final FieldList keysRight = new FieldList(3, 1);

-   CoGroupDescriptor descr = new 
CoGroupDescriptor(keysLeft, keysRight);
+   CoGroupDescriptor descr1 = new 
CoGroupDescriptor(keysLeft, keysRight);
--- End diff --

Can't we use this object for the new test as well? Then we would not need 
to rename it.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232978
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
 ---
@@ -93,7 +95,34 @@ public int partition(Object key, int numPartitions) {
GlobalProperties propsRight = new 
GlobalProperties();
propsRight.setCustomPartitioned(keysRight, 
part);

-   assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+   assertTrue(descr1.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+   }
+
+   TestDistribution dist1 = new TestDistribution(1);
+   TestDistribution dist2 = new TestDistribution(1);
+   SortMergeInnerJoinDescriptor descr2 = new 
SortMergeInnerJoinDescriptor(keysLeft, keysRight);
+   
+   // test compatible range partitioning
+   {
+   Ordering ordering1 = new Ordering();
+   for (int field : keysLeft) {
+   ordering1.appendOrdering(field, null, 
Order.ASCENDING);
+   }
+   Ordering ordering2 = new Ordering();
+   for (int field : keysRight) {
+   ordering2.appendOrdering(field, null, 
Order.ASCENDING);
+   }
+
+   RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+   reqLeft.setRangePartitioned(ordering1, dist1);
+   RequestedGlobalProperties reqRigth = new 
RequestedGlobalProperties();
--- End diff --

Typo: `reqRigth` -> `reqRight`. It was copy-pasted to the CoGroup test as well.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232837
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
 ---
@@ -35,7 +37,7 @@ public void checkCompatiblePartitionings() {
final FieldList keysLeft = new FieldList(1, 4);
final FieldList keysRight = new FieldList(3, 1);
 
-   SortMergeInnerJoinDescriptor descr = new 
SortMergeInnerJoinDescriptor(keysLeft, keysRight);
+   SortMergeInnerJoinDescriptor descr1 = new 
SortMergeInnerJoinDescriptor(keysLeft, keysRight);
--- End diff --

Why did you rename this variable? I think it can be used for the new tests as well, no?




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232583
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestDistribution implements DataDistribution {
--- End diff --

I think this class can be simplified a lot for our test purposes. We do not need to provide proper implementations for most of the functionality (`getParallelism`, `getBucketBoundary`, `write`, `read`). `equals` can also be mocked to check for a single value.
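For illustration, a sketch of the kind of slimmed-down class this comment suggests (my reading, not the final code; imports as in the quoted TestDistribution file, and I assume `DataDistribution` declares `getKeyTypes()` as in the `PartitionOperator` code quoted above): only `getNumberOfFields`, `getKeyTypes`, and a mocked `equals` carry logic, everything else is a stub.
```
public class TestDistribution implements DataDistribution {

    private final int numFields;

    public TestDistribution(int numFields) {
        this.numFields = numFields;
    }

    // Only what the compatibility checks actually look at needs real logic.
    @Override
    public int getNumberOfFields() {
        return numFields;
    }

    @Override
    public TypeInformation[] getKeyTypes() {
        TypeInformation[] types = new TypeInformation[numFields];
        Arrays.fill(types, BasicTypeInfo.INT_TYPE_INFO);
        return types;
    }

    // Never called in these optimizer tests, so stubs are enough.
    @Override
    public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
        return null;
    }

    @Override
    public void write(DataOutputView out) throws IOException {}

    @Override
    public void read(DataInputView in) throws IOException {}

    // "equals can be mocked to check for a single value": two distributions are
    // considered equal iff they were created with the same field count.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof TestDistribution && ((TestDistribution) obj).numFields == numFields;
    }

    @Override
    public int hashCode() {
        return numFields;
    }
}
```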




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59231845
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistribution.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.optimizer.plan.*;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JoinWithDistribution extends CompilerTestBase {
--- End diff --

Rename to `JoinWithDistributionTest`




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-208421293
  
`GlobalProperties` must copy the `distribution` itself. There are three 
places where this must be added:
1) `filterBySemanticProperties()`, line 314: `gp.distribution = 
this.distribution;`
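As a sketch of that first place (the surrounding method body is abridged by me; only the quoted assignment comes from the comment above):
```
// GlobalProperties.filterBySemanticProperties(...), abridged
GlobalProperties gp = new GlobalProperties();
// ... existing copying/filtering of partitioning, fields, and ordering ...
gp.distribution = this.distribution;   // carry the data distribution over to the filtered copy
// ...
return gp;
```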




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229736
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
 ---
@@ -83,6 +83,10 @@ public void setHashPartitioned(FieldList 
partitionedFields) {
this.partitioningFields = partitionedFields;
this.ordering = null;
}
+   
+   public void setDistribution(DataDistribution distribution) {
--- End diff --

Distribution should not be set from outside.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229638
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java 
---
@@ -427,6 +428,14 @@ public void 
computeInterestingPropertiesForInputs(CostEstimator estimator) {
}
}

+   if 
(child1.getInputs().iterator().hasNext()) {
--- End diff --

This change is not necessary. The data distribution should be set within 
`GlobalProperties`. I'll point you to the correct places.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-04-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229663
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java 
---
@@ -454,6 +463,14 @@ public void 
computeInterestingPropertiesForInputs(CostEstimator estimator) {
}
}

+   if 
(child2.getInputs().iterator().hasNext()) {
--- End diff --

Same as above.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-03-30 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203350617
  
@fhueske Yes, `TwoInputNode` rebuilds the channels and the `child` nodes don't have the `data distribution` information. I have added the information to them and the PR has been updated.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-03-30 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203307525
  
I'm currently on vacation. Will have a closer look when I'm back in about a 
week.

I am not sure that we need to touch the Join and CoGroup operators to pass 
the distributions. The optimizer is able to get this from the GlobalProperties 
to decide whether the partitionings are valid and equivalent.




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-03-29 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203216796
  
@fhueske @ChengXiangLi Can you please help with review? :)




[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

2016-03-29 Thread gallenvara
GitHub user gallenvara opened a pull request:

https://github.com/apache/flink/pull/1838

[FLINK-2998] Support range partition comparison for multi input nodes.

The PR implements range partition comparison for multi-input operations such as join and coGroup. The optimizer can now optimize the DAG to avoid a re-partition if it finds that the user-supplied data distributions are equal.
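For illustration, a sketch of the user-facing pattern this enables (my example, not code from the PR; `MyDistribution` stands for any user-supplied `DataDistribution` implementation over one Integer field):
```
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

DataSet<Tuple2<Integer, String>> left = env.fromElements(
        new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(3, "c"));
DataSet<Tuple2<Integer, String>> right = env.fromElements(
        new Tuple2<>(1, "x"), new Tuple2<>(2, "y"), new Tuple2<>(3, "z"));

// The same distribution instance is supplied for both inputs.
MyDistribution dist = new MyDistribution();

DataSet<Tuple2<String, String>> joined =
        DataSetUtils.partitionByRange(left, dist, 0)
                .join(DataSetUtils.partitionByRange(right, dist, 0))
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> join(Tuple2<Integer, String> l, Tuple2<Integer, String> r) {
                        return new Tuple2<>(l.f1, r.f1);
                    }
                });

// Because both inputs already share an equal range partitioning, the optimizer
// can plan the join without inserting another partitioning step.
```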

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gallenvara/flink flink-2998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1838


commit 37e6147a829e50ba8a45c26f225e16e7695f6489
Author: gallenvara 
Date:   2016-03-29T14:36:21Z

Support range partition comparison for multi input nodes.



