tledkov-gridgain commented on a change in pull request #8683:
URL: https://github.com/apache/ignite/pull/8683#discussion_r586179150



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
##########
@@ -17,258 +17,112 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.mapping.Mappings;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 
-import static org.apache.calcite.plan.RelOptRule.convert;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.util.ImmutableIntList.range;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
 /**
  *
  */
-public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel 
{
+public abstract class IgniteAggregate extends Aggregate {
     /** {@inheritDoc} */
-    public IgniteAggregate(RelOptCluster cluster, RelTraitSet traitSet, 
RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, 
List<AggregateCall> aggCalls) {
+    protected IgniteAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls
+    ) {
         super(cluster, traitSet, ImmutableList.of(), input, groupSet, 
groupSets, aggCalls);
     }
 
     /** {@inheritDoc} */
-    public IgniteAggregate(RelInput input) {
+    protected IgniteAggregate(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
     }
 
-    /** {@inheritDoc} */
-    @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, 
ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggCalls) {
-        return new IgniteAggregate(getCluster(), traitSet, input, groupSet, 
groupSets, aggCalls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
-        //    and all of input distribution keys are parts of aggregation 
group and vice versa.
-        // 3) Map-reduce aggregation is possible in case it's a simple 
aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(distribution)));
-
-            case RANDOM_DISTRIBUTED:
-                if (!groupSet.isEmpty() && isSimple(this)) {
-                    IgniteDistribution outDistr = hash(range(0, 
groupSet.cardinality()));
-                    IgniteDistribution inDistr = hash(groupSet.asList());
-
-                    return Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in.replace(inDistr)));
-                }
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                ImmutableIntList keys = distribution.getKeys();
-
-                if (isSimple(this) && groupSet.cardinality() == keys.size()) {
-                    Mappings.TargetMapping mapping = Commons.inverseMapping(
-                        groupSet, getInput().getRowType().getFieldCount());
-
-                    List<Integer> srcKeys = new ArrayList<>(keys.size());
-
-                    for (int key : keys) {
-                        int src = mapping.getSourceOpt(key);
-
-                        if (src == -1)
-                            break;
-
-                        srcKeys.add(src);
-                    }
-
-                    if (srcKeys.size() == keys.size())
-                        return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
-                }
-
-                break;
-
-            default:
-                break;
-        }
-
-        return Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single())));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Since it's a hash aggregate it erases collation.
-        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Aggregate is rewindable if its input is rewindable.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        RewindabilityTrait rewindability = isMapReduce(nodeTraits, in)
-            ? RewindabilityTrait.ONE_WAY
-            : TraitUtils.rewindability(in);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability), 
ImmutableList.of(in.replace(rewindability))));
+    /** */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        if (groupSet.cardinality() == 0)
+            return 1;
+
+        Double groupsCnt = mq.getDistinctRowCount(getInput(), groupSet, null);
+
+        // Estimation of the groups count is not available.
+        // Use heuristic estimation for result rows count.
+        if (groupsCnt == null)
+            return super.estimateRowCount(mq);
+        else
+            return groupsCnt;
     }
 
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Distribution propagation is based on next rules:
-        // 1) Any aggregation is possible on single or broadcast distribution.
-        // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
-        //    and all of input distribution keys are parts aggregation group.
-        // 3) Map-reduce aggregation is possible in case it's a simple 
aggregate and its input has random distribution.
-
-        RelTraitSet in = inputTraits.get(0);
-
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        IgniteDistribution distribution = TraitUtils.distribution(in);
-
-        RelDistribution.Type distrType = distribution.getType();
-
-        switch (distrType) {
-            case SINGLETON:
-            case BROADCAST_DISTRIBUTED:
-                res.add(Pair.of(nodeTraits.replace(distribution), 
ImmutableList.of(in)));
-
-                break;
-
-            case HASH_DISTRIBUTED:
-                if (isSimple(this)) {
-                    ImmutableIntList keys = distribution.getKeys();
-
-                    if (groupSet.cardinality() == keys.size()) {
-                        Mappings.TargetMapping mapping = 
Commons.inverseMapping(
-                            groupSet, getInput().getRowType().getFieldCount());
-
-                        IgniteDistribution outDistr = 
distribution.apply(mapping);
-
-                        if (outDistr.getType() == HASH_DISTRIBUTED)
-                            res.add(Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in)));
-                    }
-                }
-
-                break;
-
-            case RANDOM_DISTRIBUTED:
-                // Map-reduce aggregates
-                if (isSimple(this)) {
-                    res.add(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(random()))));
-                    res.add(Pair.of(nodeTraits.replace(broadcast()), 
ImmutableList.of(in.replace(random()))));
-                }
-
-                break;
-
-            default:
-                break;
+    /** */
+    public double estimateMemoryForGroup(RelMetadataQuery mq) {
+        if (aggCalls.isEmpty())
+            return groupSet.cardinality() * IgniteCost.AVERAGE_FIELD_SIZE;

Review comment:
       changed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to