This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f9ca5  [BEAM-7844] Custom MetadataHandler for NodeStats 
(RowRateWindow)
     new e5366eb  Merge pull request #9185 from riazela/RowRateWindow
11f9ca5 is described below

commit 11f9ca5adc656e06bcab168b23a3b6bc2ea09242
Author: Alireza Samadian <alireza4...@gmail.com>
AuthorDate: Mon Jul 29 12:54:16 2019 -0700

    [BEAM-7844] Custom MetadataHandler for NodeStats (RowRateWindow)
---
 .../extensions/sql/impl/CalciteQueryPlanner.java   |  5 +-
 .../sdk/extensions/sql/impl/planner/NodeStats.java | 86 ++++++++++++++++++++++
 .../sql/impl/planner/NodeStatsMetadata.java        | 55 ++++++++++++++
 .../sql/impl/planner/RelMdNodeStats.java           | 84 +++++++++++++++++++++
 .../sql/impl/rel/BeamAggregationRel.java           |  7 ++
 .../sdk/extensions/sql/impl/rel/BeamCalcRel.java   |  6 ++
 .../sdk/extensions/sql/impl/rel/BeamIOSinkRel.java |  7 ++
 .../extensions/sql/impl/rel/BeamIOSourceRel.java   |  6 ++
 .../extensions/sql/impl/rel/BeamIntersectRel.java  |  7 ++
 .../sdk/extensions/sql/impl/rel/BeamJoinRel.java   |  6 ++
 .../sdk/extensions/sql/impl/rel/BeamMinusRel.java  |  7 ++
 .../sdk/extensions/sql/impl/rel/BeamRelNode.java   |  4 +
 .../sdk/extensions/sql/impl/rel/BeamSortRel.java   |  7 ++
 .../extensions/sql/impl/rel/BeamUncollectRel.java  |  7 ++
 .../sdk/extensions/sql/impl/rel/BeamUnionRel.java  |  7 ++
 .../sdk/extensions/sql/impl/rel/BeamUnnestRel.java |  7 ++
 .../sdk/extensions/sql/impl/rel/BeamValuesRel.java |  7 ++
 .../extensions/sql/impl/planner/NodeStatsTest.java | 79 ++++++++++++++++++++
 18 files changed, 393 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index 3d4e6ca..a4ec34f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -152,7 +153,9 @@ class CalciteQueryPlanner implements QueryPlanner {
           .setMetadataProvider(
               ChainedRelMetadataProvider.of(
                   ImmutableList.of(
-                      NonCumulativeCostImpl.SOURCE, 
root.rel.getCluster().getMetadataProvider())));
+                      NonCumulativeCostImpl.SOURCE,
+                      RelMdNodeStats.SOURCE,
+                      root.rel.getCluster().getMetadataProvider())));
       RelMetadataQuery.THREAD_PROVIDERS.set(
           
JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
       root.rel.getCluster().invalidateMetadataQuery();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStats.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStats.java
new file mode 100644
index 0000000..88d7ad2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStats.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.planner;
+
+import com.google.auto.value.AutoValue;
+
+/** This is a utility class to represent rowCount, rate and window. */
+@AutoValue
+public abstract class NodeStats {
+
+  /**
+   * Returns an instance with all values set to INFINITY. This will be only 
used when the node is
+   * not a BeamRelNode and we don't have an estimation implementation for it 
in the metadata
+   * handler. In this case we return INFINITE and it will be propagated up in 
the estimates.
+   */
+  public static final NodeStats UNKNOWN =
+      create(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, 
Double.POSITIVE_INFINITY);
+
+  public abstract double getRowCount();
+
+  public abstract double getRate();
+
+  /**
+   * This method returns the number of tuples in each window. It is different 
than the windowing
+   * notion of Beam.
+   */
+  public abstract double getWindow();
+
+  public static NodeStats create(double rowCount, double rate, double window) {
+    if (window < 0 || rate < 0 || rowCount < 0) {
+      throw new IllegalArgumentException("All the estimates in NodeStats 
should be positive");
+    }
+    return new AutoValue_NodeStats(rowCount, rate, window);
+  }
+
+  /** It creates an instance with rate=0 and window=rowCount for bounded 
sources. */
+  public static NodeStats create(double rowCount) {
+    return create(rowCount, 0d, rowCount);
+  }
+
+  /** If any of the values for rowCount, rate or window is infinite, it 
returns true. */
+  public boolean isUnknown() {
+    return Double.isInfinite(getRowCount())
+        || Double.isInfinite(getRate())
+        || Double.isInfinite(getWindow());
+  }
+
+  public NodeStats multiply(double factor) {
+    return create(getRowCount() * factor, getRate() * factor, getWindow() * 
factor);
+  }
+
+  public NodeStats plus(NodeStats that) {
+    if (this.isUnknown() || that.isUnknown()) {
+      return UNKNOWN;
+    }
+    return create(
+        this.getRowCount() + that.getRowCount(),
+        this.getRate() + that.getRate(),
+        this.getWindow() + that.getWindow());
+  }
+
+  public NodeStats minus(NodeStats that) {
+    if (this.isUnknown() || that.isUnknown()) {
+      return UNKNOWN;
+    }
+    return create(
+        Math.max(this.getRowCount() - that.getRowCount(), 0),
+        Math.max(this.getRate() - that.getRate(), 0),
+        Math.max(this.getWindow() - that.getWindow(), 0));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
new file mode 100644
index 0000000..4a9e79f
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.planner;
+
+import java.lang.reflect.Method;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.Metadata;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+
+/**
+ * This is a metadata used for row count and rate estimation. It extends 
Calcite's Metadata
+ * interface so that we can use MetadataQuery to get our own estimates.
+ */
+public interface NodeStatsMetadata extends Metadata {
+  Method METHOD = Types.lookupMethod(NodeStatsMetadata.class, "getNodeStats");
+
+  MetadataDef<NodeStatsMetadata> DEF =
+      MetadataDef.of(NodeStatsMetadata.class, NodeStatsMetadata.Handler.class, 
METHOD);
+
+  // In order to use this we need to call it by 
relNode.metadata(RowRateWindowMetadata.class,
+  // mq).getRowRateWindow() where mq is the MetadataQuery (can be obtained by
+  // relNode.getCluster().getMetadataQuery()). After this, Calcite looks for 
the implementation of
+  // this metadata that we have registered in MetadataProvider (it is 
RelMdNodeStats.class in
+  // this case and we have registered it in CalciteQueryPlanner). Then 
Calcite's generated Code
+  // decides the type of the rel node and calls appropriate method in 
RelMdNodeStats.
+  // For instance: Join is a subclass of RelNode and if we have both 
getNodeStats(RelNode rel,
+  // RelMetadataQuery mq) and getNodeStats(Join rel, RelMetadataQuery mq) then 
if the rel is an
+  // instance of Join it will call getNodeStats((Join) rel, mq).
+  // Currently we only register it in SQLTransform path. JDBC does not 
register this and it does not
+  // use it. (because it does not register the our NonCumulativeMetadata 
implementation either).
+  NodeStats getNodeStats();
+
+  /** Handler API. */
+  interface Handler extends MetadataHandler<NodeStatsMetadata> {
+    NodeStats getNodeStats(RelNode r, RelMetadataQuery mq);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/RelMdNodeStats.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/RelMdNodeStats.java
new file mode 100644
index 0000000..c01fbb5
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/RelMdNodeStats.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.planner;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+
+/**
+ * This is the implementation of NodeStatsMetadata. Methods to estimate rate 
and row count for
+ * Calcite's logical nodes be implemented here.
+ */
+public class RelMdNodeStats implements MetadataHandler<NodeStatsMetadata> {
+
+  public static final RelMetadataProvider SOURCE =
+      ReflectiveRelMetadataProvider.reflectiveSource(
+          NodeStatsMetadata.METHOD, new RelMdNodeStats());
+
+  @Override
+  public MetadataDef<NodeStatsMetadata> getDef() {
+    return NodeStatsMetadata.DEF;
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public NodeStats getNodeStats(RelNode rel, RelMetadataQuery mq) {
+
+    if (rel instanceof BeamRelNode) {
+      return this.getBeamNodeStats((BeamRelNode) rel, mq);
+    }
+
+    // We can later define custom methods for all different RelNodes to 
prevent hitting this point.
+    // Similar to RelMdRowCount in calcite.
+
+    return NodeStats.UNKNOWN;
+  }
+
+  private NodeStats getBeamNodeStats(BeamRelNode rel, RelMetadataQuery mq) {
+
+    // Removing the unknown results.
+    // Calcite caches previous results in mq.map. This is done to prevent 
cyclic calls of this
+    // method and also improving the performance. However, we might have 
returned an unknown result
+    // because one of the inputs of the node was unknown (it is a logical node 
that we have not
+    // implemented getNodeStats for it). Later we should not get the Unknown, 
therefore we need to
+    // remove unknown results everyTime that this method is called.
+    // Results are also cached in CachingRelMetadataProvider because calcite 
PlannerImpl#Transform
+    // wraps the metadata provider with CachingRelMetadataProvider. However,
+    // CachingRelMetadataProvider checks timestamp before returning previous 
results. Therefore,
+    // there wouldn't be a problem in that case.
+    List<List> keys =
+        mq.map.entrySet().stream()
+            .filter(entry -> entry.getValue() instanceof NodeStats)
+            .filter(entry -> ((NodeStats) entry.getValue()).isUnknown())
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+
+    for (List key : keys) {
+      mq.map.remove(key);
+    }
+
+    return rel.estimateNodeStats(mq);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 15038f7..14e7475 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -24,6 +24,7 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.io.Serializable;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema;
@@ -55,6 +56,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.joda.time.Duration;
 
@@ -81,6 +83,11 @@ public class BeamAggregationRel extends Aggregate implements 
BeamRelNode {
   }
 
   @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
+
+  @Override
   public RelWriter explainTerms(RelWriter pw) {
     super.explainTerms(pw);
     if (this.windowFn != null) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index f8f69a5..31ae94a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType;
@@ -213,6 +214,11 @@ public class BeamCalcRel extends Calc implements 
BeamRelNode {
     throw new RuntimeException("Could not get the limit count from a non 
BeamSortRel input.");
   }
 
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
+
   public boolean isInputSortRelAndLimitOnly() {
     return (input instanceof BeamSortRel) && ((BeamSortRel) 
input).isLimitOnly();
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 2c77f56..5aaa635 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -22,6 +22,7 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -34,6 +35,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.prepare.Prepare;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql2rel.RelStructuredTypeFlattener;
 
@@ -71,6 +73,11 @@ public class BeamIOSinkRel extends TableModify
   }
 
   @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
+
+  @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     boolean flattened = isFlattened() || isFlattening;
     BeamIOSinkRel newRel =
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index e22b64b..d87f152 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -64,6 +65,11 @@ public class BeamIOSourceRel extends TableScan implements 
BeamRelNode {
   }
 
   @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
+
+  @Override
   public PCollection.IsBounded isBounded() {
     return beamTable.isBounded();
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index cc2590e..a90972a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -27,6 +28,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Intersect;
 import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 
 /**
  * {@code BeamRelNode} to replace a {@code Intersect} node.
@@ -49,4 +51,9 @@ public class BeamIntersectRel extends Intersect implements 
BeamRelNode {
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
     return new BeamSetOperatorRelBase(this, 
BeamSetOperatorRelBase.OpType.INTERSECT, all);
   }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 94c5a6b..167b8a5 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema;
@@ -141,6 +142,11 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
     return super.computeSelfCost(planner, mq);
   }
 
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
+
   /**
    * This method checks if a join is legal and can be converted into Beam SQL. 
It is used during
    * planning and applying {@link
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index 482b8be..0a8103d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -27,6 +28,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 
 /**
  * {@code BeamRelNode} to replace a {@code Minus} node.
@@ -49,4 +51,9 @@ public class BeamMinusRel extends Minus implements 
BeamRelNode {
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
     return new BeamSetOperatorRelBase(this, 
BeamSetOperatorRelBase.OpType.MINUS, all);
   }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 29e53d1..8b1d577 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -19,11 +19,13 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 
 /** A {@link RelNode} that can also give a {@link PTransform} that implements 
the expression. */
 public interface BeamRelNode extends RelNode {
@@ -61,4 +63,6 @@ public interface BeamRelNode extends RelNode {
     }
     return options;
   }
+
+  NodeStats estimateNodeStats(RelMetadataQuery mq);
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 57270c3..f252f11 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.state.StateSpec;
@@ -56,6 +57,7 @@ import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
@@ -134,6 +136,11 @@ public class BeamSortRel extends Sort implements 
BeamRelNode {
     }
   }
 
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
+
   public boolean isLimitOnly() {
     return fieldIndices.isEmpty();
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
index 6424d9a..28d2d69 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -31,6 +32,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Uncollect;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 
 /** {@link BeamRelNode} to implement an uncorrelated {@link Uncollect}, aka 
UNNEST. */
 public class BeamUncollectRel extends Uncollect implements BeamRelNode {
@@ -72,6 +74,11 @@ public class BeamUncollectRel extends Uncollect implements 
BeamRelNode {
     }
   }
 
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
+
   private static class UncollectDoFn extends DoFn<Row, Row> {
 
     private final Schema schema;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index 175b139..95a1826 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.PCollection;
@@ -28,6 +29,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 
 /**
  * {@link BeamRelNode} to replace a {@link Union}.
@@ -76,4 +78,9 @@ public class BeamUnionRel extends Union implements 
BeamRelNode {
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
     return new BeamSetOperatorRelBase(this, 
BeamSetOperatorRelBase.OpType.UNION, all);
   }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
index da675d1..b51df06 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -35,6 +36,7 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Uncollect;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 
@@ -75,6 +77,11 @@ public class BeamUnnestRel extends Uncollect implements 
BeamRelNode {
   }
 
   @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
+
+  @Override
   public RelWriter explainTerms(RelWriter pw) {
     return super.explainTerms(pw).item("unnestIndex", 
Integer.toString(unnestIndex));
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 3b285c4..c4405ba 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
@@ -37,6 +38,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
 
@@ -91,4 +93,9 @@ public class BeamValuesRel extends Values implements 
BeamRelNode {
         .mapToObj(i -> autoCastField(schema.getField(i), 
tuple.get(i).getValue()))
         .collect(toRow(schema));
   }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    return NodeStats.create(mq.getRowCount(this));
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
new file mode 100644
index 0000000..820f4fc
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.planner;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** This tests the NodeStats Metadata handler and the estimations. */
+public class NodeStatsTest extends BaseRelTest {
+  static class UnknownRel extends SingleRel {
+    protected UnknownRel(RelOptCluster cluster, RelTraitSet traits, RelNode 
input) {
+      super(cluster, traits, input);
+    }
+  }
+
+  public static final TestBoundedTable ORDER_DETAILS1 =
+      TestBoundedTable.of(
+              Schema.FieldType.INT32, "order_id",
+              Schema.FieldType.INT32, "site_id",
+              Schema.FieldType.INT32, "price")
+          .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5);
+
+  public static final TestBoundedTable ORDER_DETAILS2 =
+      TestBoundedTable.of(
+              Schema.FieldType.INT32, "order_id",
+              Schema.FieldType.INT32, "site_id",
+              Schema.FieldType.INT32, "price")
+          .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5);
+
+  @BeforeClass
+  public static void prepare() {
+    registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
+    registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
+  }
+
+  @Test
+  public void testUnknownRel() {
+    String sql = " select * from ORDER_DETAILS1 ";
+    RelNode root = env.parseQuery(sql);
+    RelNode unknown = new UnknownRel(root.getCluster(), null, null);
+    NodeStats nodeStats =
+        unknown
+            .metadata(NodeStatsMetadata.class, 
unknown.getCluster().getMetadataQuery())
+            .getNodeStats();
+    Assert.assertTrue(nodeStats.isUnknown());
+  }
+
+  @Test
+  public void testKnownRel() {
+    String sql = " select * from ORDER_DETAILS1 ";
+    RelNode root = env.parseQuery(sql);
+    NodeStats nodeStats =
+        root.metadata(NodeStatsMetadata.class, 
root.getCluster().getMetadataQuery()).getNodeStats();
+    Assert.assertFalse(nodeStats.isUnknown());
+  }
+}

Reply via email to