This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a0d9c4  [BEAM-7783] Adding BeamTableStatistics.
     new ff7a803  Merge pull request #9104 from riazela/BeamTableStatistics
3a0d9c4 is described below

commit 3a0d9c4fe16ced30a223557a4ad531365d4977ec
Author: Alireza Samadian <alireza4...@gmail.com>
AuthorDate: Thu Jul 18 16:53:54 2019 -0700

    [BEAM-7783] Adding BeamTableStatistics.
---
 .../apache/beam/sdk/io/TextRowCountEstimator.java  |  6 +-
 .../beam/sdk/io/TextRowCountEstimatorTest.java     | 12 +--
 .../beam/sdk/extensions/sql/BeamSqlTable.java      |  6 +-
 .../sdk/extensions/sql/impl/BeamCalciteTable.java  | 10 +--
 .../sql/impl/BeamRowCountStatistics.java           | 44 ----------
 .../extensions/sql/impl/BeamTableStatistics.java   | 93 ++++++++++++++++++++++
 .../extensions/sql/impl/rel/BeamIOSourceRel.java   | 24 ++++--
 .../sql/meta/provider/bigquery/BigQueryTable.java  | 14 ++--
 .../sql/meta/provider/test/TestTableProvider.java  |  9 +--
 .../sql/meta/provider/text/TextTable.java          | 15 ++--
 .../sql/impl/rule/JoinReorderingTest.java          | 16 ++--
 .../meta/provider/bigquery/BigQueryRowCountIT.java | 13 ++-
 .../meta/provider/bigquery/BigQueryTestTable.java  |  4 +-
 13 files changed, 160 insertions(+), 106 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
