This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3a0d9c4 [BEAM-7783] Adding BeamTableStatistics. new ff7a803 Merge pull request #9104 from riazela/BeamTableStatistics 3a0d9c4 is described below commit 3a0d9c4fe16ced30a223557a4ad531365d4977ec Author: Alireza Samadian <alireza4...@gmail.com> AuthorDate: Thu Jul 18 16:53:54 2019 -0700 [BEAM-7783] Adding BeamTableStatistics. --- .../apache/beam/sdk/io/TextRowCountEstimator.java | 6 +- .../beam/sdk/io/TextRowCountEstimatorTest.java | 12 +-- .../beam/sdk/extensions/sql/BeamSqlTable.java | 6 +- .../sdk/extensions/sql/impl/BeamCalciteTable.java | 10 +-- .../sql/impl/BeamRowCountStatistics.java | 44 ---------- .../extensions/sql/impl/BeamTableStatistics.java | 93 ++++++++++++++++++++++ .../extensions/sql/impl/rel/BeamIOSourceRel.java | 24 ++++-- .../sql/meta/provider/bigquery/BigQueryTable.java | 14 ++-- .../sql/meta/provider/test/TestTableProvider.java | 9 +-- .../sql/meta/provider/text/TextTable.java | 15 ++-- .../sql/impl/rule/JoinReorderingTest.java | 16 ++-- .../meta/provider/bigquery/BigQueryRowCountIT.java | 13 ++- .../meta/provider/bigquery/BigQueryTestTable.java | 4 +- 13 files changed, 160 insertions(+), 106 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java index d220505..ad26fb1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java @@ -72,7 +72,7 @@ public abstract class TextRowCountEstimator { * @throws org.apache.beam.sdk.io.TextRowCountEstimator.NoEstimationException if all the sampled * lines are empty and we have not read all the lines in the matched files. */ - public Long estimateRowCount(PipelineOptions pipelineOptions) + public Double estimateRowCount(PipelineOptions pipelineOptions) throws IOException, NoEstimationException { long linesSize = 0; int numberOfReadLines = 0; @@ -129,7 +129,7 @@ public abstract class TextRowCountEstimator { } if (numberOfReadLines == 0 && sampledEverything) { - return 0L; + return 0d; } if (numberOfReadLines == 0) { @@ -138,7 +138,7 @@ public abstract class TextRowCountEstimator { } // This is total file sizes divided by average line size. - return totalFileSizes * numberOfReadLines / linesSize; + return (double) totalFileSizes * numberOfReadLines / linesSize; } /** Builder for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java index b7e3f8e..6f53d1e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java @@ -59,16 +59,16 @@ public class TextRowCountEstimatorTest { writer.close(); TextRowCountEstimator textRowCountEstimator = TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build(); - Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + Double rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); Assert.assertNotNull(rows); - Assert.assertEquals(150L, rows.longValue()); + Assert.assertEquals(150d, rows, 0.01); } @Test(expected = FileNotFoundException.class) public void testEmptyFolder() throws Exception { TextRowCountEstimator textRowCountEstimator = TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build(); - Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + Double rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); } @Test @@ -82,8 +82,8 @@ public class TextRowCountEstimatorTest { writer.close(); TextRowCountEstimator textRowCountEstimator = TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build(); - Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); - Assert.assertEquals(0L, rows.longValue()); + Double rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + Assert.assertEquals(0d, rows, 0.01); } @Test(expected = TextRowCountEstimator.NoEstimationException.class) @@ -110,7 +110,7 @@ public class TextRowCountEstimatorTest { TextRowCountEstimator.builder() .setFilePattern(temporaryFolder.getRoot() + "/something/**") .build(); - Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); + Double rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create()); Assert.assertNull(rows); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java index 63f7158..b759761 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.PBegin; @@ -40,7 +40,7 @@ public interface BeamSqlTable { Schema getSchema(); /** Estimates the number of rows or returns null if there is no estimation. */ - default BeamRowCountStatistics getRowCount(PipelineOptions options) { - return BeamRowCountStatistics.UNKNOWN; + default BeamTableStatistics getRowCount(PipelineOptions options) { + return BeamTableStatistics.UNKNOWN; } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java index 5aa0f27..293a60b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.calcite.adapter.java.AbstractQueryableTable; import org.apache.calcite.linq4j.QueryProvider; @@ -42,7 +41,6 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.ModifiableTable; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Statistic; -import org.apache.calcite.schema.Statistics; import org.apache.calcite.schema.TranslatableTable; /** Adapter from {@link BeamSqlTable} to a calcite Table. */ @@ -91,10 +89,7 @@ public class BeamCalciteTable extends AbstractQueryableTable final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); - BeamRowCountStatistics beamStatistics = beamTable.getRowCount(getPipelineOptions()); - return beamStatistics.isUnknown() - ? Statistics.UNKNOWN - : Statistics.of(beamStatistics.getRowCount().doubleValue(), ImmutableList.of()); + return beamTable.getRowCount(getPipelineOptions()); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } @@ -102,7 +97,8 @@ public class BeamCalciteTable extends AbstractQueryableTable @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { - return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptionsMap); + return new BeamIOSourceRel( + context.getCluster(), relOptTable, beamTable, pipelineOptionsMap, this); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java deleted file mode 100644 index ac0431d..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.impl; - -import java.io.Serializable; -import java.math.BigInteger; - -/** This class stores row count statistics. */ -public class BeamRowCountStatistics implements Serializable { - public static final BeamRowCountStatistics UNKNOWN = new BeamRowCountStatistics(null); - private final BigInteger rowCount; - - private BeamRowCountStatistics(BigInteger rowCount) { - this.rowCount = rowCount; - } - - public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger rowCount) { - return new BeamRowCountStatistics(rowCount); - } - - /** Is true if the row count cannot be estimated. */ - public boolean isUnknown() { - return rowCount == null; - } - - public BigInteger getRowCount() { - return rowCount; - } -} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java new file mode 100644 index 0000000..c010604 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelDistributionTraitDef; +import org.apache.calcite.rel.RelReferentialConstraint; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.util.ImmutableBitSet; + +/** This class stores row count statistics. */ +@Experimental +@Internal +public class BeamTableStatistics implements Serializable, Statistic { + public static final BeamTableStatistics UNKNOWN = new BeamTableStatistics(100d, 0d, true); + public static final BeamTableStatistics UNBOUNDED_UNKNOWN = + new BeamTableStatistics(0d, 0.1, true); + private final boolean unknown; + private final Double rowCount; + private final Double rate; + + private BeamTableStatistics(Double rowCount, Double rate, boolean isUnknown) { + this.rowCount = rowCount; + this.rate = rate; + this.unknown = isUnknown; + } + + private BeamTableStatistics(Double rowCount, Double rate) { + this(rowCount, rate, false); + } + + public static BeamTableStatistics createBoundedTableStatistics(Double rowCount) { + return new BeamTableStatistics(rowCount, 0d); + } + + public static BeamTableStatistics createUnboundedTableStatistics(Double rate) { + return new BeamTableStatistics(0d, rate); + } + + public Double getRate() { + return rate; + } + + public boolean isUnknown() { + return unknown; + } + + @Override + public Double getRowCount() { + return rowCount; + } + + @Override + public boolean isKey(ImmutableBitSet columns) { + return false; + } + + @Override + public List<RelReferentialConstraint> getReferentialConstraints() { + return ImmutableList.of(); + } + + @Override + public List<RelCollation> getCollations() { + return ImmutableList.of(); + } + + @Override + public RelDistribution getDistribution() { + return RelDistributionTraitDef.INSTANCE.getDefault(); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java index e14b35b..82fcd3d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java @@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import java.util.Map; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -28,26 +29,35 @@ import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; /** BeamRelNode to replace a {@code TableScan} node. */ public class BeamIOSourceRel extends TableScan implements BeamRelNode { - private final BeamSqlTable sqlTable; + private final BeamSqlTable beamTable; + private final BeamCalciteTable calciteTable; private final Map<String, String> pipelineOptions; public BeamIOSourceRel( RelOptCluster cluster, RelOptTable table, - BeamSqlTable sqlTable, - Map<String, String> pipelineOptions) { + BeamSqlTable beamTable, + Map<String, String> pipelineOptions, + BeamCalciteTable calciteTable) { super(cluster, cluster.traitSetOf(BeamLogicalConvention.INSTANCE), table); - this.sqlTable = sqlTable; + this.beamTable = beamTable; + this.calciteTable = calciteTable; this.pipelineOptions = pipelineOptions; } @Override + public double estimateRowCount(RelMetadataQuery mq) { + return super.estimateRowCount(mq); + } + + @Override public PCollection.IsBounded isBounded() { - return sqlTable.isBounded(); + return beamTable.isBounded(); } @Override @@ -64,12 +74,12 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { "Should not have received input for %s: %s", BeamIOSourceRel.class.getSimpleName(), input); - return sqlTable.buildIOReader(input.getPipeline().begin()); + return beamTable.buildIOReader(input.getPipeline().begin()); } } protected BeamSqlTable getBeamSqlTable() { - return sqlTable; + return beamTable; } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java index 984b1bd..6d0f773 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; @@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory; class BigQueryTable extends BaseBeamTable implements Serializable { @VisibleForTesting final String bqLocation; private final ConversionOptions conversionOptions; - private BeamRowCountStatistics rowCountStatistics = null; + private BeamTableStatistics rowCountStatistics = null; private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class); BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) { @@ -57,7 +57,7 @@ class BigQueryTable extends BaseBeamTable implements Serializable { } @Override - public BeamRowCountStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getRowCount(PipelineOptions options) { if (rowCountStatistics == null) { rowCountStatistics = getRowCountFromBQ(options, bqLocation); @@ -93,22 +93,22 @@ class BigQueryTable extends BaseBeamTable implements Serializable { .to(bqLocation)); } - private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) { + private static BeamTableStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) { try { BigInteger rowCount = BigQueryHelpers.getNumRows( o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation)); if (rowCount == null) { - return BeamRowCountStatistics.UNKNOWN; + return BeamTableStatistics.UNKNOWN; } - return BeamRowCountStatistics.createBoundedTableStatistics(rowCount); + return BeamTableStatistics.createBoundedTableStatistics(rowCount.doubleValue()); } catch (IOException | InterruptedException e) { LOGGER.warn("Could not get the row count for the table " + bqLocation, e); } - return BeamRowCountStatistics.UNKNOWN; + return BeamTableStatistics.UNKNOWN; } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java index 0a6df10..d093eb6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java @@ -21,7 +21,6 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import com.google.auto.service.AutoService; import java.io.Serializable; -import java.math.BigInteger; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -31,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; @@ -140,9 +139,9 @@ public class TestTableProvider extends InMemoryMetaTableProvider { } @Override - public BeamRowCountStatistics getRowCount(PipelineOptions options) { - return BeamRowCountStatistics.createBoundedTableStatistics( - BigInteger.valueOf(tableWithRows.getRows().size())); + public BeamTableStatistics getRowCount(PipelineOptions options) { + return BeamTableStatistics.createBoundedTableStatistics( + (double) tableWithRows.getRows().size()); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java index 60232ab..a30269f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java @@ -18,9 +18,8 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.text; import java.io.IOException; -import java.math.BigInteger; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.TextRowCountEstimator; @@ -51,7 +50,7 @@ public class TextTable extends BaseBeamTable { private static final TextRowCountEstimator.SamplingStrategy DEFAULT_SAMPLING_STRATEGY = new TextRowCountEstimator.LimitNumberOfTotalBytes(1024 * 1024L); private final String filePattern; - private BeamRowCountStatistics rowCountStatistics = null; + private BeamTableStatistics rowCountStatistics = null; private static final Logger LOGGER = LoggerFactory.getLogger(TextTable.class); /** Text table with the specified read and write transforms. */ @@ -71,7 +70,7 @@ public class TextTable extends BaseBeamTable { } @Override - public BeamRowCountStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getRowCount(PipelineOptions options) { if (rowCountStatistics == null) { rowCountStatistics = getTextRowEstimate(options, getFilePattern()); } @@ -79,7 +78,7 @@ public class TextTable extends BaseBeamTable { return rowCountStatistics; } - private static BeamRowCountStatistics getTextRowEstimate( + private static BeamTableStatistics getTextRowEstimate( PipelineOptions options, String filePattern) { TextRowCountEstimator textRowCountEstimator = TextRowCountEstimator.builder() @@ -87,12 +86,12 @@ public class TextTable extends BaseBeamTable { .setSamplingStrategy(DEFAULT_SAMPLING_STRATEGY) .build(); try { - Long rows = textRowCountEstimator.estimateRowCount(options); - return BeamRowCountStatistics.createBoundedTableStatistics(BigInteger.valueOf(rows)); + Double rows = textRowCountEstimator.estimateRowCount(options); + return BeamTableStatistics.createBoundedTableStatistics(rows); } catch (IOException | TextRowCountEstimator.NoEstimationException e) { LOGGER.warn("Could not get the row count for the text table " + filePattern, e); } - return BeamRowCountStatistics.UNKNOWN; + return BeamTableStatistics.UNKNOWN; } @Override diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java index 9671a44..c8dd998d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.extensions.sql.impl.rule; -import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -85,25 +84,28 @@ public class JoinReorderingTest { createThreeTables(tableProvider); Assert.assertEquals( - BigInteger.ONE, + 1d, tableProvider .buildBeamSqlTable(tableProvider.getTable("small_table")) .getRowCount(null) - .getRowCount()); + .getRowCount(), + 0.01); Assert.assertEquals( - BigInteger.valueOf(3), + 3d, tableProvider .buildBeamSqlTable(tableProvider.getTable("medium_table")) .getRowCount(null) - .getRowCount()); + .getRowCount(), + 0.01); Assert.assertEquals( - BigInteger.valueOf(100), + 100d, tableProvider .buildBeamSqlTable(tableProvider.getTable("large_table")) .getRowCount(null) - .getRowCount()); + .getRowCount(), + 0.01); } @Test diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java index dc03923..b70092e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java @@ -27,11 +27,10 @@ import static org.junit.Assert.assertTrue; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; -import java.math.BigInteger; import java.util.stream.Stream; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.SqlTransform; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; @@ -61,9 +60,9 @@ public class BigQueryRowCountIT { BigQueryTableProvider provider = new BigQueryTableProvider(); Table table = getTable("testTable", bigQuery.tableSpec()); BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + BeamTableStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); assertNotNull(size); - assertEquals(BigInteger.ZERO, size.getRowCount()); + assertEquals(0d, size.getRowCount(), 0.1); } @Test @@ -91,10 +90,10 @@ public class BigQueryRowCountIT { pipeline.run().waitUntilFinish(); BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamRowCountStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + BeamTableStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); assertNotNull(size1); - assertEquals(BigInteger.valueOf(3), size1.getRowCount()); + assertEquals(3d, size1.getRowCount(), 0.1); } /** This tests if the pipeline options are injected in the path of SQL Transform. */ @@ -143,7 +142,7 @@ public class BigQueryRowCountIT { Table table = getTable("fakeTable", "project:dataset.table"); BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + BeamTableStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); assertTrue(size.isUnknown()); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java index db954ae..a801814 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.options.PipelineOptions; @@ -34,7 +34,7 @@ public class BigQueryTestTable extends BigQueryTable { } @Override - public BeamRowCountStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getRowCount(PipelineOptions options) { jobName = options.getJobName(); return super.getRowCount(options); }