[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-199798994 Closed this PR in favor of PR #1822 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1553
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-198364412 Hi @ramkrish86, I thought about this PR and came to the conclusion that we should not continue. The optimizer's design does not allow modifying operators in, or injecting operators into, enumerated subplans. Doing so might produce invalid execution plans and, in the worst case, wrong results without anybody noticing. Instead, I would simply log a WARN message that a combiner was not added whenever the optimizer finds a Partition operator in front of a Reduce or combinable GroupReduce operator, and give a hint that an explicit `GroupCombineFunction` can be added with a GroupCombine (`combineGroup()`) in front of the partition operator. Sorry again, @ramkrish86, that I led you into a dead end with this PR.
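Why an explicit combine in front of the partition operator is a valid workaround can be shown with a plain-Java simulation. This is a sketch only, not Flink code: the class and method names are hypothetical, and a word count stands in for the combinable reduce. Each source partition pre-aggregates locally, the partial counts are hash-partitioned, and the final reduce sums them. The result is the same as without the combiner, but fewer records cross the network. In the DataSet API this shape would presumably correspond to calling `combineGroup()` before `partitionByHash()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation (hypothetical, not Flink code): combine -> hash-partition -> reduce. */
final class CombineThenPartition {

    /** Local pre-aggregation on one source partition, like a sum combiner. */
    static Map<String, Integer> combine(List<String> words) {
        Map<String, Integer> partial = new HashMap<>();
        for (String w : words) {
            partial.merge(w, 1, Integer::sum);
        }
        return partial;
    }

    /** Number of records shipped over the network after local combining. */
    static int shippedRecords(List<List<String>> sources) {
        int n = 0;
        for (List<String> s : sources) {
            n += combine(s).size();
        }
        return n;
    }

    /** Combines each source, hash-partitions the partial counts, and reduces per target. */
    static Map<String, Integer> run(List<List<String>> sources, int parallelism) {
        List<Map<String, Integer>> targets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            targets.add(new HashMap<>());
        }
        for (List<String> s : sources) {
            for (Map.Entry<String, Integer> e : combine(s).entrySet()) {
                // Hash partitioning: every key goes to exactly one target partition.
                int t = Math.floorMod(e.getKey().hashCode(), parallelism);
                // Final reduce: sum the partial counts that meet at the target.
                targets.get(t).merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> t : targets) {
            result.putAll(t); // keys are disjoint across targets
        }
        return result;
    }
}
```

With inputs `["a", "b", "a"]` and `["a", "c"]`, the combiners emit 4 partial records instead of 5 raw ones, and the reduce still yields `a=3, b=1, c=1`.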
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-193665240 @fhueske Everything is a learning experience. Good that I got to know some of the flows through this issue. Yes, I am interested in taking up some other JIRA in the meantime.
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-193658558 Hi @ramkrish86, I totally understand that you are disappointed. I'm very sorry to raise these concerns so late, after you put a lot of effort into this PR. I should have noticed this issue much earlier :-( Touching the optimizer is always a bit like open-heart surgery and must be done very carefully, with the whole picture in mind. I have not completely investigated the possible side effects yet, but will definitely let you know once I have. Would you like to work on a different issue in the meantime?
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-193595361 @fhueske I was looking into the comments; I could avoid the refactoring by creating a new patch altogether. But given your last comment, I think I can hold this off for some time. I feel sad, as I wanted to bring this to closure. Anyway, you are the expert, so I will go with your judgment here. No problem. I will check whether I can take up some other JIRA, or do you have any suggestions until then?
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-193272515 Hi Ram, I just realized that the approach taken here might not work: we are modifying the plan while it is being enumerated. There might be cases where this leads to compiler errors or wrong plans. I have to check which side effects the plan modification might have. I suggest we put this PR on ice for a few days while I check whether it is possible to continue or whether we have to find another approach.
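One way to picture the risk of modifying a plan while it is being enumerated: assume, as a simplification, that a candidate subplan is a single shared object referenced by several alternatives or consumers. The minimal plain-Java sketch below uses a hypothetical `SubplanNode` class, not Flink's optimizer API. It rewrites one shared node in place and shows that every plan that references it silently changes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, minimal plan-node stand-in; not Flink's optimizer PlanNode. */
final class SubplanNode {
    final String name;
    final List<SubplanNode> inputs = new ArrayList<>();

    SubplanNode(String name, SubplanNode... ins) {
        this.name = name;
        for (SubplanNode n : ins) {
            inputs.add(n);
        }
    }

    /** Renders this subtree as name(input,...). */
    String render() {
        if (inputs.isEmpty()) {
            return name;
        }
        StringBuilder sb = new StringBuilder(name).append('(');
        for (int i = 0; i < inputs.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(inputs.get(i).render());
        }
        return sb.append(')').toString();
    }

    /** In-place rewrite: puts a Combine node between this node and its first input. */
    void injectCombinerBelow() {
        inputs.set(0, new SubplanNode("Combine", inputs.get(0)));
    }
}
```

If a `Reduce` plan and a `Map` plan both reference the same `Partition` node, calling `injectCombinerBelow()` on that node for the sake of the reduce also turns the map plan into `Map(Partition(Combine(Source)))`.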
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r55208348 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { } toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); - return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); } } + private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) { + // Inject a combiner before the partition node + Channel channelWithPartitionSrc = new Channel(in.getSource()); + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(in.getSource().getInputs().iterator().hasNext()) { + Channel oldChannelToPartitioner = channelWithPartitionSrc.getSource().getInputs().iterator().next(); + Channel toCombiner = new Channel(oldChannelToPartitioner.getSource()); +// A combiner plan node is created with the channel as input that has the partitioner as the target + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); +SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() +.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); +setCombinerProperties(in, oldChannelToPartitioner, combiner); + + Channel toPartitioner = new Channel(combiner); + // Set the actual partitioner node's strategy key and strategy order + toPartitioner.setShipStrategy(oldChannelToPartitioner.getShipStrategy(), oldChannelToPartitioner.getShipStrategyKeys(), + oldChannelToPartitioner.getShipStrategySortOrder(), oldChannelToPartitioner.getDataExchangeMode()); +// Create the partition single input plan node from the existing partition node +PlanNode partitionplanNode = in.getSource().getPlanNode(); +SingleInputPlanNode partition = new SingleInputPlanNode(in.getSource().getOptimizerNode(), partitionplanNode.getNodeName(), +toPartitioner, partitionplanNode.getDriverStrategy()); +partition.setCosts(partitionplanNode.getNodeCosts()); + partition.initProperties(partitionplanNode.getGlobalProperties(), partitionplanNode.getLocalProperties()); +// Create a reducer such that the input of the reducer is the partition node +Channel toReducer = new Channel(partition); +toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), +in.getShipStrategySortOrder(), in.getDataExchangeMode()); +return getReducerSingleInputPlanNode(toReducer, node); +} else { +return getReducerSingleInputPlanNode(in, node); +} + } + + private void setCombinerProperties(Channel in, Channel oldChannelToPartitioner, SingleInputPlanNode combiner) { + combiner.setCosts(new Costs(0, 0)); + combiner.initProperties(oldChannelToPartitioner.getGlobalProperties(), oldChannelToPartitioner.getLocalProperties()); + // set sorting comparator key info + combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); + // set grouping comparator key info + combiner.setDriverKeyInfo(this.keyList, 1); + } + + private SingleInputPlanNode getReducerSingleInputPlanNode(Channel in, SingleInputNode node) { --- End diff -- Please change parameter names: in -> toReducer, node -> reduceNode
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r55208210 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -126,12 +117,66 @@ ... + private void setCombinerProperties(Channel in, Channel oldChannelToPartitioner, SingleInputPlanNode combiner) { --- End diff -- please use meaningful parameter names for `in` and `oldChannelToPartitioner`.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r55205009 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -126,12 +117,66 @@ ... + Channel channelWithPartitionSrc = new Channel(in.getSource()); --- End diff -- This channel is not really used and should not be created
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r55205422 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -126,12 +117,66 @@ ... +// Create the partition single input plan node from the existing partition node +PlanNode partitionplanNode = in.getSource().getPlanNode(); --- End diff -- Indentation is off (please check all your changes)
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r55205324 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -126,12 +117,66 @@ ... + if(in.getSource().getInputs().iterator().hasNext()) { ... +} else { +return getReducerSingleInputPlanNode(in, node); --- End diff -- `in.getSource()` is the partitioner and should thus have exactly one input. If this is not the case, something is wrong and we should throw a `CompilerException`.
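A sketch of the suggested check, under stated assumptions: the helper name `requireSingleInput` is hypothetical, it is generic over the input type so it does not depend on Flink's `Channel` or `PlanNode` classes, and a local `CompilerException` class stands in for the optimizer's exception so the block is self-contained. In the PR, the iterable would presumably be `in.getSource().getInputs()`. Instead of silently falling back to the plain reducer in an `else` branch, anything other than exactly one input fails loudly.

```java
import java.util.Iterator;

/** Local stand-in for the optimizer's CompilerException, so the sketch is self-contained. */
class CompilerException extends RuntimeException {
    CompilerException(String message) {
        super(message);
    }
}

final class PartitionInputs {
    /**
     * Returns the only input of a partition node. Anything other than exactly
     * one input indicates a broken plan, so fail instead of silently skipping.
     */
    static <T> T requireSingleInput(String nodeName, Iterable<T> inputs) {
        Iterator<T> it = inputs.iterator();
        if (!it.hasNext()) {
            throw new CompilerException(nodeName + " has no input; expected exactly one.");
        }
        T only = it.next();
        if (it.hasNext()) {
            throw new CompilerException(nodeName + " has more than one input; expected exactly one.");
        }
        return only;
    }
}
```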
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r55205361 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -126,12 +117,66 @@ ... +SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() --- End diff -- Indentation is off
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r55205091 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -126,12 +117,66 @@ ... + private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) { --- End diff -- Please use meaningful parameter names: in -> toReducer, node -> reduceNode
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r55204942 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -126,12 +117,66 @@ ... + private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) { --- End diff -- Typo in method name injectCombinerBefor**e**Partitioner
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-192131350 New PR submitted, @fhueske. Thanks for helping me through this code review. I am still a beginner here, and there is a lot for me to learn.
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-191752181 Sorry, I forgot a `groupBy()` in my example. It should be:
```
DataSet<...> data = ...
DataSet<...> pData = data.partitionByHash(0);
pData.map(new SomeMapFunc())
    .output(new DiscardingOutputFormat<...>());
pData.groupBy(0)
    .reduceGroup(new SomeCombinableGReduceFunc())
    .output(new DiscardingOutputFormat<...>());
```
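In this job one partitioned data set (`pData`) feeds both a map and a grouped reduce. The plain-Java sketch below (hypothetical names, not Flink code) shows why a combiner cannot simply be placed in front of such a shared partition. Records are `{key, value}` pairs, and the map branch is reduced to a record counter. The reduce result stays the same, but the map branch would silently receive pre-aggregated records instead of the original ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: one partitioned data set feeding a map branch and a reduce branch. */
final class SharedPartitionDemo {

    /** Models the combiner: sums values per key. */
    static List<long[]> combine(List<long[]> records) {
        Map<Long, Long> sums = new HashMap<>();
        for (long[] r : records) {
            sums.merge(r[0], r[1], Long::sum);
        }
        List<long[]> out = new ArrayList<>();
        for (Map.Entry<Long, Long> e : sums.entrySet()) {
            out.add(new long[] {e.getKey(), e.getValue()});
        }
        return out;
    }

    /** The map branch: here it only counts the records it receives. */
    static int mapBranchRecordCount(List<long[]> input) {
        return input.size();
    }

    /** The reduce branch: sums values per key. */
    static Map<Long, Long> reduceBranch(List<long[]> input) {
        Map<Long, Long> sums = new HashMap<>();
        for (long[] r : input) {
            sums.merge(r[0], r[1], Long::sum);
        }
        return sums;
    }
}
```

With the records `{1, 10}`, `{1, 5}`, `{2, 7}`, the reduce branch yields `{1=15, 2=7}` with or without the combiner, but the map branch would see 2 records instead of 3.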
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-191748107 @fhueske So, for the above example, where the partitioned input goes to both the map and the reducer, should the class AllGroupWithPartialPreGroupProperties be changed like GroupReduceWithCombineProperties? Both seem to have similar code, and the code flow also goes only there?
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-190124885 I do not have a concrete use case in mind, but it is certainly possible to implement such a job in the DataSet API. Hence, it should be translated correctly. You can do this, for example, like this:
```
DataSet<...> data = ...
DataSet<...> pData = data.partitionByHash(0);
pData.map(new SomeMapFunc())
    .output(new DiscardingOutputFormat<...>());
pData.reduceGroup(new SomeCombinableGReduceFunc())
    .output(new DiscardingOutputFormat<...>());
```
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r54386465 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); + // A combiner plan node is created with the map as the input + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() + .getName()+")", source, DriverStrategy.SORTED_GROUP_COMBINE); + addCombinerNodeData(in, toCombiner, combiner); + Channel combinerChannel = new Channel(combiner); + combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); --- End diff -- If we have: `[Some-Op] --(a)-partition--> [Partition-Op] --(b)-fwd--> [Reduce-Op]` then `in` is the `--(b)-fwd-->` channel and `node` is the `[Reduce-Op]`.
The combine operator should be inserted like this: `[Some-Op] --(1)-fwd--> [Combine-Op] --(2)-partition--> [Partition-Op] --(3)-fwd--> [Reduce-Op]`
- Channel (1) must be a new forward/pipelined channel.
- Channel (2) must be a new channel with the same shipping and exchange strategies as channel (a).
- Channel (3) should be the original channel (b), which is the `in` parameter.
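The rewiring described above can be sketched with hypothetical stand-ins: `SketchOp` and `SketchChannel` are not Flink's `PlanNode` and `Channel`, and plain strings take the place of `ShipStrategyType` and `DataExchangeMode` values. Channel (1) is new and forward/pipelined, channel (2) is new and copies the strategies of (a), and channel (3) is the untouched original (b).

```java
/** Hypothetical stand-in for an optimizer plan node (not Flink's PlanNode). */
final class SketchOp {
    final String name;
    SketchChannel input; // null for a source

    SketchOp(String name, SketchChannel input) {
        this.name = name;
        this.input = input;
    }
}

/** Hypothetical stand-in for a channel; strategies are plain strings here. */
final class SketchChannel {
    final SketchOp source;
    final String shipStrategy;
    final String exchangeMode;

    SketchChannel(SketchOp source, String shipStrategy, String exchangeMode) {
        this.source = source;
        this.shipStrategy = shipStrategy;
        this.exchangeMode = exchangeMode;
    }
}

final class CombinerInjection {
    /**
     * [Some-Op] --(a)--> [Partition-Op] --(b)--> [Reduce-Op] becomes
     * [Some-Op] --(1)--> [Combine-Op] --(2)--> [Partition-Op] --(3)=(b)--> [Reduce-Op].
     */
    static void inject(SketchOp reduce) {
        SketchChannel b = reduce.input;    // the `in` parameter; stays as channel (3)
        SketchOp partition = b.source;
        SketchChannel a = partition.input; // channel (a)
        // (1) new forward/pipelined channel from Some-Op into the combiner
        SketchOp combine = new SketchOp("Combine-Op", new SketchChannel(a.source, "FORWARD", "PIPELINED"));
        // (2) new channel that copies the shipping and exchange strategies of (a)
        partition.input = new SketchChannel(combine, a.shipStrategy, a.exchangeMode);
    }

    /** Renders the chain ending at sink, e.g. "A --FORWARD/PIPELINED--> B". */
    static String describe(SketchOp sink) {
        StringBuilder sb = new StringBuilder(sink.name);
        for (SketchOp op = sink; op.input != null; op = op.input.source) {
            SketchChannel c = op.input;
            sb.insert(0, c.source.name + " --" + c.shipStrategy + "/" + c.exchangeMode + "--> ");
        }
        return sb.toString();
    }
}
```

Step (2) rewires the existing partition node in place. Later in this thread, exactly this kind of modification of an already enumerated plan is judged unsafe.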
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r54329786 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); + // A combiner plan node is created with the map as the input + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() + .getName()+")", source, DriverStrategy.SORTED_GROUP_COMBINE); + addCombinerNodeData(in, toCombiner, combiner); + Channel combinerChannel = new Channel(combiner); + combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); --- End diff -- If am not wrong, the ShipStrategyType and DataExchangeMode that we set to the CombinerChannel should be the one associated with the 'in' node that is passed on to the method? Which in the case of Wordcount example is FORWARD and PIPELINED? 
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r54329750 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java --- @@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti } @Test + public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception { + /* +* check correctness of groupReduce with descending group sort +*/ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); --- End diff -- Ya. Copying and adapting the test is what I meant too.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-189599549 @fhueske Could you give some examples for the above use case where the partition is an input to more than one function?
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53998405 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java --- @@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti } @Test + public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception { + /* +* check correctness of groupReduce with descending group sort +*/ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); --- End diff -- I'd rather copy the test and adapt the copy. Thanks!
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53978205 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java --- @@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti } @Test + public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception { + /* +* check correctness of groupReduce with descending group sort +*/ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); --- End diff -- Is it ok if I modify the existing test testCorrectnessOfGroupReduceOnTuplesWithCombine() and add a partitionByHash to the input dataset? I tested and debugged it and it seems to work fine after the changes and gives the same expected result as given by testCorrectnessOfGroupReduceOnTuplesWithCombine().
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53659684 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java --- @@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti } @Test + public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception { + /* +* check correctness of groupReduce with descending group sort +*/ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); --- End diff -- I see. Okie
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53659650 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); + // A combiner plan node is created with the map as the input + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() --- End diff -- Thanks for all the comments. I am checking them one by one. I have a few questions; let me get back to you after fixing the other comments. Sorry about the formatting issues. I will be more careful from now on so that they are not repeated.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53153223 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java --- @@ -59,37 +65,69 @@ public DriverStrategy getStrategy() { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD || - (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) + (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) { + // adjust a sort (changes grouping, so it must be for this driver to combining sort + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + // create an input node for combine with same parallelism as input node + ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, + "Combine ("+node.getOperator().getName()+")", source, + DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList); + addCombinerProperties(toCombiner, combiner); + Channel combinerChannel = new Channel(combiner); + combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + // Create the partition single input plan node from the existing partition node + PlanNode partitionplanNode = in.getSource().getPlanNode(); + SingleInputPlanNode partition = new SingleInputPlanNode(in.getSource().getOptimizerNode(), partitionplanNode.getNodeName(), + combinerChannel, partitionplanNode.getDriverStrategy()); + partition.setCosts(partitionplanNode.getNodeCosts()); + 
partition.initProperties(partitionplanNode.getGlobalProperties(), partitionplanNode.getLocalProperties()); + // Create a reducer such that the input of the reducer is the partition node + Channel toReducer = new Channel(partition); + toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), + in.getShipStrategySortOrder(), in.getDataExchangeMode()); + return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", toReducer, + DriverStrategy.SORTED_REDUCE, this.keyList); + } + } return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in, - DriverStrategy.SORTED_REDUCE, this.keyList); + DriverStrategy.SORTED_REDUCE, this.keyList); } else { // non forward case. all local properties are killed anyways, so we can safely plug in a combiner Channel toCombiner = new Channel(in.getSource()); toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - + // create an input node for combine with same parallelism as input node ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); combinerNode.setParallelism(in.getSource().getParallelism()); SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, - "Combine ("+node.getOperator().getName()+")", toCombiner, --- End diff -- No reformatting please.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53153386 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java --- @@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti } @Test + public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception { + /* +* check correctness of groupReduce with descending group sort +*/ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); --- End diff -- If we run this test with parallelism `1`, we cannot check if the partitioning is working correctly. Also use a custom `GroupReduceFunction` that implements the `GroupCombineFunction` interface. The combine function should be implemented in a way that we can actually verify that it was called.
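The test requested above can be illustrated outside Flink with a plain-Java simulation. This is a sketch under stated assumptions, not the test the PR ended up with. The names `CombineVerificationSketch`, `Rec`, `combine()` and `run()` are hypothetical. The pipeline (round-robin producers, optional combine, hash partitioning, reduce) only mimics what `partitionByHash()` followed by a combinable reduce would do.

The combine step tags every record it emits, so the result shows whether the combiner ran. With parallelism 4, the records of one key start out on several producer partitions. A fourth counter records how many reducer instances saw each key, which shows whether the hash partitioning grouped them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CombineVerificationSketch {

    // Hypothetical (key, partialSum) record; fromCombine is 1 if a combine call emitted it, 0 for raw input.
    static final class Rec {
        final String key;
        final int sum;
        final int fromCombine;

        Rec(String key, int sum, int fromCombine) {
            this.key = key;
            this.sum = sum;
            this.fromCombine = fromCombine;
        }
    }

    // Combine step: pre-aggregates one producer partition to one record per key and tags it.
    static List<Rec> combine(List<Rec> partition) {
        Map<String, Integer> sums = new HashMap<>();
        for (Rec r : partition) {
            sums.merge(r.key, r.sum, Integer::sum);
        }
        List<Rec> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sums.entrySet()) {
            out.add(new Rec(e.getKey(), e.getValue(), 1));
        }
        return out;
    }

    // Simulates (optional combine) -> hash partitioning -> reduce with the given parallelism.
    // Returns key -> {total sum, reducer inputs from combine, all reducer inputs, reducer instances that saw the key}.
    static Map<String, int[]> run(List<String> words, int parallelism, boolean withCombine) {
        // Raw (word, 1) records, distributed round-robin over the producer partitions.
        List<List<Rec>> producers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            producers.add(new ArrayList<>());
        }
        for (int i = 0; i < words.size(); i++) {
            producers.get(i % parallelism).add(new Rec(words.get(i), 1, 0));
        }
        if (withCombine) {
            List<List<Rec>> combined = new ArrayList<>();
            for (List<Rec> p : producers) {
                combined.add(combine(p));
            }
            producers = combined;
        }
        // Hash partitioning: all records with the same key go to the same reducer instance.
        List<List<Rec>> reducers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            reducers.add(new ArrayList<>());
        }
        for (List<Rec> p : producers) {
            for (Rec r : p) {
                reducers.get(Math.floorMod(r.key.hashCode(), parallelism)).add(r);
            }
        }
        // Reduce: sum per key and record where the inputs came from.
        Map<String, int[]> result = new HashMap<>();
        for (List<Rec> reducer : reducers) {
            Set<String> seenHere = new HashSet<>();
            for (Rec r : reducer) {
                int[] acc = result.computeIfAbsent(r.key, k -> new int[4]);
                acc[0] += r.sum;
                acc[1] += r.fromCombine;
                acc[2] += 1;
                if (seenHere.add(r.key)) {
                    acc[3] += 1; // counts reducer instances that received this key
                }
            }
        }
        return result;
    }
}
```

For example, take the input `a, b, a, c, a, b, a, a` with parallelism 4 and the combiner enabled. Key `a` then has a total of 5 from three reducer inputs, all emitted by the combiner and all received by one reducer instance. With the combiner disabled, the same key arrives as five raw records.

In a real `GroupReduceITCase` test, the same idea would live in a class implementing both `GroupReduceFunction` and `GroupCombineFunction`, with `combine()` writing such a marker into its output.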
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53152587 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); --- End diff -- This is the channel with the partitioner as target.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53152922 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); + // A combiner plan node is created with the map as the input + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() + .getName()+")", source, DriverStrategy.SORTED_GROUP_COMBINE); + addCombinerNodeData(in, toCombiner, combiner); + Channel combinerChannel = new Channel(combiner); + combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); --- End diff -- Note that the actual partitioning is defined on the `Channel` that points to the `Partitioner`, i.e., the data is already partitioned when it arrives at the partitioner. 
Therefore, this channel must reuse the original `ShipStrategyType` and `DataExchangeMode` of the channel pointing to `Partition`. Setting it to `FORWARD` will remove the partitioning.
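The effect described above can be reproduced in a toy model. The sketch below is plain Java, not the optimizer's `Channel` or `ShipStrategyType` API; the `Ship` enum, `ship()` and `partitionsPerKey()` are hypothetical.

The sketch ships keys from producer partitions to consumer partitions in one of two ways:

* with `FORWARD`, each record stays at the same partition index;
* with hash routing, each record goes to the partition chosen by the hash of its key.

It then counts how many consumer partitions each key appears in. Any count above 1 means a downstream reduce would emit several partial results for the same key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ShipStrategySketch {

    // Hypothetical stand-in for two ship strategies; not Flink's ShipStrategyType.
    enum Ship { FORWARD, PARTITION_HASH }

    // Sends every key from producer partition i to a consumer partition.
    // Both sides have the same parallelism, as a FORWARD connection requires.
    static List<List<String>> ship(List<List<String>> producers, Ship strategy) {
        int n = producers.size();
        List<List<String>> consumers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            consumers.add(new ArrayList<>());
        }
        for (int i = 0; i < n; i++) {
            for (String key : producers.get(i)) {
                // FORWARD keeps the record local; hash routing sends equal keys to one target.
                int target = (strategy == Ship.FORWARD) ? i : Math.floorMod(key.hashCode(), n);
                consumers.get(target).add(key);
            }
        }
        return consumers;
    }

    // For each key, the number of consumer partitions it ended up in (1 = correctly grouped).
    static Map<String, Integer> partitionsPerKey(List<List<String>> consumers) {
        Map<String, Set<Integer>> seen = new HashMap<>();
        for (int i = 0; i < consumers.size(); i++) {
            for (String key : consumers.get(i)) {
                seen.computeIfAbsent(key, k -> new HashSet<>()).add(i);
            }
        }
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> e : seen.entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }
}
```

With producers `[a, b]` and `[a, c]`, `FORWARD` leaves `a` in two consumer partitions, while hash shipping puts every key in exactly one.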
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53152668 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); + // A combiner plan node is created with the map as the input + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() --- End diff -- We should use a new channel here instead of the existing `source` channel. In the other branch, the new channel is called `toCombiner`.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53153193 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java --- @@ -59,37 +65,69 @@ public DriverStrategy getStrategy() { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD || - (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) + (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) { + // adjust a sort (changes grouping, so it must be for this driver to combining sort + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { --- End diff -- Please refactor the code of this branch into a method called `injectCombinerBeforePartitioner()`. Some of the comments in the other class apply here as well.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53152430 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); --- End diff -- This channel is also not used in the plan. It is just used for look-ups.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53152240 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); + // A combiner plan node is created with the map as the input --- End diff -- The operator before the partitioner is not necessarily a `mapper`.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53152054 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { combinerNode.setParallelism(in.getSource().getParallelism()); SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() - .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); + .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); combiner.setCosts(new Costs(0, 0)); --- End diff -- use the `setCombinerProperties()` method here.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53152023 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { combinerNode.setParallelism(in.getSource().getParallelism()); SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() - .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); + .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); // set sorting comparator key info combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); // set grouping comparator key info combiner.setDriverKeyInfo(this.keyList, 1); - + Channel toReducer = new Channel(combiner); toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), - in.getShipStrategySortOrder(), in.getDataExchangeMode()); + in.getShipStrategySortOrder(), in.getDataExchangeMode()); if (in.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) { toReducer.setDataDistribution(in.getDataDistribution()); } toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), - in.getLocalStrategySortOrder()); + in.getLocalStrategySortOrder()); return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", - toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); + toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); } } + private void addCombinerNodeData(Channel in, Channel toCombiner, SingleInputPlanNode combiner) { --- End diff -- please rename method to `setCombinerProperties()`
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53151692 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + // Inject a combiner before the partition node + Channel toCombiner = new Channel(in.getSource()); --- End diff -- bad variable name. This channel will not lead to the combiner.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53152093 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { combinerNode.setParallelism(in.getSource().getParallelism()); SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() - .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); + .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); // set sorting comparator key info combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); // set grouping comparator key info combiner.setDriverKeyInfo(this.keyList, 1); - + Channel toReducer = new Channel(combiner); toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), - in.getShipStrategySortOrder(), in.getDataExchangeMode()); --- End diff -- Please avoid reformatting changes.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53151777 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java --- @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() { return DriverStrategy.SORTED_GROUP_REDUCE; } - @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { --- End diff -- I think we can move the code of this branch into a method called `injectCombinerBeforePartitioner()`.
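One possible shape for the suggested `injectCombinerBeforePartitioner()` helper, sketched on a toy plan rather than on Flink's `PlanNode` and `Channel` classes. The `Edge` class and the string-valued ship strategies are assumptions made for illustration. A real channel also carries the `DataExchangeMode`, keys and sort order, which this sketch omits.

The helper rewires `pred -> Partition -> Reduce` into `pred -> Combine -> Partition -> Reduce`:

* The new edge into the combiner is local (`FORWARD`).
* The edge from the combiner into the partitioner reuses the original edge's ship strategy, because that edge is where the data is partitioned.

The predecessor is kept generic, since it need not be a map operator.

```java
import java.util.ArrayList;
import java.util.List;

public class InjectCombinerSketch {

    // Hypothetical plan edge: a channel from one operator to another with its ship strategy.
    static final class Edge {
        final String from;
        final String to;
        final String ship;

        Edge(String from, String to, String ship) {
            this.from = from;
            this.to = to;
            this.ship = ship;
        }

        @Override
        public String toString() {
            return from + " -[" + ship + "]-> " + to;
        }
    }

    // Rewires pred -> Partition -> Reduce into pred -> Combine -> Partition -> Reduce.
    // intoPartition is the original edge that performs the partitioning, so its ship
    // strategy moves to the new Combine -> Partition edge instead of becoming FORWARD.
    static List<Edge> injectCombinerBeforePartitioner(Edge intoPartition, Edge intoReduce, String combineName) {
        List<Edge> plan = new ArrayList<>();
        // A new local edge feeds the combiner (the predecessor need not be a map).
        plan.add(new Edge(intoPartition.from, combineName, "FORWARD"));
        // Reuse the original strategy here; FORWARD would drop the partitioning.
        plan.add(new Edge(combineName, intoPartition.to, intoPartition.ship));
        // The edge from the partitioner to the reducer is unchanged.
        plan.add(intoReduce);
        return plan;
    }
}
```

For an input edge `FlatMap -[PARTITION_HASH]-> Partition`, the rewritten plan starts with `FlatMap -[FORWARD]-> Combine`, followed by `Combine -[PARTITION_HASH]-> Partition`.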
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53151545 --- Diff: flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java --- @@ -66,7 +66,7 @@ public static void main(String[] args) throws Exception { DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) + (text.flatMap(new Tokenizer())) --- End diff -- Unnecessary change
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53151628

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
             combinerNode.setParallelism(in.getSource().getParallelism());
             SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
-                    .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+                    .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
             combiner.setCosts(new Costs(0, 0));
             combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
             // set sorting comparator key info
             combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
             // set grouping comparator key info
             combiner.setDriverKeyInfo(this.keyList, 1);
-
+
             Channel toReducer = new Channel(combiner);
             toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
-                    in.getShipStrategySortOrder(), in.getDataExchangeMode());
+                    in.getShipStrategySortOrder(), in.getDataExchangeMode());
             if (in.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
                 toReducer.setDataDistribution(in.getDataDistribution());
             }
             toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
-                    in.getLocalStrategySortOrder());
+                    in.getLocalStrategySortOrder());
             return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
-                    toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+                    toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
         }
     }
+    private void addCombinerNodeData(Channel in, Channel toCombiner, SingleInputPlanNode combiner) {
+        combiner.setCosts(new Costs(0, 0));
+        combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
+        // set sorting comparator key info
+        combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+        // set grouping comparator key info
+        combiner.setDriverKeyInfo(this.keyList, 1);
+    }
+
+    private SingleInputPlanNode getReducerSingleInputPlanNode(Channel in, SingleInputNode node) {
--- End diff --

I don't think we need to move this into a function.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53151578

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
         return DriverStrategy.SORTED_GROUP_REDUCE;
     }
-    @Override
--- End diff --

Why did you remove the `@Override` annotation?
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-185170325

Hi @ramkrish86, thanks for the update. In addition to my inline comments, we also need to extend the `ReduceITCase`. We must also take care of the case where the result of the partition operator goes into more than one function. Consider the following case:

```
                                     /--fwd--> [Reduce]
[Input] --shuffle--> [Partitioner] -<
                                     \--fwd--> [Map]
```

which should be translated to:

```
           /--fwd--> [Combine] --shuffle--> [Partitioner] --fwd--> [Reduce]
[Input] --<
           \--shuffle--> [Partitioner] --fwd--> [Map]
```

Both translation tests need to be extended to cover this case.

Thanks, Fabian
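A minimal sketch of the translation described above, assuming a toy plan graph: `SketchNode` and `SharedPartitionerRewrite.injectCombiner` are hypothetical names, not Flink optimizer classes, and ship strategies are left out. The Reduce gets its own Combine followed by a copy of the Partitioner, while the Map keeps consuming the original, uncombined Partitioner output:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy plan node for illustration only; it is not Flink's PlanNode or OptimizerNode.
final class SketchNode {
    final String name;
    final List<SketchNode> inputs = new ArrayList<>();

    SketchNode(String name, SketchNode... inputs) {
        this.name = name;
        this.inputs.addAll(Arrays.asList(inputs));
    }

    @Override
    public String toString() {
        // Renders the plan upstream-first, e.g. "Reduce(Partitioner(Input))".
        if (inputs.isEmpty()) {
            return name;
        }
        StringBuilder sb = new StringBuilder(name).append('(');
        for (int i = 0; i < inputs.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(inputs.get(i));
        }
        return sb.append(')').toString();
    }
}

final class SharedPartitionerRewrite {
    private SharedPartitionerRewrite() {}

    /** Hypothetical rewrite: gives the Reduce its own Combine -> Partitioner chain. */
    static void injectCombiner(SketchNode reduce) {
        SketchNode partitioner = reduce.inputs.get(0);
        if (!partitioner.name.equals("Partitioner")) {
            return; // only the explicit-partition case is modeled here
        }
        SketchNode upstream = partitioner.inputs.get(0);
        SketchNode combine = new SketchNode("Combine", upstream);
        // Other consumers (e.g. the Map) keep pointing at the original Partitioner.
        reduce.inputs.set(0, new SketchNode("Partitioner", combine));
    }

    public static void main(String[] args) {
        SketchNode input = new SketchNode("Input");
        SketchNode partitioner = new SketchNode("Partitioner", input);
        SketchNode reduce = new SketchNode("Reduce", partitioner);
        SketchNode map = new SketchNode("Map", partitioner);

        injectCombiner(reduce);
        System.out.println(reduce); // Reduce(Partitioner(Combine(Input)))
        System.out.println(map);    // Map(Partitioner(Input))
    }
}
```

Duplicating the partitioner rather than combining in front of the shared one is what keeps the Map branch correct: the Map must still see every uncombined record.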
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53153472

--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---
@@ -434,6 +431,95 @@ public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti
     }
     @Test
+    public void testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception {
+        /*
+         * check correctness of groupReduce with descending group sort
+         */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+
+        DataSet<Tuple2<String, Integer>> counts =
+                // split up the lines in pairs (2-tuples) containing: (word,1)
+                ds.flatMap(new Tokenizer()).partitionByHash(0)
+                // group by the tuple field "0" and sum up tuple field "1"
+                .groupBy(0)
+                .sum(1);
+
+        List<Tuple2<String, Integer>> result = counts.collect();
+        String expected = "am,1\n" +
+
+                "are,1\n" +
+                "comment,1\n" +
+                "fine,1\n" +
+                "hello,3\n" +
+                "hi,1\n"+
+                "how,1\n"+
+                "i,1\n"+
+                "lol,1\n"+
+                "luke,1\n"+
+                "random,1\n"+
+                "skywalker,1\n"+
+                "world,2\n"+
+                "you,1\n";
+
+        compareResultAsTuples(result, expected);
+    }
+
+    @Test
+    public void testCorrectnessOfGroupreduceWithoutExplicitPartitionOfReducer() throws Exception {
+        /*
+         * check correctness of groupReduce with descending group sort
+         */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
--- End diff --

Please don't use a parallelism of `1`, and use a custom `GroupReduceFunction` that implements the `GroupCombineFunction` interface.
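For reference, a sketch of the kind of function the comment asks for: a word-count sum that can serve as both the combiner and the reducer. To keep it compilable without Flink on the classpath, `SimpleCollector`, `SimpleGroupReduce` and `SimpleGroupCombine` are hypothetical stand-ins that only mirror the shape of Flink's `Collector`, `GroupReduceFunction` and `GroupCombineFunction`, and `Map.Entry` replaces `Tuple2`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins that only mirror the shape of Flink's Collector,
// GroupReduceFunction and GroupCombineFunction, so this compiles without Flink.
interface SimpleCollector<T> {
    void collect(T record);
}

interface SimpleGroupReduce<IN, OUT> {
    void reduce(Iterable<IN> values, SimpleCollector<OUT> out) throws Exception;
}

interface SimpleGroupCombine<IN, OUT> {
    void combine(Iterable<IN> values, SimpleCollector<OUT> out) throws Exception;
}

/** Sums the counts of one group of (word, count) pairs; Map.Entry stands in for Tuple2. */
final class WordCountSum
        implements SimpleGroupReduce<Map.Entry<String, Integer>, Map.Entry<String, Integer>>,
                   SimpleGroupCombine<Map.Entry<String, Integer>, Map.Entry<String, Integer>> {

    @Override
    public void reduce(Iterable<Map.Entry<String, Integer>> values,
                       SimpleCollector<Map.Entry<String, Integer>> out) {
        String word = null;
        int sum = 0;
        for (Map.Entry<String, Integer> e : values) {
            word = e.getKey(); // all records of one group share the same word
            sum += e.getValue();
        }
        if (word != null) {
            out.collect(Map.entry(word, sum));
        }
    }

    @Override
    public void combine(Iterable<Map.Entry<String, Integer>> values,
                        SimpleCollector<Map.Entry<String, Integer>> out) {
        // Summation is associative, so partial sums can safely be summed again later.
        reduce(values, out);
    }

    public static void main(String[] args) {
        WordCountSum fn = new WordCountSum();
        List<Map.Entry<String, Integer>> partials = new ArrayList<>();
        // Two local combine calls, e.g. on two different input splits.
        fn.combine(List.of(Map.entry("hello", 1), Map.entry("hello", 1)), partials::add);
        fn.combine(List.of(Map.entry("hello", 1)), partials::add);

        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        fn.reduce(partials, result::add);
        System.out.println(result); // [hello=3]
    }
}
```

Running such a function with a parallelism greater than `1` is what makes the test meaningful: only then do several partial results have to be merged by the reducer.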
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-185025591

I have pushed an updated version of this PR. FYI @fhueske.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-183230276

@ramkrish86, no worries :-) I guess the issue description lacked a bit of detail. Flink's optimizer checks whether the partitioning produced by the explicit partitioning operator (hash, range, custom) can be reused for the Reduce. If not, the data is partitioned again, and this time the combiner can be applied, since it is the regular case. Thanks for working on this.
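As an illustration of the reuse check mentioned above, the sketch below encodes a common rule for hash partitioning: an existing partitioning on a set of key fields can be reused for a grouping if those fields are a non-empty subset of the grouping fields, because records with equal grouping keys then also have equal partitioning keys. `PartitioningReuse.canReuse` is a hypothetical helper with field indexes standing in for keys; it is not Flink's optimizer code, which also has to handle range and custom partitionings:

```java
import java.util.Set;

// Hypothetical sketch of the reuse rule for hash partitioning; not Flink's implementation.
final class PartitioningReuse {
    private PartitioningReuse() {}

    /**
     * Returns true if data hash-partitioned on partitionFields is already co-located
     * for a grouping on groupFields (partitionFields must be a non-empty subset).
     */
    static boolean canReuse(Set<Integer> partitionFields, Set<Integer> groupFields) {
        return !partitionFields.isEmpty() && groupFields.containsAll(partitionFields);
    }

    public static void main(String[] args) {
        System.out.println(canReuse(Set.of(0), Set.of(0)));    // true, e.g. partitionByHash(0).groupBy(0)
        System.out.println(canReuse(Set.of(0), Set.of(0, 1))); // true, partitioned on a subset of the keys
        System.out.println(canReuse(Set.of(1), Set.of(0)));    // false, data must be partitioned again
    }
}
```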
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-183190815

@fhueske I found the mistake I was making. My bad: I was not applying the partition function on the key, i.e., the String field returned by the flatMap, and hence the flow always went to the 'else' case. Now that I understand the issue, I think I can update this PR shortly. Thanks for the patient review.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-183014873

If a `Partitioner` is implemented such that it does not partition based on the key attribute, it cannot be used for a Reduce or GroupReduce transformation anyway. Also, users should expect a combiner to be applied if a `ReduceFunction`, or a `GroupReduceFunction` that implements a combine interface, is used.
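To make the argument concrete, a plain-Java sketch (no Flink classes) of why a key-based partitioner is safe: every occurrence of a word is routed to the same partition, so pre-aggregating before routing leaves the per-partition sums unchanged. `byKey`, `route` and `combine` are hypothetical helpers standing in for a key-based `Partitioner`, the partition plus Reduce step, and the combiner:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch; byKey plays the role of a key-based Partitioner (hypothetical helper).
final class KeyBasedPartitioningDemo {
    private KeyBasedPartitioningDemo() {}

    /** Depends only on the key, like hash partitioning; floorMod keeps the index non-negative. */
    static int byKey(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Routes (word, count) records and sums counts per word inside each partition. */
    static List<Map<String, Integer>> route(List<Map.Entry<String, Integer>> records, int numPartitions) {
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashMap<>());
        }
        for (Map.Entry<String, Integer> r : records) {
            partitions.get(byKey(r.getKey(), numPartitions)).merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return partitions;
    }

    /** Local pre-aggregation, as a combiner in front of the partitioner would do. */
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            sums.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return new ArrayList<>(sums.entrySet());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("hello", 1), Map.entry("world", 1), Map.entry("hello", 1), Map.entry("hi", 1));
        // Each word is routed to exactly one partition, so combining first does not change the result.
        System.out.println(route(records, 4).equals(route(combine(records), 4))); // true
    }
}
```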
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-182969700

Might be a stupid question, but what if the partitioner depends on the number of elements? E.g., if you use `partitionCustom` with a `Partitioner` that internally counts the elements and assigns the output channel based on this count. In such a case, a combiner would change the result.
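A plain-Java sketch of the scenario in the question, with no Flink classes involved: `CountingPartitioner` is a hypothetical stateful partitioner that ignores the key and assigns channels round-robin by counting records. It shows that a combiner does change the per-partition result, and also that the same word is split across partitions even without a combiner, which is why such a partitioner is unsuitable for a Reduce in the first place:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of a count-based (not key-based) partitioner; all names are hypothetical.
final class CountingPartitionerDemo {
    private CountingPartitionerDemo() {}

    /** The i-th record it sees goes to channel i % numPartitions; the key is ignored. */
    static final class CountingPartitioner {
        private int seen = 0;

        int partition(String key, int numPartitions) {
            return seen++ % numPartitions;
        }
    }

    /** Routes (word, count) records and sums per word inside each partition (the "Reduce"). */
    static List<Map<String, Integer>> route(List<Map.Entry<String, Integer>> records, int numPartitions) {
        CountingPartitioner partitioner = new CountingPartitioner();
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new LinkedHashMap<>());
        }
        for (Map.Entry<String, Integer> r : records) {
            partitions.get(partitioner.partition(r.getKey(), numPartitions))
                      .merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return partitions;
    }

    /** Local pre-aggregation (the "Combine"); LinkedHashMap keeps first-seen word order. */
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            sums.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return new ArrayList<>(sums.entrySet());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records =
                List.of(Map.entry("a", 1), Map.entry("a", 1), Map.entry("b", 1));
        System.out.println(route(records, 2));          // [{a=1, b=1}, {a=1}]
        System.out.println(route(combine(records), 2)); // [{a=2}, {b=1}]
    }
}
```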
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-177997324

The `GroupReduceWithCombineProperties.instantiate()` method checks the shipping strategy of the input channel. In the case of the `WordCount` example *without* an explicit hash partitioner, the shipping strategy is `PARTITION_HASH`, and the `else` branch injects a combiner. If you add an explicit partition operator, the input shipping strategy of the Reduce operator is `FORWARD`, so the `if` branch is executed and no combiner is added. Hence the logic has to go into the `if` branch, not into the `else` branch. Or, even better, add the additional condition `!(in.getSource().getOptimizerNode() instanceof PartitionNode)` to the `if` case and add an `else if` branch to handle the special case of the explicit partition operator.
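A minimal sketch of the branch structure proposed above, assuming a simplified model: `InputShipStrategy`, `CombinerPlacement` and `CombinerDecision.decide` are hypothetical stand-ins, not Flink's `ShipStrategyType` or the real `instantiate()` method, and the boolean flag replaces the `instanceof PartitionNode` check:

```java
// Hypothetical, simplified model of the proposed if / else if / else structure.
enum InputShipStrategy { FORWARD, PARTITION_HASH, PARTITION_RANGE }

enum CombinerPlacement { NO_COMBINER, BEFORE_PARTITION_OPERATOR, BEFORE_SHUFFLE }

final class CombinerDecision {
    private CombinerDecision() {}

    static CombinerPlacement decide(InputShipStrategy strategy, boolean sourceIsPartitionNode) {
        if (strategy == InputShipStrategy.FORWARD && !sourceIsPartitionNode) {
            // Data is already partitioned and local (e.g. after a join): nothing to combine.
            return CombinerPlacement.NO_COMBINER;
        } else if (strategy == InputShipStrategy.FORWARD) {
            // Forwarded from an explicit partition operator: combine in front of that operator.
            return CombinerPlacement.BEFORE_PARTITION_OPERATOR;
        } else {
            // Regular shuffle: local properties are lost anyway, so combine before shipping.
            return CombinerPlacement.BEFORE_SHUFFLE;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(InputShipStrategy.FORWARD, false));        // NO_COMBINER
        System.out.println(decide(InputShipStrategy.FORWARD, true));         // BEFORE_PARTITION_OPERATOR
        System.out.println(decide(InputShipStrategy.PARTITION_HASH, false)); // BEFORE_SHUFFLE
    }
}
```

Putting the `PartitionNode` check into the first condition keeps the existing `else` branch untouched, which is the point of the suggestion.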
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-177957906

I went through the code. In both cases of the WordCount program, with and without an explicit partition,

`[Map] --hash-partition--> [Reduce]`

`[Map] --partition--> [Partition] --local-fwd--> [Reduce]`

I see that it goes to the 'else' part of the GroupReduceWithCombineProperties#instantiate() method. I debugged both cases once again.

> hence, the code enters the if case, because the input shipping strategy is FORWARD.

With an explicit partitioner I don't see that happening. The input shipping strategy of the PARTITION node seems to be PARTITION_HASH. So what am I missing here?
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-177080713

Thank you very much for the feedback. Let me try to understand this better, and I will update the PR soon. I will reach out here in case I have any questions or doubts. Thanks a lot.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-176754489

You identified the right classes and methods for the fix, but the place within the method is not correct. Let me explain the issue. In the common case, as for example in a WordCount program, the operator order looks like this:

```
[Map] --hash-partition--> [Reduce]
```

In this case, a combiner is appended to the Map to reduce the data before it is partitioned over the network. This looks like:

```
[Map] --local-fwd--> [Combine] --hash-partition--> [Reduce]
```

In some cases, Flink knows that the data is already appropriately partitioned (e.g., after a join):

```
[Join] --local-fwd--> [Reduce]
```

In this case, the data is already local and no combiner needs to be injected. The check is based on the shipping strategy of the input channel (this is the `if` case in `instantiate()`). In the case of an explicit partition operator, the operators look as follows:

```
[Map] --partition--> [Partition] --local-fwd--> [Reduce]
```

Hence, the code enters the `if` case, because the input shipping strategy is `FORWARD`. Instead, we would like to inject a combiner between Map and Partition as follows:

```
[Map] --local-fwd--> [Combine] --partition--> [Partition] --local-fwd--> [Reduce]
```

Hence, we should adapt the condition to inject a combiner if the input strategy of the Reduce is `FORWARD` and the input operator is a `PartitionNode`.

We should add appropriate tests for this feature. I suggest:

- a unit test case in `GroupReduceCompilationTest`
- a unit test case in `ReduceCompilationTest`
- an end-to-end integration test in `javaApiOperators.GroupReduceITCase`
- an end-to-end integration test in `javaApiOperators.ReduceITCase`
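A minimal sketch of the rewrite described above for a linear pipeline, assuming operators are plain strings rather than Flink plan nodes. `LinearCombinerInjection.inject` is a hypothetical helper; it only models the explicit-partition case, because the regular `--hash-partition-->` shuffle is a channel rather than an operator in this model:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: operators are plain strings, not Flink plan nodes.
final class LinearCombinerInjection {
    private LinearCombinerInjection() {}

    /** Inserts "Combine" in front of a "Partition" operator that directly feeds a "Reduce". */
    static List<String> inject(List<String> pipeline) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            String op = pipeline.get(i);
            boolean partitionFeedsReduce = op.equals("Partition")
                    && i + 1 < pipeline.size()
                    && pipeline.get(i + 1).equals("Reduce");
            if (partitionFeedsReduce) {
                result.add("Combine"); // pre-aggregate locally before the data is repartitioned
            }
            result.add(op);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(inject(List.of("Map", "Partition", "Reduce"))); // [Map, Combine, Partition, Reduce]
        System.out.println(inject(List.of("Join", "Reduce")));             // [Join, Reduce]
    }
}
```

The `[Join] --local-fwd--> [Reduce]` case passes through unchanged, matching the rule that no combiner is needed when the data is already local.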
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r51259080

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java ---
@@ -66,30 +70,62 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
         } else {
             // non forward case. all local properties are killed anyways, so we can safely plug in a combiner
--- End diff --

The `else` branch will not be entered if the Reduce's predecessor is a Partition operator. You need to add an `else if` branch to the condition.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r51259067

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -102,36 +107,72 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
                     DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
         } else {
             // non forward case. all local properties are killed anyways, so we can safely plug in a combiner
--- End diff --

The `else` branch will not be entered if the GroupReduce's predecessor is a Partition operator. You need to add an `else if` branch to the condition.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-176059338

Thanks for the PR! I'll have a look at it and give feedback hopefully today or tomorrow.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1553#issuecomment-175730064

I also ensured that the related test cases pass and that the WordCount program's output is the same with and without the partition operator.
[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...
GitHub user ramkrish86 opened a pull request: https://github.com/apache/flink/pull/1553

FLINK-3179 Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned (Ram)

I followed the guidance given in the issue description to fix this. Is the approach correct here? I am also using this to learn the code. Once we see that a partition node is the input of a Reduce or GroupReduce node, we attach the combiner to the source node (the data source node), and the reducer node takes the actual partition node as its input. So now the structure would be DataSource->Combine->Partition->Reduce. Suggestions and feedback are welcome, as I am not sure whether I have covered all the cases here.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-3179

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1553.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1553

commit ccf6e18418ac9cae1c27a5ff4399ea188b03bc0b
Author: ramkrishna
Date: 2016-01-27T16:27:17Z

FLINK-3179 Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned (Ram)