index d220505..ad26fb1 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
@@ -72,7 +72,7 @@ public abstract class TextRowCountEstimator {
    * @throws 
org.apache.beam.sdk.io.TextRowCountEstimator.NoEstimationException if all the 
sampled
    *     lines are empty and we have not read all the lines in the matched 
files.
    */
-  public Long estimateRowCount(PipelineOptions pipelineOptions)
+  public Double estimateRowCount(PipelineOptions pipelineOptions)
       throws IOException, NoEstimationException {
     long linesSize = 0;
     int numberOfReadLines = 0;
@@ -129,7 +129,7 @@ public abstract class TextRowCountEstimator {
     }
 
     if (numberOfReadLines == 0 && sampledEverything) {
-      return 0L;
+      return 0d;
     }
 
     if (numberOfReadLines == 0) {
@@ -138,7 +138,7 @@ public abstract class TextRowCountEstimator {
     }
 
     // This is total file sizes divided by average line size.
-    return totalFileSizes * numberOfReadLines / linesSize;
+    return (double) totalFileSizes * numberOfReadLines / linesSize;
   }
 
   /** Builder for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
index b7e3f8e..6f53d1e 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
@@ -59,16 +59,16 @@ public class TextRowCountEstimatorTest {
     writer.close();
     TextRowCountEstimator textRowCountEstimator =
         
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + 
"/**").build();
-    Long rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+    Double rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
     Assert.assertNotNull(rows);
-    Assert.assertEquals(150L, rows.longValue());
+    Assert.assertEquals(150d, rows, 0.01);
   }
 
   @Test(expected = FileNotFoundException.class)
   public void testEmptyFolder() throws Exception {
     TextRowCountEstimator textRowCountEstimator =
         
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + 
"/**").build();
-    Long rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+    Double rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
   }
 
   @Test
@@ -82,8 +82,8 @@ public class TextRowCountEstimatorTest {
     writer.close();
     TextRowCountEstimator textRowCountEstimator =
         
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + 
"/**").build();
-    Long rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
-    Assert.assertEquals(0L, rows.longValue());
+    Double rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+    Assert.assertEquals(0d, rows, 0.01);
   }
 
   @Test(expected = TextRowCountEstimator.NoEstimationException.class)
@@ -110,7 +110,7 @@ public class TextRowCountEstimatorTest {
         TextRowCountEstimator.builder()
             .setFilePattern(temporaryFolder.getRoot() + "/something/**")
             .build();
-    Long rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+    Double rows = 
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
     Assert.assertNull(rows);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 63f7158..b759761 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.PBegin;
@@ -40,7 +40,7 @@ public interface BeamSqlTable {
   Schema getSchema();
 
   /** Estimates the number of rows or returns null if there is no estimation. 
*/
-  default BeamRowCountStatistics getRowCount(PipelineOptions options) {
-    return BeamRowCountStatistics.UNKNOWN;
+  default BeamTableStatistics getRowCount(PipelineOptions options) {
+    return BeamTableStatistics.UNKNOWN;
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index 5aa0f27..293a60b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -26,7 +26,6 @@ import 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.options.PipelineOptions;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.calcite.adapter.java.AbstractQueryableTable;
 import org.apache.calcite.linq4j.QueryProvider;
@@ -42,7 +41,6 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.ModifiableTable;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
 import org.apache.calcite.schema.TranslatableTable;
 
 /** Adapter from {@link BeamSqlTable} to a calcite Table. */
@@ -91,10 +89,7 @@ public class BeamCalciteTable extends AbstractQueryableTable
     final ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
     try {
       
Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
-      BeamRowCountStatistics beamStatistics = 
beamTable.getRowCount(getPipelineOptions());
-      return beamStatistics.isUnknown()
-          ? Statistics.UNKNOWN
-          : Statistics.of(beamStatistics.getRowCount().doubleValue(), 
ImmutableList.of());
+      return beamTable.getRowCount(getPipelineOptions());
     } finally {
       Thread.currentThread().setContextClassLoader(originalClassLoader);
     }
@@ -102,7 +97,8 @@ public class BeamCalciteTable extends AbstractQueryableTable
 
   @Override
   public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable 
relOptTable) {
-    return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, 
pipelineOptionsMap);
+    return new BeamIOSourceRel(
+        context.getCluster(), relOptTable, beamTable, pipelineOptionsMap, 
this);
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
deleted file mode 100644
index ac0431d..0000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-
-/** This class stores row count statistics. */
-public class BeamRowCountStatistics implements Serializable {
-  public static final BeamRowCountStatistics UNKNOWN = new 
BeamRowCountStatistics(null);
-  private final BigInteger rowCount;
-
-  private BeamRowCountStatistics(BigInteger rowCount) {
-    this.rowCount = rowCount;
-  }
-
-  public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger 
rowCount) {
-    return new BeamRowCountStatistics(rowCount);
-  }
-
-  /** Is true if the row count cannot be estimated. */
-  public boolean isUnknown() {
-    return rowCount == null;
-  }
-
-  public BigInteger getRowCount() {
-    return rowCount;
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java
new file mode 100644
index 0000000..c010604
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/** This class stores row count statistics. */
+@Experimental
+@Internal
+public class BeamTableStatistics implements Serializable, Statistic {
+  public static final BeamTableStatistics UNKNOWN = new 
BeamTableStatistics(100d, 0d, true);
+  public static final BeamTableStatistics UNBOUNDED_UNKNOWN =
+      new BeamTableStatistics(0d, 0.1, true);
+  private final boolean unknown;
+  private final Double rowCount;
+  private final Double rate;
+
+  private BeamTableStatistics(Double rowCount, Double rate, boolean isUnknown) 
{
+    this.rowCount = rowCount;
+    this.rate = rate;
+    this.unknown = isUnknown;
+  }
+
+  private BeamTableStatistics(Double rowCount, Double rate) {
+    this(rowCount, rate, false);
+  }
+
+  public static BeamTableStatistics createBoundedTableStatistics(Double 
rowCount) {
+    return new BeamTableStatistics(rowCount, 0d);
+  }
+
+  public static BeamTableStatistics createUnboundedTableStatistics(Double 
rate) {
+    return new BeamTableStatistics(0d, rate);
+  }
+
+  public Double getRate() {
+    return rate;
+  }
+
+  public boolean isUnknown() {
+    return unknown;
+  }
+
+  @Override
+  public Double getRowCount() {
+    return rowCount;
+  }
+
+  @Override
+  public boolean isKey(ImmutableBitSet columns) {
+    return false;
+  }
+
+  @Override
+  public List<RelReferentialConstraint> getReferentialConstraints() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public List<RelCollation> getCollations() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public RelDistribution getDistribution() {
+    return RelDistributionTraitDef.INSTANCE.getDefault();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index e14b35b..82fcd3d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -28,26 +29,35 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 
 /** BeamRelNode to replace a {@code TableScan} node. */
 public class BeamIOSourceRel extends TableScan implements BeamRelNode {
 
-  private final BeamSqlTable sqlTable;
+  private final BeamSqlTable beamTable;
+  private final BeamCalciteTable calciteTable;
   private final Map<String, String> pipelineOptions;
 
   public BeamIOSourceRel(
       RelOptCluster cluster,
       RelOptTable table,
-      BeamSqlTable sqlTable,
-      Map<String, String> pipelineOptions) {
+      BeamSqlTable beamTable,
+      Map<String, String> pipelineOptions,
+      BeamCalciteTable calciteTable) {
     super(cluster, cluster.traitSetOf(BeamLogicalConvention.INSTANCE), table);
-    this.sqlTable = sqlTable;
+    this.beamTable = beamTable;
+    this.calciteTable = calciteTable;
     this.pipelineOptions = pipelineOptions;
   }
 
   @Override
+  public double estimateRowCount(RelMetadataQuery mq) {
+    return super.estimateRowCount(mq);
+  }
+
+  @Override
   public PCollection.IsBounded isBounded() {
-    return sqlTable.isBounded();
+    return beamTable.isBounded();
   }
 
   @Override
@@ -64,12 +74,12 @@ public class BeamIOSourceRel extends TableScan implements 
BeamRelNode {
           "Should not have received input for %s: %s",
           BeamIOSourceRel.class.getSimpleName(),
           input);
-      return sqlTable.buildIOReader(input.getPipeline().begin());
+      return beamTable.buildIOReader(input.getPipeline().begin());
     }
   }
 
   protected BeamSqlTable getBeamSqlTable() {
-    return sqlTable;
+    return beamTable;
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 984b1bd..6d0f773 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
 class BigQueryTable extends BaseBeamTable implements Serializable {
   @VisibleForTesting final String bqLocation;
   private final ConversionOptions conversionOptions;
-  private BeamRowCountStatistics rowCountStatistics = null;
+  private BeamTableStatistics rowCountStatistics = null;
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BigQueryTable.class);
 
   BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) {
@@ -57,7 +57,7 @@ class BigQueryTable extends BaseBeamTable implements 
Serializable {
   }
 
   @Override
-  public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+  public BeamTableStatistics getRowCount(PipelineOptions options) {
 
     if (rowCountStatistics == null) {
       rowCountStatistics = getRowCountFromBQ(options, bqLocation);
@@ -93,22 +93,22 @@ class BigQueryTable extends BaseBeamTable implements 
Serializable {
             .to(bqLocation));
   }
 
-  private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, 
String bqLocation) {
+  private static BeamTableStatistics getRowCountFromBQ(PipelineOptions o, 
String bqLocation) {
     try {
       BigInteger rowCount =
           BigQueryHelpers.getNumRows(
               o.as(BigQueryOptions.class), 
BigQueryHelpers.parseTableSpec(bqLocation));
 
       if (rowCount == null) {
-        return BeamRowCountStatistics.UNKNOWN;
+        return BeamTableStatistics.UNKNOWN;
       }
 
-      return BeamRowCountStatistics.createBoundedTableStatistics(rowCount);
+      return 
BeamTableStatistics.createBoundedTableStatistics(rowCount.doubleValue());
 
     } catch (IOException | InterruptedException e) {
       LOGGER.warn("Could not get the row count for the table " + bqLocation, 
e);
     }
 
-    return BeamRowCountStatistics.UNKNOWN;
+    return BeamTableStatistics.UNKNOWN;
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 0a6df10..d093eb6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -21,7 +21,6 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import com.google.auto.service.AutoService;
 import java.io.Serializable;
-import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -140,9 +139,9 @@ public class TestTableProvider extends 
InMemoryMetaTableProvider {
     }
 
     @Override
-    public BeamRowCountStatistics getRowCount(PipelineOptions options) {
-      return BeamRowCountStatistics.createBoundedTableStatistics(
-          BigInteger.valueOf(tableWithRows.getRows().size()));
+    public BeamTableStatistics getRowCount(PipelineOptions options) {
+      return BeamTableStatistics.createBoundedTableStatistics(
+          (double) tableWithRows.getRows().size());
     }
 
     @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
index 60232ab..a30269f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
@@ -18,9 +18,8 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
 import java.io.IOException;
-import java.math.BigInteger;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.TextRowCountEstimator;
@@ -51,7 +50,7 @@ public class TextTable extends BaseBeamTable {
   private static final TextRowCountEstimator.SamplingStrategy 
DEFAULT_SAMPLING_STRATEGY =
       new TextRowCountEstimator.LimitNumberOfTotalBytes(1024 * 1024L);
   private final String filePattern;
-  private BeamRowCountStatistics rowCountStatistics = null;
+  private BeamTableStatistics rowCountStatistics = null;
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TextTable.class);
 
   /** Text table with the specified read and write transforms. */
@@ -71,7 +70,7 @@ public class TextTable extends BaseBeamTable {
   }
 
   @Override
-  public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+  public BeamTableStatistics getRowCount(PipelineOptions options) {
     if (rowCountStatistics == null) {
       rowCountStatistics = getTextRowEstimate(options, getFilePattern());
     }
@@ -79,7 +78,7 @@ public class TextTable extends BaseBeamTable {
     return rowCountStatistics;
   }
 
-  private static BeamRowCountStatistics getTextRowEstimate(
+  private static BeamTableStatistics getTextRowEstimate(
       PipelineOptions options, String filePattern) {
     TextRowCountEstimator textRowCountEstimator =
         TextRowCountEstimator.builder()
@@ -87,12 +86,12 @@ public class TextTable extends BaseBeamTable {
             .setSamplingStrategy(DEFAULT_SAMPLING_STRATEGY)
             .build();
     try {
-      Long rows = textRowCountEstimator.estimateRowCount(options);
-      return 
BeamRowCountStatistics.createBoundedTableStatistics(BigInteger.valueOf(rows));
+      Double rows = textRowCountEstimator.estimateRowCount(options);
+      return BeamTableStatistics.createBoundedTableStatistics(rows);
     } catch (IOException | TextRowCountEstimator.NoEstimationException e) {
       LOGGER.warn("Could not get the row count for the text table " + 
filePattern, e);
     }
-    return BeamRowCountStatistics.UNKNOWN;
+    return BeamTableStatistics.UNKNOWN;
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
index 9671a44..c8dd998d 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rule;
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -85,25 +84,28 @@ public class JoinReorderingTest {
     createThreeTables(tableProvider);
 
     Assert.assertEquals(
-        BigInteger.ONE,
+        1d,
         tableProvider
             .buildBeamSqlTable(tableProvider.getTable("small_table"))
             .getRowCount(null)
-            .getRowCount());
+            .getRowCount(),
+        0.01);
 
     Assert.assertEquals(
-        BigInteger.valueOf(3),
+        3d,
         tableProvider
             .buildBeamSqlTable(tableProvider.getTable("medium_table"))
             .getRowCount(null)
-            .getRowCount());
+            .getRowCount(),
+        0.01);
 
     Assert.assertEquals(
-        BigInteger.valueOf(100),
+        100d,
         tableProvider
             .buildBeamSqlTable(tableProvider.getTable("large_table"))
             .getRowCount(null)
-            .getRowCount());
+            .getRowCount(),
+        0.01);
   }
 
   @Test
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
index dc03923..b70092e 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
@@ -27,11 +27,10 @@ import static org.junit.Assert.assertTrue;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import java.math.BigInteger;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
@@ -61,9 +60,9 @@ public class BigQueryRowCountIT {
     BigQueryTableProvider provider = new BigQueryTableProvider();
     Table table = getTable("testTable", bigQuery.tableSpec());
     BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-    BeamRowCountStatistics size = 
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+    BeamTableStatistics size = 
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
     assertNotNull(size);
-    assertEquals(BigInteger.ZERO, size.getRowCount());
+    assertEquals(0d, size.getRowCount(), 0.1);
   }
 
   @Test
@@ -91,10 +90,10 @@ public class BigQueryRowCountIT {
     pipeline.run().waitUntilFinish();
 
     BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-    BeamRowCountStatistics size1 = 
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+    BeamTableStatistics size1 = 
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
 
     assertNotNull(size1);
-    assertEquals(BigInteger.valueOf(3), size1.getRowCount());
+    assertEquals(3d, size1.getRowCount(), 0.1);
   }
 
   /** This tests if the pipeline options are injected in the path of SQL 
Transform. */
@@ -143,7 +142,7 @@ public class BigQueryRowCountIT {
     Table table = getTable("fakeTable", "project:dataset.table");
 
     BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-    BeamRowCountStatistics size = 
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+    BeamTableStatistics size = 
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
     assertTrue(size.isUnknown());
   }
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
index db954ae..a801814 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
 
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -34,7 +34,7 @@ public class BigQueryTestTable extends BigQueryTable {
   }
 
   @Override
-  public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+  public BeamTableStatistics getRowCount(PipelineOptions options) {
     jobName = options.getJobName();
     return super.getRowCount(options);
   }

Reply via email to