This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 9cf6f0d [BEAM-7898] Remove default implementation of getRowCount and change the name to getTableStatistics new 0d911b8 Merge pull request #9254 from riazela/TablesStatEstimation 9cf6f0d is described below commit 9cf6f0d93fffda74c8075fe5858b17aadb8ad645 Author: Alireza Samadian <alireza4...@gmail.com> AuthorDate: Mon Aug 5 11:11:56 2019 -0700 [BEAM-7898] Remove default implementation of getRowCount and change the name to getTableStatistics --- .../sdk/extensions/sql/meta/provider/hcatalog/HCatalogTable.java | 7 +++++++ .../java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java | 9 +++++---- .../apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java | 2 +- .../apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java | 2 +- .../sdk/extensions/sql/impl/schema/BeamPCollectionTable.java | 7 +++++++ .../sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java | 6 +++--- .../sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java | 7 +++++++ .../sdk/extensions/sql/meta/provider/parquet/ParquetTable.java | 7 +++++++ .../extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java | 7 +++++++ .../sql/meta/provider/seqgen/GenerateSequenceTable.java | 7 +++++++ .../sdk/extensions/sql/meta/provider/test/TestBoundedTable.java | 2 +- .../sdk/extensions/sql/meta/provider/test/TestTableProvider.java | 2 +- .../extensions/sql/meta/provider/test/TestUnboundedTable.java | 2 +- .../beam/sdk/extensions/sql/meta/provider/text/TextTable.java | 4 ++-- .../sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java | 6 ++++++ .../sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java | 7 +++++++ .../beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java | 6 +++--- .../sql/meta/provider/bigquery/BigQueryRowCountIT.java | 6 +++--- .../extensions/sql/meta/provider/bigquery/BigQueryTestTable.java | 4 ++-- 19 files changed, 78 insertions(+), 22 deletions(-) diff --git a/sdks/java/extensions/sql/hcatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/HCatalogTable.java b/sdks/java/extensions/sql/hcatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/HCatalogTable.java index c96c657..2606941 100644 --- a/sdks/java/extensions/sql/hcatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/HCatalogTable.java +++ b/sdks/java/extensions/sql/hcatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/HCatalogTable.java @@ -21,8 +21,10 @@ import com.google.auto.value.AutoValue; import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.io.hcatalog.HCatToRow; import org.apache.beam.sdk.io.hcatalog.HCatalogIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -68,6 +70,11 @@ public abstract class HCatalogTable implements BeamSqlTable { } @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.BOUNDED_UNKNOWN; + } + + @Override public Schema getSchema() { return schema(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java index 6ddf8bd..ea7c030 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java @@ -39,8 +39,9 @@ public interface BeamSqlTable { /** Get the schema info of the table. */ Schema getSchema(); - /** Estimates the number of rows or the rate for unbounded Tables. */ - default BeamTableStatistics getRowCount(PipelineOptions options) { - return BeamTableStatistics.UNKNOWN; - } + /** + * Estimates the number of rows or the rate for unbounded Tables. If it is not possible to + * estimate the row count or rate it will return BeamTableStatistics.BOUNDED_UNKNOWN. + */ + BeamTableStatistics getTableStatistics(PipelineOptions options); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java index b6dbf53..9a889a9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java @@ -88,7 +88,7 @@ public class BeamCalciteTable extends AbstractQueryableTable final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); - return beamTable.getRowCount(getPipelineOptions()); + return beamTable.getTableStatistics(getPipelineOptions()); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java index c010604..0571d77 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java @@ -33,7 +33,7 @@ import org.apache.calcite.util.ImmutableBitSet; @Experimental @Internal public class BeamTableStatistics implements Serializable, Statistic { - public static final BeamTableStatistics UNKNOWN = new BeamTableStatistics(100d, 0d, true); + public static final BeamTableStatistics BOUNDED_UNKNOWN = new BeamTableStatistics(100d, 0d, true); public static final BeamTableStatistics UNBOUNDED_UNKNOWN = new BeamTableStatistics(0d, 0.1, true); private final boolean unknown; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java index 661aec9..38d0f87 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.sql.impl.schema; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.transforms.Convert; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -53,4 +55,9 @@ public class BeamPCollectionTable<InputT> extends BaseBeamTable { public POutput buildIOWriter(PCollection<Row> input) { throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target"); } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.BOUNDED_UNKNOWN; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java index 6d0f773..4f1b6a9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java @@ -57,7 +57,7 @@ class BigQueryTable extends BaseBeamTable implements Serializable { } @Override - public BeamTableStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getTableStatistics(PipelineOptions options) { if (rowCountStatistics == null) { rowCountStatistics = getRowCountFromBQ(options, bqLocation); @@ -100,7 +100,7 @@ class BigQueryTable extends BaseBeamTable implements Serializable { o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation)); if (rowCount == null) { - return BeamTableStatistics.UNKNOWN; + return BeamTableStatistics.BOUNDED_UNKNOWN; } return BeamTableStatistics.createBoundedTableStatistics(rowCount.doubleValue()); @@ -109,6 +109,6 @@ class BigQueryTable extends BaseBeamTable implements Serializable { LOGGER.warn("Could not get the row count for the table " + bqLocation, e); } - return BeamTableStatistics.UNKNOWN; + return BeamTableStatistics.BOUNDED_UNKNOWN; } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java index 6fcbf48..0e1dab3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java @@ -23,8 +23,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; @@ -133,4 +135,9 @@ public abstract class BeamKafkaTable extends BaseBeamTable { public List<String> getTopics() { return topics; } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.UNBOUNDED_UNKNOWN; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java index bb27e2b..71deebc 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java @@ -19,8 +19,10 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; import java.io.Serializable; import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.transforms.PTransform; @@ -57,4 +59,9 @@ public class ParquetTable extends BaseBeamTable implements Serializable { public PCollection.IsBounded isBounded() { return PCollection.IsBounded.BOUNDED; } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.BOUNDED_UNKNOWN; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java index a3d338a..74575b4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java @@ -26,8 +26,10 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; @@ -186,6 +188,11 @@ abstract class PubsubIOJsonTable implements BeamSqlTable, Serializable { throw new UnsupportedOperationException("Writing to a Pubsub topic is not supported"); } + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.UNBOUNDED_UNKNOWN; + } + @AutoValue.Builder abstract static class Builder { abstract Builder setSchema(Schema schema); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java index 0d65b63..775d797 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java @@ -18,9 +18,11 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.seqgen; import java.io.Serializable; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -63,6 +65,11 @@ class GenerateSequenceTable extends BaseBeamTable implements Serializable { } @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.createUnboundedTableStatistics((double) elementsPerSecond); + } + + @Override public POutput buildIOWriter(PCollection<Row> input) { throw new UnsupportedOperationException("buildIOWriter unsupported!"); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java index 73edca9..6f88b15 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java @@ -47,7 +47,7 @@ public class TestBoundedTable extends TestTable { } @Override - public BeamTableStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getTableStatistics(PipelineOptions options) { return BeamTableStatistics.createBoundedTableStatistics((double) rows.size()); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java index d093eb6..9dd1923 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java @@ -139,7 +139,7 @@ public class TestTableProvider extends InMemoryMetaTableProvider { } @Override - public BeamTableStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getTableStatistics(PipelineOptions options) { return BeamTableStatistics.createBoundedTableStatistics( (double) tableWithRows.getRows().size()); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java index 1fc4da5..f3b56f4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java @@ -71,7 +71,7 @@ public class TestUnboundedTable extends TestTable { } @Override - public BeamTableStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getTableStatistics(PipelineOptions options) { return this.statistics; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java index a30269f..8cf071e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java @@ -70,7 +70,7 @@ public class TextTable extends BaseBeamTable { } @Override - public BeamTableStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getTableStatistics(PipelineOptions options) { if (rowCountStatistics == null) { rowCountStatistics = getTextRowEstimate(options, getFilePattern()); } @@ -91,7 +91,7 @@ public class TextTable extends BaseBeamTable { } catch (IOException | TextRowCountEstimator.NoEstimationException e) { LOGGER.warn("Could not get the row count for the text table " + filePattern, e); } - return BeamTableStatistics.UNKNOWN; + return BeamTableStatistics.BOUNDED_UNKNOWN; } @Override diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java index 5dfa38d..7a0d04b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; import java.math.BigDecimal; import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.options.PipelineOptions; @@ -148,6 +149,11 @@ public class BeamEnumerableConverterTest { })); return PDone.in(input.getPipeline()); } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.BOUNDED_UNKNOWN; + } } @Test diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java index e1a5d8b..d19f5e0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -21,12 +21,14 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -132,6 +134,11 @@ public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest { public List<Row> seekRow(Row lookupSubRow) { return Arrays.asList(Row.withSchema(getSchema()).addValues(1, "SITE1").build()); } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.BOUNDED_UNKNOWN; + } } @Test diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java index c8dd998d..9b2602f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java @@ -87,7 +87,7 @@ public class JoinReorderingTest { 1d, tableProvider .buildBeamSqlTable(tableProvider.getTable("small_table")) - .getRowCount(null) + .getTableStatistics(null) .getRowCount(), 0.01); @@ -95,7 +95,7 @@ public class JoinReorderingTest { 3d, tableProvider .buildBeamSqlTable(tableProvider.getTable("medium_table")) - .getRowCount(null) + .getTableStatistics(null) .getRowCount(), 0.01); @@ -103,7 +103,7 @@ public class JoinReorderingTest { 100d, tableProvider .buildBeamSqlTable(tableProvider.getTable("large_table")) - .getRowCount(null) + .getTableStatistics(null) .getRowCount(), 0.01); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java index b70092e..3a97754 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java @@ -60,7 +60,7 @@ public class BigQueryRowCountIT { BigQueryTableProvider provider = new BigQueryTableProvider(); Table table = getTable("testTable", bigQuery.tableSpec()); BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamTableStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + BeamTableStatistics size = sqlTable.getTableStatistics(TestPipeline.testingPipelineOptions()); assertNotNull(size); assertEquals(0d, size.getRowCount(), 0.1); } @@ -90,7 +90,7 @@ public class BigQueryRowCountIT { pipeline.run().waitUntilFinish(); BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamTableStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + BeamTableStatistics size1 = sqlTable.getTableStatistics(TestPipeline.testingPipelineOptions()); assertNotNull(size1); assertEquals(3d, size1.getRowCount(), 0.1); @@ -142,7 +142,7 @@ public class BigQueryRowCountIT { Table table = getTable("fakeTable", "project:dataset.table"); BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamTableStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + BeamTableStatistics size = sqlTable.getTableStatistics(TestPipeline.testingPipelineOptions()); assertTrue(size.isUnknown()); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java index a801814..a674e40 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java @@ -34,9 +34,9 @@ public class BigQueryTestTable extends BigQueryTable { } @Override - public BeamTableStatistics getRowCount(PipelineOptions options) { + public BeamTableStatistics getTableStatistics(PipelineOptions options) { jobName = options.getJobName(); - return super.getRowCount(options); + return super.getTableStatistics(options); } String getJobName() {