[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-199798994
  
Closed this PR in favor of PR #1822 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1553




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-18 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-198364412
  
Hi @ramkrish86, I thought about this PR and came to the conclusion that we 
should not continue. The optimizer's design does not allow modifying operators 
in, or injecting operators into, enumerated subplans. Doing so might cause 
invalid execution plans and, in the worst case, wrong results without anybody 
noticing.

I would simply log a WARN message that a combiner was not added if the 
optimizer identifies a Partition operator in front of a Reduce or combinable 
GroupReduce operator, and give a hint that an explicit CombinerFunction can be 
added with groupCombine in front of the partition operator.
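The effect of the suggested workaround (an explicit combine step placed in front of the partition operator) can be illustrated without Flink. The sketch below is plain Java, not the DataSet API: the class and method names are hypothetical and the data is a toy word count. It counts how many records reach the partitioning step with and without a preceding combine, then hash-partitions the partial counts and sums them per target partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of "combine, then partition, then reduce".
// It is not Flink code; it only shows how many records cross the partitioner.
final class CombineBeforePartitionSketch {

    // Local pre-aggregation of one parallel input split:
    // one (word, count) entry per distinct word in that split.
    static Map<String, Integer> combine(List<String> split) {
        Map<String, Integer> partial = new HashMap<>();
        for (String word : split) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    // Records shipped to the partitioner when no combiner runs first.
    static int shippedWithoutCombiner(List<List<String>> splits) {
        int n = 0;
        for (List<String> split : splits) {
            n += split.size();
        }
        return n;
    }

    // Records shipped to the partitioner when every split is combined first.
    static int shippedWithCombiner(List<List<String>> splits) {
        int n = 0;
        for (List<String> split : splits) {
            n += combine(split).size();
        }
        return n;
    }

    // Hash-partitions the combined records and reduces (sums) them per target partition.
    static List<Map<String, Integer>> partitionAndReduce(List<List<String>> splits, int parallelism) {
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new HashMap<>());
        }
        for (List<String> split : splits) {
            for (Map.Entry<String, Integer> e : combine(split).entrySet()) {
                int target = Math.floorMod(e.getKey().hashCode(), parallelism);
                partitions.get(target).merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return partitions;
    }
}
```

With the two toy splits `[a, a, b]` and `[a, b, b, b]`, seven records cross the partitioner without a combiner and four with one, while the final counts (a=3, b=4) are the same.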

Sorry again, @ramkrish86, that I led you into a dead end with this PR.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-193595361
  
@fhueske 
I was looking into the comments; I can avoid the refactoring by creating a 
new patch altogether. But seeing the last comment, I think I will hold this off 
for some time. I feel sad, as I wanted to bring this to closure. Anyway, you are 
the expert, so I will follow your judgment here. No problem. I will check 
whether I can take up some other JIRA in the meantime, or do you have any 
suggestions?




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-193272515
  
Hi Ram, 
I just realized that the approach taken here might not work. We are 
modifying the plan while it is being enumerated. There might be cases where 
this leads to compiler errors or wrong plans. I have to check which side 
effects the plan modification might have.

I would suggest we put this PR on ice for a few days while I check whether 
it is possible to continue or whether we have to find another approach.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r55208348
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
-
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
+   // Inject a combiner before the partition node
+   Channel channelWithPartitionSrc = new Channel(in.getSource());
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(in.getSource().getInputs().iterator().hasNext()) {
+   Channel oldChannelToPartitioner = channelWithPartitionSrc.getSource().getInputs().iterator().next();
+   Channel toCombiner = new Channel(oldChannelToPartitioner.getSource());
+// A combiner plan node is created with the channel as input that has the partitioner as the target
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+setCombinerProperties(in, oldChannelToPartitioner, combiner);
+
+   Channel toPartitioner = new Channel(combiner);
+   // Set the actual partitioner node's strategy key and strategy order
+   toPartitioner.setShipStrategy(oldChannelToPartitioner.getShipStrategy(), oldChannelToPartitioner.getShipStrategyKeys(),
+   oldChannelToPartitioner.getShipStrategySortOrder(), oldChannelToPartitioner.getDataExchangeMode());
+// Create the partition single input plan node from the existing partition node
+PlanNode partitionplanNode = in.getSource().getPlanNode();
+SingleInputPlanNode partition = new SingleInputPlanNode(in.getSource().getOptimizerNode(), partitionplanNode.getNodeName(),
+toPartitioner, partitionplanNode.getDriverStrategy());
+partition.setCosts(partitionplanNode.getNodeCosts());
+partition.initProperties(partitionplanNode.getGlobalProperties(), partitionplanNode.getLocalProperties());
+// Create a reducer such that the input of the reducer is the partition node
+Channel toReducer = new Channel(partition);
+toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+in.getShipStrategySortOrder(), in.getDataExchangeMode());
+return getReducerSingleInputPlanNode(toReducer, node);
+} else {
+return getReducerSingleInputPlanNode(in, node);
+}
+   }
+
+   private void setCombinerProperties(Channel in, Channel oldChannelToPartitioner, SingleInputPlanNode combiner) {
+   combiner.setCosts(new Costs(0, 0));
+   combiner.initProperties(oldChannelToPartitioner.getGlobalProperties(), oldChannelToPartitioner.getLocalProperties());
+   // set sorting comparator key info
+   combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+   // set grouping comparator key info
+   combiner.setDriverKeyInfo(this.keyList, 1);
+   }
+
+   private SingleInputPlanNode getReducerSingleInputPlanNode(Channel in, SingleInputNode node) {
--- End diff --

Please change the parameter names: in -> toReducer, node -> reduceNode




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r55208210
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
-
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
+   // Inject a combiner before the partition node
+   Channel channelWithPartitionSrc = new Channel(in.getSource());
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(in.getSource().getInputs().iterator().hasNext()) {
+   Channel oldChannelToPartitioner = channelWithPartitionSrc.getSource().getInputs().iterator().next();
+   Channel toCombiner = new Channel(oldChannelToPartitioner.getSource());
+// A combiner plan node is created with the channel as input that has the partitioner as the target
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+setCombinerProperties(in, oldChannelToPartitioner, combiner);
+
+   Channel toPartitioner = new Channel(combiner);
+   // Set the actual partitioner node's strategy key and strategy order
+   toPartitioner.setShipStrategy(oldChannelToPartitioner.getShipStrategy(), oldChannelToPartitioner.getShipStrategyKeys(),
+   oldChannelToPartitioner.getShipStrategySortOrder(), oldChannelToPartitioner.getDataExchangeMode());
+// Create the partition single input plan node from the existing partition node
+PlanNode partitionplanNode = in.getSource().getPlanNode();
+SingleInputPlanNode partition = new SingleInputPlanNode(in.getSource().getOptimizerNode(), partitionplanNode.getNodeName(),
+toPartitioner, partitionplanNode.getDriverStrategy());
+partition.setCosts(partitionplanNode.getNodeCosts());
+partition.initProperties(partitionplanNode.getGlobalProperties(), partitionplanNode.getLocalProperties());
+// Create a reducer such that the input of the reducer is the partition node
+Channel toReducer = new Channel(partition);
+toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+in.getShipStrategySortOrder(), in.getDataExchangeMode());
+return getReducerSingleInputPlanNode(toReducer, node);
+} else {
+return getReducerSingleInputPlanNode(in, node);
+}
+   }
+
+   private void setCombinerProperties(Channel in, Channel oldChannelToPartitioner, SingleInputPlanNode combiner) {
--- End diff --

Please use meaningful parameter names for `in` and 
`oldChannelToPartitioner`.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r55205009
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
-
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
+   // Inject a combiner before the partition node
+   Channel channelWithPartitionSrc = new Channel(in.getSource());
--- End diff --

This channel is not actually used and should not be created.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r55205422
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
-
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
+   // Inject a combiner before the partition node
+   Channel channelWithPartitionSrc = new Channel(in.getSource());
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(in.getSource().getInputs().iterator().hasNext()) {
+   Channel oldChannelToPartitioner = channelWithPartitionSrc.getSource().getInputs().iterator().next();
+   Channel toCombiner = new Channel(oldChannelToPartitioner.getSource());
+// A combiner plan node is created with the channel as input that has the partitioner as the target
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+setCombinerProperties(in, oldChannelToPartitioner, combiner);
+
+   Channel toPartitioner = new Channel(combiner);
+   // Set the actual partitioner node's strategy key and strategy order
+   toPartitioner.setShipStrategy(oldChannelToPartitioner.getShipStrategy(), oldChannelToPartitioner.getShipStrategyKeys(),
+   oldChannelToPartitioner.getShipStrategySortOrder(), oldChannelToPartitioner.getDataExchangeMode());
+// Create the partition single input plan node from the existing partition node
+PlanNode partitionplanNode = in.getSource().getPlanNode();
--- End diff --

Indentation is off (please check all your changes).




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r55205324
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
-
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
+   // Inject a combiner before the partition node
+   Channel channelWithPartitionSrc = new Channel(in.getSource());
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(in.getSource().getInputs().iterator().hasNext()) {
+   Channel oldChannelToPartitioner = channelWithPartitionSrc.getSource().getInputs().iterator().next();
+   Channel toCombiner = new Channel(oldChannelToPartitioner.getSource());
+// A combiner plan node is created with the channel as input that has the partitioner as the target
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+setCombinerProperties(in, oldChannelToPartitioner, combiner);
+
+   Channel toPartitioner = new Channel(combiner);
+   // Set the actual partitioner node's strategy key and strategy order
+   toPartitioner.setShipStrategy(oldChannelToPartitioner.getShipStrategy(), oldChannelToPartitioner.getShipStrategyKeys(),
+   oldChannelToPartitioner.getShipStrategySortOrder(), oldChannelToPartitioner.getDataExchangeMode());
+// Create the partition single input plan node from the existing partition node
+PlanNode partitionplanNode = in.getSource().getPlanNode();
+SingleInputPlanNode partition = new SingleInputPlanNode(in.getSource().getOptimizerNode(), partitionplanNode.getNodeName(),
+toPartitioner, partitionplanNode.getDriverStrategy());
+partition.setCosts(partitionplanNode.getNodeCosts());
+partition.initProperties(partitionplanNode.getGlobalProperties(), partitionplanNode.getLocalProperties());
+// Create a reducer such that the input of the reducer is the partition node
+Channel toReducer = new Channel(partition);
+toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+in.getShipStrategySortOrder(), in.getDataExchangeMode());
+return getReducerSingleInputPlanNode(toReducer, node);
+} else {
+return getReducerSingleInputPlanNode(in, node);
--- End diff --

`in.getSource()` is the partitioner and should thus have exactly one input. 
If this is not the case, something is wrong and we should throw a 
`CompilerException`.
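The suggested check can be sketched as below. This is plain Java, not the optimizer's code: `CompilerExceptionStandIn` is a hypothetical stand-in for Flink's `CompilerException` so the snippet compiles on its own, and the generic `Iterable` plays the role of the partition node's inputs (`in.getSource().getInputs()` in the diff).

```java
import java.util.Iterator;

// Hypothetical stand-in for Flink's CompilerException, so the sketch is self-contained.
final class CompilerExceptionStandIn extends RuntimeException {
    CompilerExceptionStandIn(String message) {
        super(message);
    }
}

final class SingleInputCheck {

    // Returns the only input of a partition node, or fails loudly if the plan is
    // inconsistent instead of silently falling back to the "no combiner" branch.
    static <T> T requireSingleInput(Iterable<T> inputs, String nodeName) {
        Iterator<T> it = inputs.iterator();
        if (!it.hasNext()) {
            throw new CompilerExceptionStandIn(nodeName + " has no input, but a partition operator must have exactly one.");
        }
        T only = it.next();
        if (it.hasNext()) {
            throw new CompilerExceptionStandIn(nodeName + " has more than one input, but a partition operator must have exactly one.");
        }
        return only;
    }
}
```

Failing fast here makes an inconsistent plan visible at compile time rather than producing a reducer without a combiner.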




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r55205361
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
-
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
+   // Inject a combiner before the partition node
+   Channel channelWithPartitionSrc = new Channel(in.getSource());
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(in.getSource().getInputs().iterator().hasNext()) {
+   Channel oldChannelToPartitioner = channelWithPartitionSrc.getSource().getInputs().iterator().next();
+   Channel toCombiner = new Channel(oldChannelToPartitioner.getSource());
+// A combiner plan node is created with the channel as input that has the partitioner as the target
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
--- End diff --

Indentation is off.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r55205091
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
-
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
--- End diff --

Please use meaningful parameter names: in -> toReducer, node -> reduceNode




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r55204942
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
-
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
--- End diff --

Typo in the method name: injectCombinerBefor**e**Partitioner




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-03-03 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-192131350
  
New PR submitted, @fhueske. Thanks for helping me through this code review. 
I am still a beginner here, and there is a lot for me to learn.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-29 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-190124885
  
I do not have a concrete use case in mind, but it is certainly possible to 
implement such a job in the DataSet API. Hence, it should be translated 
correctly. 
For example:

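```
// Tuple2<Long, Long> is an assumed element type, used for illustration only
DataSet<Tuple2<Long, Long>> data = ...
DataSet<Tuple2<Long, Long>> pData = data.partitionByHash(0);
pData.map(new SomeMapFunc())
     .output(new DiscardingOutputFormat<>());
pData.groupBy(0).reduceGroup(new SomeCombinableGReduceFunc())
     .output(new DiscardingOutputFormat<>());
```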
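A job like this matters for the PR because the partitioned data set `pData` feeds both the map and the group reduce: splicing a combiner into the input of the shared partition node would also change what the map receives. The plain-Java model below illustrates that hazard. The `Node` class and both helper methods are hypothetical and heavily simplified (a plan is just a chain of named nodes); they are not Flink's optimizer classes.

```java
// Hypothetical, simplified plan model; not Flink's optimizer classes.
final class SharedPartitionSketch {

    static final class Node {
        final String name;
        Node input; // mutable on purpose, to demonstrate the hazard

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }
    }

    // Renders the path from a sink back to the source, e.g. "Map <- Partition <- Source".
    static String path(Node sink) {
        StringBuilder sb = new StringBuilder(sink.name);
        for (Node n = sink.input; n != null; n = n.input) {
            sb.append(" <- ").append(n.name);
        }
        return sb.toString();
    }

    // Unsafe: splices a combiner into the shared partition node in place,
    // so every other consumer of that node now sees combined records too.
    static Node injectInPlace(Node sharedPartition) {
        sharedPartition.input = new Node("Combine", sharedPartition.input);
        return new Node("Reduce", sharedPartition);
    }

    // Leaves the shared node untouched by building a private copy of the
    // partition step for the reducer (at the price of partitioning twice).
    static Node injectByCopy(Node sharedPartition) {
        Node combine = new Node("Combine", sharedPartition.input);
        return new Node("Reduce", new Node(sharedPartition.name, combine));
    }
}
```

With `Source <- Partition` shared by `Map`, the copy-based rewrite leaves the map's path as `Map <- Partition <- Source`, whereas the in-place rewrite turns it into `Map <- Partition <- Combine <- Source`, i.e. the map would silently consume pre-aggregated records. The copy-based variant avoids that only by partitioning the data twice, so it is not free either.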




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r54386465
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+   Channel source = toCombiner.getSource().getInputs().iterator().next();
+   // A combiner plan node is created with the map as the input
+   SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+   .getName()+")", source, DriverStrategy.SORTED_GROUP_COMBINE);
+   addCombinerNodeData(in, toCombiner, combiner);
+   Channel combinerChannel = new Channel(combiner);
+   combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
--- End diff --

If we have:
`[Some-Op] --(a)-partition--> [Partition-Op] --(b)-fwd--> [Reduce-Op]`
then `in` is the `--(b)-fwd-->` channel and `node` is the `[Reduce-Op]`. 

The combine operator should be inserted like this:
`[Some-Op] --(1)-fwd--> [Combine-Op] --(2)-partition--> [Partition-Op] --(3)-fwd--> [Reduce-Op]`

- Channel (1) must be a new forward/pipelined channel.
- Channel (2) must be a new channel with the same shipping and exchange 
strategies as channel (a).
- Channel (3) should be the original channel (b), which is the `in` 
parameter.
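The target layout above can be modeled in a few lines of plain Java. The `Node` and `Channel` classes below are hypothetical stand-ins, not Flink's `PlanNode` and `Channel`, and ship strategies and exchange modes are plain strings. Note that keeping channel (3) as the original `in` channel means the existing partition node's input has to be rewired in place.

```java
// Hypothetical model of the channel layout described above; the class and
// field names are illustrative and are not Flink's optimizer API.
final class CombinerRewiringSketch {

    static final class Node {
        final String name;
        Channel input; // null for a source

        Node(String name, Channel input) {
            this.name = name;
            this.input = input;
        }
    }

    static final class Channel {
        final Node source;
        final String shipStrategy;
        final String exchangeMode;

        Channel(Node source, String shipStrategy, String exchangeMode) {
            this.source = source;
            this.shipStrategy = shipStrategy;
            this.exchangeMode = exchangeMode;
        }
    }

    // Rewires  [Some-Op] --(a)--> [Partition-Op] --(b)--> [Reduce-Op]
    // into     [Some-Op] --(1)-fwd--> [Combine-Op] --(2)--> [Partition-Op] --(3)=(b)--> [Reduce-Op]
    // where `in` is channel (b).
    static Channel injectCombiner(Channel in) {
        Node partitionOp = in.source;
        Channel a = partitionOp.input;
        if (a == null) {
            throw new IllegalStateException("partition operator has no input");
        }
        // (1): a new forward/pipelined channel from the original producer
        Channel one = new Channel(a.source, "FORWARD", "PIPELINED");
        Node combineOp = new Node("Combine-Op", one);
        // (2): a new channel with the same ship strategy and exchange mode as (a)
        Channel two = new Channel(combineOp, a.shipStrategy, a.exchangeMode);
        // In-place change of the existing partition node, required because (3) stays (b)
        partitionOp.input = two;
        // (3): the original channel (b) is returned unchanged
        return in;
    }
}
```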




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-26 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r54329786
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+   Channel source = toCombiner.getSource().getInputs().iterator().next();
+   // A combiner plan node is created with the map as the input
+   SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+   .getName()+")", source, DriverStrategy.SORTED_GROUP_COMBINE);
+   addCombinerNodeData(in, toCombiner, combiner);
+   Channel combinerChannel = new Channel(combiner);
+   combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
--- End diff --

If I am not wrong, the ShipStrategyType and DataExchangeMode that we set on 
the combinerChannel should be the ones associated with the 'in' channel that 
is passed to the method? In the WordCount example, those are FORWARD and 
PIPELINED?




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-26 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r54329750
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---
@@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti
}
 
@Test
+   public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception {
+   /*
+* check correctness of groupReduce with descending group sort
+*/
+   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

Yes. Copying and adapting the test is what I meant, too.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-26 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-189599549
  
@fhueske 
Could you give some examples of the above use case, where the partition is 
the input to more than one function?
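One way to picture why a Partition node with more than one consumer is a problem: injecting a combiner in front of the partitioner for the benefit of one consumer changes the records that every other consumer of that partitioner receives. The following is a minimal plain-Java sketch under stated assumptions; `Node`, `partialSums`, and the `sum`/`count` consumers are hypothetical stand-ins, not Flink optimizer classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Toy model (hypothetical, not Flink's optimizer): every node computes its
// output from the output of its single input node.
class SharedPartitionExample {

    static final class Node {
        final String name;
        Node input; // mutable on purpose, to mimic rewiring a shared subplan
        final Function<List<Integer>, List<Integer>> fn;

        Node(String name, Node input, Function<List<Integer>, List<Integer>> fn) {
            this.name = name;
            this.input = input;
            this.fn = fn;
        }

        List<Integer> evaluate() {
            List<Integer> in = (input == null) ? List.of() : input.evaluate();
            return fn.apply(in);
        }
    }

    // Hypothetical combiner: pre-aggregates equal values into one partial sum each.
    static List<Integer> partialSums(List<Integer> in) {
        Map<Integer, Integer> sums = new TreeMap<>();
        for (int v : in) {
            sums.merge(v, v, Integer::sum);
        }
        return new ArrayList<>(sums.values());
    }

    // Returns {result of the sum consumer, result of the count consumer}.
    static int[] run(boolean injectCombiner) {
        Node source = new Node("source", null, in -> List.of(1, 1, 2, 2, 2, 3));
        Node partition = new Node("partition", source, in -> in); // pass-through in this model
        // Two consumers share the same partition node.
        Node sum = new Node("sum", partition,
                in -> List.of(in.stream().mapToInt(Integer::intValue).sum()));
        Node count = new Node("count", partition, in -> List.of(in.size()));

        if (injectCombiner) {
            // Rewire the shared partition so that it reads from a combiner meant only for "sum".
            partition.input = new Node("combine", source, SharedPartitionExample::partialSums);
        }
        return new int[] {sum.evaluate().get(0), count.evaluate().get(0)};
    }

    public static void main(String[] args) {
        int[] plain = run(false);
        int[] injected = run(true);
        System.out.println("without combiner: sum=" + plain[0] + " count=" + plain[1]);
        System.out.println("with combiner:    sum=" + injected[0] + " count=" + injected[1]);
    }
}
```

Without the combiner both consumers see the six raw records (sum 11, count 6). With the combiner the sum is still 11, but the counting consumer silently sees only the 3 pre-aggregated records, which is the kind of wrong result nobody would notice.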


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53998405
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---
@@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti
}
 
@Test
+   public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception {
+   /*
+* check correctness of groupReduce with descending group sort
+*/
+   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

I'd rather copy the test and adapt the copy. Thanks!


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53978205
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---
@@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti
}
 
@Test
+   public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception {
+   /*
+* check correctness of groupReduce with descending group sort
+*/
+   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

Is it OK if I modify the existing test 
testCorrectnessOfGroupReduceOnTuplesWithCombine() and add a partitionByHash to 
the input dataset? I tested and debugged it, and after the changes it seems to work 
fine and gives the same expected result as 
testCorrectnessOfGroupReduceOnTuplesWithCombine().


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-22 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53659684
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---
@@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti
}
 
@Test
+   public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception {
+   /*
+* check correctness of groupReduce with descending group sort
+*/
+   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

I see. Okay.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-22 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53659650
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+   Channel source = toCombiner.getSource().getInputs().iterator().next();
+   // A combiner plan node is created with the map as the input
+   SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
--- End diff --

Thanks for all the comments. I am going through them one by one and have a few questions; 
let me get back to you after fixing the other comments. Sorry about the formatting 
changes. I will take care to avoid them from now on. 


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53153223
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java ---
@@ -59,37 +65,69 @@ public DriverStrategy getStrategy() {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
-   (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
+   (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
{
+   // adjust a sort (changes grouping, so it must be for this driver to combining sort
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   Channel toCombiner = new Channel(in.getSource());
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   // create an input node for combine with same parallelism as input node
+   ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+   Channel source = toCombiner.getSource().getInputs().iterator().next();
+   SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
+   "Combine ("+node.getOperator().getName()+")", source,
+   DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
+   addCombinerProperties(toCombiner, combiner);
+   Channel combinerChannel = new Channel(combiner);
+   combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   // Create the partition single input plan node from the existing partition node
+   PlanNode partitionplanNode = in.getSource().getPlanNode();
+   SingleInputPlanNode partition = new SingleInputPlanNode(in.getSource().getOptimizerNode(), partitionplanNode.getNodeName(),
+   combinerChannel, partitionplanNode.getDriverStrategy());
+   partition.setCosts(partitionplanNode.getNodeCosts());
+   partition.initProperties(partitionplanNode.getGlobalProperties(), partitionplanNode.getLocalProperties());
+   // Create a reducer such that the input of the reducer is the partition node
+   Channel toReducer = new Channel(partition);
+   toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+   in.getShipStrategySortOrder(), in.getDataExchangeMode());
+   return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", toReducer,
+   DriverStrategy.SORTED_REDUCE, this.keyList);
+   }
+   }
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
-   DriverStrategy.SORTED_REDUCE, this.keyList);
+   DriverStrategy.SORTED_REDUCE, this.keyList);
}
else {
// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
Channel toCombiner = new Channel(in.getSource());
toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-   
+
// create an input node for combine with same parallelism as input node
ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
combinerNode.setParallelism(in.getSource().getParallelism());
 
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
-   "Combine ("+node.getOperator().getName()+")", toCombiner,
--- End diff --

No reformatting please.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53153386
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---
@@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti
}
 
@Test
+   public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception {
+   /*
+* check correctness of groupReduce with descending group sort
+*/
+   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

If we run this test with parallelism `1`, we cannot check if the 
partitioning is working correctly. Also, please use a custom `GroupReduceFunction` that 
implements the `GroupCombineFunction` interface. The combine function should be 
implemented in a way that we can actually verify that it was called.
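A sketch of the kind of check suggested here, written as a plain-Java simulation rather than a real Flink job: `Rec`, `combine`, `reduce`, and `run` are hypothetical names, and the `combined` flag is an assumed marker. The combiner tags every record it emits, so the final reduce can show that combine actually ran; with parallelism greater than 1, the check can also confirm that each key ends up on exactly one receiver. In an actual `GroupReduceITCase` test the same idea would live in a `GroupReduceFunction` that also implements `GroupCombineFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (hypothetical names, not the Flink API) of a group-reduce
// whose combiner leaves a verifiable trace in its output.
class CombineVerification {

    // key, value, and a flag that only the combiner sets
    record Rec(String key, int value, boolean combined) {}

    // Combine: partial sum per key on one sender subtask, tagged as combined.
    static Rec combine(String key, List<Rec> group) {
        int sum = 0;
        for (Rec r : group) sum += r.value();
        return new Rec(key, sum, true);
    }

    // Final reduce: total per key; the flag stays true only if every input was combined.
    static Rec reduce(String key, List<Rec> group) {
        int sum = 0;
        boolean allCombined = true;
        for (Rec r : group) {
            sum += r.value();
            allCombined &= r.combined();
        }
        return new Rec(key, sum, allCombined);
    }

    static Map<String, List<Rec>> groupByKey(List<Rec> recs) {
        Map<String, List<Rec>> groups = new TreeMap<>();
        for (Rec r : recs) groups.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r);
        return groups;
    }

    // Returns, per receiver subtask, the reduced record for each key it owns.
    static Map<Integer, Map<String, Rec>> run(List<Rec> input, int parallelism) {
        List<List<Rec>> senders = new ArrayList<>();
        List<List<Rec>> receivers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            senders.add(new ArrayList<>());
            receivers.add(new ArrayList<>());
        }
        // 1) Spread the input round-robin over the sender subtasks.
        for (int i = 0; i < input.size(); i++) senders.get(i % parallelism).add(input.get(i));

        // 2) Combine locally on each sender, then hash-partition the partial results by key.
        for (List<Rec> local : senders) {
            for (Map.Entry<String, List<Rec>> e : groupByKey(local).entrySet()) {
                Rec partial = combine(e.getKey(), e.getValue());
                receivers.get(Math.floorMod(partial.key().hashCode(), parallelism)).add(partial);
            }
        }

        // 3) Each receiver reduces the groups it received.
        Map<Integer, Map<String, Rec>> result = new TreeMap<>();
        for (int r = 0; r < parallelism; r++) {
            Map<String, Rec> reduced = new TreeMap<>();
            for (Map.Entry<String, List<Rec>> e : groupByKey(receivers.get(r)).entrySet()) {
                reduced.put(e.getKey(), reduce(e.getKey(), e.getValue()));
            }
            result.put(r, reduced);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> input = new ArrayList<>();
        for (int i = 0; i < 20; i++) input.add(new Rec(String.valueOf((char) ('a' + i % 3)), 1, false));
        System.out.println(run(input, 4));
    }
}
```

With 20 records over keys `a`, `b`, `c` and parallelism 4, the expected totals are 7, 7 and 6 with `combined == true` for every key. If the combiner were silently skipped, the sums would still be correct and only the flag would reveal it, which is why checking the sums alone cannot tell whether combine was called.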


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53152587
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+   Channel source = toCombiner.getSource().getInputs().iterator().next();
--- End diff --

This is the channel with the partitioner as target.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53152922
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+   Channel source = toCombiner.getSource().getInputs().iterator().next();
+   // A combiner plan node is created with the map as the input
+   SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+   .getName()+")", source, DriverStrategy.SORTED_GROUP_COMBINE);
+   addCombinerNodeData(in, toCombiner, combiner);
+   Channel combinerChannel = new Channel(combiner);
+   combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
--- End diff --

Note that the actual partitioning is defined on the `Channel` that points 
to the `Partitioner`, i.e., the data is already partitioned when it arrives at 
the partitioner.
Therefore, this channel must reuse the original `ShipStrategyType` and 
`DataExchangeMode` of the channel pointing to `Partition`. Setting it to 
`FORWARD` will remove the partitioning.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53152668
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+   Channel source = toCombiner.getSource().getInputs().iterator().next();
+   // A combiner plan node is created with the map as the input
+   SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
--- End diff --

We should use a new channel here instead of the existing `source` channel. 
In the other branch, the new channel is called `toCombiner`.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53153193
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java ---
@@ -59,37 +65,69 @@ public DriverStrategy getStrategy() {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
-   (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
+   (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
{
+   // adjust a sort (changes grouping, so it must be for this driver to combining sort
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
--- End diff --

Please refactor the code of this branch into a method called 
`injectCombinerBeforePartitioner()`. Some of the comments in the other class 
apply here as well.
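A minimal sketch of the requested refactoring, written against a toy plan graph rather than Flink's `PlanNode`/`Channel` classes; all types here, and the string-valued ship strategies, are hypothetical. The extracted `injectCombinerBeforePartitioner()` creates a fresh channel for every new edge instead of reusing the existing `source` channel (whose target is still the partitioner), leaves the original subplan untouched, and keeps the original ship strategy on the channel into the partitioner. The PR was later closed because the optimizer does not allow rewriting enumerated subplans, so this only illustrates the structure asked for in the review.

```java
// Toy plan graph (hypothetical, not Flink's PlanNode/Channel classes):
// every channel connects exactly one source node to the node that owns it as input.
class InjectCombinerExample {

    static final class PlanNode {
        final String name;
        final Channel input; // null for a data source

        PlanNode(String name, Channel input) {
            this.name = name;
            this.input = input;
        }
    }

    static final class Channel {
        final PlanNode source;
        final String shipStrategy;

        Channel(PlanNode source, String shipStrategy) {
            this.source = source;
            this.shipStrategy = shipStrategy;
        }
    }

    // Extracted helper: builds upstream -> combiner -> partition -> reducer from the
    // reducer's input channel, creating new channels and nodes instead of mutating old ones.
    static PlanNode injectCombinerBeforePartitioner(Channel in, String reducerName) {
        PlanNode oldPartition = in.source;
        Channel toPartition = oldPartition.input; // this channel carries the real partitioning

        // New channel from the operator in front of the partitioner to the combiner.
        Channel toCombiner = new Channel(toPartition.source, "FORWARD");
        PlanNode combiner = new PlanNode("Combine(" + reducerName + ")", toCombiner);

        // New channel into a copy of the partition node, keeping the original ship strategy.
        Channel combinerToPartition = new Channel(combiner, toPartition.shipStrategy);
        PlanNode partition = new PlanNode(oldPartition.name, combinerToPartition);

        Channel toReducer = new Channel(partition, in.shipStrategy);
        return new PlanNode("Reduce(" + reducerName + ")", toReducer);
    }

    // Renders the chain ending at n, e.g. "source -[FORWARD]-> map".
    static String chain(PlanNode n) {
        if (n.input == null) return n.name;
        return chain(n.input.source) + " -[" + n.input.shipStrategy + "]-> " + n.name;
    }

    public static void main(String[] args) {
        PlanNode source = new PlanNode("source", null);
        PlanNode map = new PlanNode("map", new Channel(source, "FORWARD"));
        PlanNode partition = new PlanNode("partition", new Channel(map, "PARTITION_HASH"));
        PlanNode reducer = injectCombinerBeforePartitioner(new Channel(partition, "FORWARD"), "sum");
        System.out.println(chain(reducer));
        System.out.println(chain(partition)); // the original subplan is unchanged
    }
}
```

The first line printed should be `source -[FORWARD]-> map -[FORWARD]-> Combine(sum) -[PARTITION_HASH]-> partition -[FORWARD]-> Reduce(sum)`; the second shows the original `source -[FORWARD]-> map -[PARTITION_HASH]-> partition` chain, unmodified.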


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53152430
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
--- End diff --

This channel is also not used in the plan. It is just used for look-ups.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53152240
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
+   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+   GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+   combinerNode.setParallelism(in.getSource().getParallelism());
+   if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+   Channel source = toCombiner.getSource().getInputs().iterator().next();
+   // A combiner plan node is created with the map as the input
--- End diff --

The operator before the partitioner is not necessarily a `mapper`.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53152054
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
combinerNode.setParallelism(in.getSource().getParallelism());
 
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
-   .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+   .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
combiner.setCosts(new Costs(0, 0));
--- End diff --

Use the `setCombinerProperties()` method here.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53152023
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
combinerNode.setParallelism(in.getSource().getParallelism());
 
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
-   .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+   .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
// set sorting comparator key info
combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
// set grouping comparator key info
combiner.setDriverKeyInfo(this.keyList, 1);
-   
+
Channel toReducer = new Channel(combiner);
toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
-   in.getShipStrategySortOrder(), in.getDataExchangeMode());
+   in.getShipStrategySortOrder(), in.getDataExchangeMode());
if (in.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
toReducer.setDataDistribution(in.getDataDistribution());
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
-   in.getLocalStrategySortOrder());
+   in.getLocalStrategySortOrder());
 
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
-   toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+   toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
 
+   private void addCombinerNodeData(Channel in, Channel toCombiner, SingleInputPlanNode combiner) {
--- End diff --

Please rename the method to `setCombinerProperties()`.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53151692
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+   // Inject a combiner before the partition node
+   Channel toCombiner = new Channel(in.getSource());
--- End diff --

Bad variable name: this channel does not lead to the combiner.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53152093
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
combinerNode.setParallelism(in.getSource().getParallelism());
 
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
-   .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+   .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
// set sorting comparator key info
combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
// set grouping comparator key info
combiner.setDriverKeyInfo(this.keyList, 1);
-   
+
Channel toReducer = new Channel(combiner);
toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
-   in.getShipStrategySortOrder(), in.getDataExchangeMode());
--- End diff --

Please avoid reformatting changes.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53151777
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// adjust a sort (changes grouping, so it must be for this driver to combining sort
-   if (in.getLocalStrategy() == LocalStrategy.SORT) {
-   if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-   throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+   if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
--- End diff --

I think we can move the code of this branch into a method called 
`injectCombinerBeforePartitioner()`.


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53151545
  
--- Diff: flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java ---
@@ -66,7 +66,7 @@ public static void main(String[] args) throws Exception {
 
DataSet<Tuple2<String, Integer>> counts = 
// split up the lines in pairs (2-tuples) containing: (word,1)
-   text.flatMap(new Tokenizer())
+   (text.flatMap(new Tokenizer()))
--- End diff --

Unnecessary change


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53151628
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 ---
@@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node) {

combinerNode.setParallelism(in.getSource().getParallelism());
 
SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
-   .getName()+")", toCombiner, 
DriverStrategy.SORTED_GROUP_COMBINE);
+   .getName()+")", toCombiner, 
DriverStrategy.SORTED_GROUP_COMBINE);
combiner.setCosts(new Costs(0, 0));

combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
// set sorting comparator key info
combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), 
in.getLocalStrategySortOrder(), 0);
// set grouping comparator key info
combiner.setDriverKeyInfo(this.keyList, 1);
-   
+
Channel toReducer = new Channel(combiner);
toReducer.setShipStrategy(in.getShipStrategy(), 
in.getShipStrategyKeys(),
-   
in.getShipStrategySortOrder(), in.getDataExchangeMode());
+   in.getShipStrategySortOrder(), 
in.getDataExchangeMode());
if (in.getShipStrategy() == 
ShipStrategyType.PARTITION_RANGE) {

toReducer.setDataDistribution(in.getDataDistribution());
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, 
in.getLocalStrategyKeys(),
-   
in.getLocalStrategySortOrder());
+   in.getLocalStrategySortOrder());
 
return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")",
-   
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+   toReducer, DriverStrategy.SORTED_GROUP_REDUCE, 
this.keyList);
}
}
 
+   private void addCombinerNodeData(Channel in, Channel toCombiner, SingleInputPlanNode combiner) {
+   combiner.setCosts(new Costs(0, 0));
+   combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
+   // set sorting comparator key info
+   combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+   // set grouping comparator key info
+   combiner.setDriverKeyInfo(this.keyList, 1);
+   }
+
+   private SingleInputPlanNode getReducerSingleInputPlanNode(Channel in, SingleInputNode node) {
--- End diff --

I don't think we need to move this into a function.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53151578
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
return DriverStrategy.SORTED_GROUP_REDUCE;
}
 
-   @Override
--- End diff --

Why did you remove the `@Override` annotation?




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-185170325
  
Hi @ramkrish86, thanks for the update.
In addition to my inline comments, we also need to extend the `ReduceITCase`.

We must also take care of the case where the result of the partition 
operator is consumed by more than one function. Consider the following case:

```
                                    /--fwd--> [Reduce]
[Input] --shuffle--> [Partitioner] -<
                                    \--fwd--> [Map]
```

which should be translated to:

```
          /--fwd--> [Combine] --shuffle--> [Partitioner] --fwd--> [Reduce]
[Input] --<
          \--shuffle--> [Partitioner] --fwd--> [Map]
```

Both translation tests need to be extended to cover this case.
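A minimal plain-Java sketch of why the Partitioner has to be duplicated in that plan: if the combiner were injected in front of a single Partitioner that feeds both consumers, the Map branch would receive pre-aggregated records instead of the raw ones. `SharedPartitionSketch` and `combine` are hypothetical names, and the map-based `combine` only stands in for a sum combiner on `(word, 1)` pairs; this is not Flink's optimizer code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SharedPartitionSketch {
    // Stand-in for a sum combiner: collapses the (word, 1) pairs into one count per word.
    static Map<String, Integer> combine(List<String> words) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (String w : words) {
            sums.merge(w, 1, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello", "world", "hello");
        // Map branch with its own Partitioner: it receives every raw record.
        int mapInputRaw = words.size();
        // Map branch sharing a Partitioner placed behind the combiner: fewer, pre-summed records.
        int mapInputCombined = combine(words).size();
        System.out.println(mapInputRaw + " vs " + mapInputCombined); // 3 vs 2
        // The Reduce branch gets the same sums either way.
        System.out.println(combine(words)); // {hello=2, world=1}
    }
}
```

Only the Map branch observes a difference, which is why the translated plan gives each branch its own `[Partitioner]`.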

Thanks, Fabian




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r53153472
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 ---
@@ -434,6 +431,95 @@ public void 
testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti
}
 
@Test
+   public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception {
+   /*
+* check correctness of groupReduce with descending group sort
+*/
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   DataSet ds = CollectionDataSets.getStringDataSet(env);
+
+   DataSet> counts =
+   // split up the lines in pairs (2-tuples) containing: (word,1)
+   ds.flatMap(new Tokenizer()).partitionByHash(0)
+   // group by the tuple field "0" and sum up tuple field "1"
+   .groupBy(0)
+   .sum(1);
+
+   List> result = counts.collect();
+   String expected = "am,1\n"
+   +
+   "are,1\n" +
+   "comment,1\n" +
+   "fine,1\n" +
+   "hello,3\n" +
+   "hi,1\n"+
+   "how,1\n"+
+   "i,1\n"+
+   "lol,1\n"+
+   "luke,1\n"+
+   "random,1\n"+
+   "skywalker,1\n"+
+   "world,2\n"+
+   "you,1\n";
+
+   compareResultAsTuples(result, expected);
+   }
+
+   @Test
+   public void testCorrectnessOfGroupreduceWithoutExplicitPartitionOfReducer() throws Exception {
+   /*
+* check correctness of groupReduce with descending group sort
+*/
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

No parallelism of `1`, and use a custom `GroupReduceFunction` that also 
implements the `GroupCombineFunction` interface.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-16 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-185025591
  
I have pushed an update to this PR. JFYI @fhueske.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-12 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-183230276
  
@ramkrish86, no worries :-)
I guess the issue description lacked a bit of detail. Flink's optimizer 
checks whether the partitioning produced by the explicit partitioning operator 
(hash, range, custom) can be reused for the Reduce. If not, the data is 
partitioned again, and this time the combiner can be applied, since this is 
the regular case.
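As a rough illustration of that check, the sketch below models only hash partitioning under a simplified rule (an assumption, not Flink's actual optimizer code): a hash partitioning on fields P can be reused for a grouping on fields G when P is non-empty and every field in P is also in G. The class and method names are hypothetical; the real optimizer handles more cases, such as range and custom partitioning.

```java
import java.util.Set;

// Simplified, hypothetical model of the "can the existing partitioning be reused?" check.
final class PartitionReuseSketch {
    // Hash partitioning on partitionFields can be reused for grouping on groupFields if every
    // partitioning field is also a grouping field: records with equal grouping keys then
    // also have equal partitioning keys and therefore end up in the same partition.
    static boolean canReusePartitioning(Set<Integer> partitionFields, Set<Integer> groupFields) {
        return !partitionFields.isEmpty() && groupFields.containsAll(partitionFields);
    }

    public static void main(String[] args) {
        // partitionByHash(0) followed by groupBy(0): the partitioning is reused.
        System.out.println(canReusePartitioning(Set.of(0), Set.of(0))); // true
        // partitionByHash(1) followed by groupBy(0): the data is partitioned again.
        System.out.println(canReusePartitioning(Set.of(1), Set.of(0))); // false
    }
}
```

Partitioning on a field other than the grouping key falls into the second case: the data is partitioned again and the regular combiner path applies.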

Thanks for working on this!




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-11 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-183190815
  
@fhueske
I found the mistake I was making. My bad. I was not applying the 
partition function on the key, i.e., the String field returned from the 
flatMap, and hence the flow always went to the `else` case. Now that I 
understand the issue, I think I can update this PR shortly. Thanks for the 
patient review.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-182969700
  
Might be a stupid question, but what if the partitioner depends on the 
number of elements? For example, if you use `partitionCustom` with a 
`Partitioner` that internally counts the elements and assigns the output 
channel based on this count, a combiner would change the result.
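A small plain-Java sketch of that concern. `SimplePartitioner` is a local, hypothetical stand-in with the same shape as Flink's `Partitioner<K>` (`int partition(K key, int numPartitions)`), defined here so the example runs without Flink; `CountingPartitioner` ignores the key and assigns channels round-robin by record count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local stand-in with the same shape as Flink's Partitioner<K>.
interface SimplePartitioner<K> {
    int partition(K key, int numPartitions);
}

// Stateful partitioner: ignores the key and picks the channel from a running record count.
final class CountingPartitioner<K> implements SimplePartitioner<K> {
    private int count = 0;

    @Override
    public int partition(K key, int numPartitions) {
        return Math.floorMod(count++, numPartitions);
    }
}

final class CountingPartitionerSketch {
    // Returns the channel chosen for each key, in input order, using a fresh partitioner.
    static List<Integer> assign(List<String> keys, int numPartitions) {
        SimplePartitioner<String> partitioner = new CountingPartitioner<>();
        List<Integer> channels = new ArrayList<>();
        for (String key : keys) {
            channels.add(partitioner.partition(key, numPartitions));
        }
        return channels;
    }

    public static void main(String[] args) {
        // No combiner: the partitioner sees every record (a, a, b).
        System.out.println(assign(List.of("a", "a", "b"), 2)); // [0, 1, 0]
        // Combiner in front: it sees one record per key, (a, 2) and (b, 1).
        System.out.println(assign(List.of("a", "b"), 2));      // [0, 1]
    }
}
```

Key `b` moves from channel 0 to channel 1 once the combiner removes one of the `a` records, so the downstream result depends on whether a combiner ran.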




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-01 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-177957906
  
I went through the code. In both cases of the WordCount program, with and 
without an explicit partition,
`[Map] --hash-partition--> [Reduce]`
`[Map] --partition--> [Partition] --local-fwd--> [Reduce]`
I see that it goes to the `else` part of the 
`GroupReduceWithCombineProperties#instantiate()` method. I debugged both cases 
once again.

> hence, the code enters the if case, because the input shipping strategy is FORWARD.

With an explicit partitioner, I don't see that happening. The input 
shipping strategy of the PARTITION node seems to be PARTITION_HASH.
So what am I missing here?




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-01 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-177997324
  
The `GroupReduceWithCombineProperties.instantiate()` method checks the 
shipping strategy of the input channel. In the case of the `WordCount` example 
*without* an explicit hash partitioner, the shipping strategy is `PARTITION_HASH` 
and the `else` branch injects a combiner. If you add an explicit partition 
operator, the input shipping strategy of the Reduce operator is `FORWARD`, so 
the `if` branch is executed and no combiner is added.

Hence, the logic has to go into the `if` branch and not into the `else` 
branch. Or, even better, add the additional condition 
`!(in.getSource().getOptimizerNode() instanceof PartitionNode)` to the `if` 
case and add an `else if` branch to handle the special case of the explicit 
partition operator.
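The branch structure suggested above could look roughly like this. It is a plain-Java sketch under simplifying assumptions: the enums and the `decide` method are hypothetical, only the input ship strategy and whether the input's source is a `PartitionNode` are considered, and the real `instantiate()` methods build plan nodes rather than returning a decision.

```java
// Hypothetical simplified model; not Flink's ShipStrategyType or optimizer classes.
enum InputShipStrategy { FORWARD, PARTITION_HASH }

enum CombinerDecision { NO_COMBINER, COMBINER_BEFORE_PARTITION_OPERATOR, COMBINER_BEFORE_SHUFFLE }

final class CombinerDecisionSketch {
    static CombinerDecision decide(InputShipStrategy ship, boolean sourceIsPartitionNode) {
        if (ship == InputShipStrategy.FORWARD && !sourceIsPartitionNode) {
            // Data is already partitioned and local (e.g. after a join): no combiner.
            return CombinerDecision.NO_COMBINER;
        } else if (ship == InputShipStrategy.FORWARD) {
            // Special case: an explicit Partition operator directly feeds the Reduce,
            // so the combiner would have to go in front of the Partition operator.
            return CombinerDecision.COMBINER_BEFORE_PARTITION_OPERATOR;
        } else {
            // Non-forward case: local properties are lost anyway, so a combiner can be
            // plugged in before the shuffle.
            return CombinerDecision.COMBINER_BEFORE_SHUFFLE;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(InputShipStrategy.PARTITION_HASH, false)); // COMBINER_BEFORE_SHUFFLE
        System.out.println(decide(InputShipStrategy.FORWARD, true));         // COMBINER_BEFORE_PARTITION_OPERATOR
        System.out.println(decide(InputShipStrategy.FORWARD, false));        // NO_COMBINER
    }
}
```

Keeping the explicit-partition case in its own `else if` branch leaves the existing forward and non-forward paths unchanged.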




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-01-29 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-176754489
  
You identified the right classes and methods for the fix, but the place 
within the method is not correct. Let me explain the issue.

In the common case, for example in a WordCount program, the operator 
order looks like this:
```
[Map] --hash-partition--> [Reduce]
```
In this case, a combiner is appended to the Map to reduce the data 
before it is partitioned over the network. This looks like:
```
[Map] --local-fwd--> [Combine] --hash-partition--> [Reduce]
```
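To make the benefit concrete, here is a plain-Java sketch (not Flink code; the class and method names are hypothetical) that counts how many records a WordCount Map would ship over the network with and without a local combiner, assuming each inner list holds the words emitted by one Map subtask.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CombinerTrafficSketch {
    // Without a combiner, every (word, 1) pair emitted by every Map subtask is shipped.
    static int shippedWithoutCombiner(List<List<String>> wordsPerSubtask) {
        int shipped = 0;
        for (List<String> subtask : wordsPerSubtask) {
            shipped += subtask.size();
        }
        return shipped;
    }

    // With a combiner chained to each Map subtask (local-fwd), the pairs are pre-summed
    // locally, so only one record per distinct word and subtask crosses the network.
    static int shippedWithCombiner(List<List<String>> wordsPerSubtask) {
        int shipped = 0;
        for (List<String> subtask : wordsPerSubtask) {
            Map<String, Integer> partial = new HashMap<>();
            for (String w : subtask) {
                partial.merge(w, 1, Integer::sum);
            }
            shipped += partial.size();
        }
        return shipped;
    }

    public static void main(String[] args) {
        List<List<String>> input = List.of(
                List.of("to", "be", "or", "not", "to", "be"),
                List.of("to", "be", "to", "be"));
        System.out.println(shippedWithoutCombiner(input)); // 10
        System.out.println(shippedWithCombiner(input));    // 6
    }
}
```

With the sample input, 10 `(word, 1)` records shrink to 6 partial counts; the savings grow with the number of repeated words per subtask.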
In some cases, Flink knows that the data is already appropriately 
partitioned (e.g. after a join):
```
[Join] --local-fwd--> [Reduce]
```
In this case, the data is already local and no combiner needs to be 
injected. The check is based on the shipping strategy of the input channel 
(this is the `if` case in `instantiate()`).

In case of an explicit partition operator, the operators look as follows:
```
[Map] --partition--> [Partition] --local-fwd--> [Reduce]
```
hence, the code enters the `if` case, because the input shipping strategy 
is `FORWARD`.
Instead we would like to inject a combiner between Map and Partition as 
follows:
```
[Map] --local-fwd--> [Combine] --partition--> [Partition] --local-fwd--> [Reduce]
```
Hence, we should adapt the condition to inject a combiner if the input 
strategy of Reduce is `FORWARD` and the input operator is a `PartitionNode`.

We should add appropriate tests for this feature. I suggest:
- a unit test case in `GroupReduceCompilationTest`
- a unit test case in `ReduceCompilationTest`
- an end-to-end integration test in `javaApiOperators.GroupReduceITCase` 
- an end-to-end integration test in `javaApiOperators.ReduceITCase` 




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-01-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r51259067
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 ---
@@ -102,36 +107,72 @@ public SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node) {

DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
} else {
// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
--- End diff --

The `else` branch will not be entered if the GroupReduce's predecessor is a 
Partition operator.
You need to add an `else if` branch to the condition.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-01-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1553#discussion_r51259080
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
 ---
@@ -66,30 +70,62 @@ public SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node) {
}
else {
// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
--- End diff --

The `else` branch will not be entered if the Reduce's predecessor is a 
Partition operator.
You need to add an `else if` branch to the condition.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-01-29 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-177080713
  
Thank you very much for the feedback. Let me try to understand this 
better and update the PR soon. I will reach out here if I have any questions 
or doubts. Thanks a lot.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-01-28 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-176059338
  
Thanks for the PR! 
I'll have a look at it and give feedback hopefully today or tomorrow.




[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-01-27 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/1553

FLINK-3179 Combiner is not injected if Reduce or GroupReduce input is 
explicitly partitioned (Ram)

I followed the guidance given in the issue description to fix this. Is the 
approach correct? I am also using this to learn the code.
Once we see that a partition node is the input of a Reduce or GroupReduce 
node, we inject the combiner after the source node (the data source node), and 
the reducer node takes the actual partition node as its input.
So now the structure would be DataSource->Combine->Partition->Reduce.
Suggestions and feedback are welcome, as I am not sure I have covered all the 
cases here.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-3179

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1553


commit ccf6e18418ac9cae1c27a5ff4399ea188b03bc0b
Author: ramkrishna 
Date:   2016-01-27T16:27:17Z

FLINK-3179 Combiner is not injected if Reduce or GroupReduce input is
explicitly partitioned (Ram)






[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-01-27 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-175730064
  
I also ensured that the related test cases pass and that the WordCount 
program output with and without partition remains the same.

