Repository: beam Updated Branches: refs/heads/master e9cd41165 -> 77b136603
BigQuery: refactor services so that all queryConfig happens in BigQueryIO By putting all the configuration in the same place, we can avoid bugs that happen from mismatching code across files. Also made a few unnecessarily-public APIs package-private. And improved tests, removed a few dataflow references. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21e2cf6b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21e2cf6b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21e2cf6b Branch: refs/heads/master Commit: 21e2cf6b0a3905cd2768948287115b69f4b6bd6c Parents: e9cd411 Author: Dan Halperin <dhalp...@google.com> Authored: Mon Jan 30 14:04:32 2017 -0800 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue Jan 31 17:11:18 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +- .../sdk/io/gcp/bigquery/BigQueryServices.java | 3 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 17 ++-- .../gcp/bigquery/BigQueryTableRowIterator.java | 82 ++++++++------------ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 3 +- .../bigquery/BigQueryTableRowIteratorTest.java | 51 ++++++------ 6 files changed, 67 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 4ace985..b15807e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1074,7 +1074,7 @@ public class BigQueryIO { public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, query.get(), executingProject.get(), flattenResults, useLegacySql)); + bqOptions, executingProject.get(), createBasicQueryConfig())); } @Override @@ -1162,8 +1162,8 @@ public class BigQueryIO { private JobConfigurationQuery createBasicQueryConfig() { return new JobConfigurationQuery() - .setQuery(query.get()) .setFlattenResults(flattenResults) + .setQuery(query.get()) .setUseLegacySql(useLegacySql); } http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 03e4391..a85d16d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -57,8 +57,7 @@ interface BigQueryServices extends Serializable { * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables. */ BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql); + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig); /** * An interface for the Cloud BigQuery load service. http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 7c3edbe..b958c8d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -100,9 +100,8 @@ class BigQueryServicesImpl implements BigQueryServices { @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql) { - return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql); + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { + return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig); } @VisibleForTesting @@ -800,20 +799,14 @@ class BigQueryServicesImpl implements BigQueryServices { } private static BigQueryJsonReader fromQuery( - BigQueryOptions bqOptions, - String query, - String projectId, - @Nullable Boolean flattenResults, - @Nullable Boolean useLegacySql) { + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { return new BigQueryJsonReaderImpl( BigQueryTableRowIterator.fromQuery( - query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults, - useLegacySql)); + queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build())); } private static BigQueryJsonReader fromTable( - BigQueryOptions bqOptions, - TableReference tableRef) { + BigQueryOptions bqOptions, TableReference tableRef) { return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable( tableRef, Transport.newBigQueryClient(bqOptions).build())); } http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 92f7542..5edc78c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -44,7 +44,6 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; -import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; @@ -72,6 +71,7 @@ class BigQueryTableRowIterator implements AutoCloseable { @Nullable private TableReference ref; @Nullable private final String projectId; @Nullable private TableSchema schema; + @Nullable private JobConfigurationQuery queryConfig; private final Bigquery client; private String pageToken; private Iterator<TableRow> iteratorOverCurrentBatch; @@ -88,64 +88,54 @@ class BigQueryTableRowIterator implements AutoCloseable { // following interval to check the status of query execution job private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1); - private final String query; - // Whether to flatten query results. - private final boolean flattenResults; - // Whether to use the BigQuery legacy SQL dialect.. - private final boolean useLegacySql; // Temporary dataset used to store query results. private String temporaryDatasetId = null; // Temporary table used to store query results. private String temporaryTableId = null; private BigQueryTableRowIterator( - @Nullable TableReference ref, @Nullable String query, @Nullable String projectId, - Bigquery client, boolean flattenResults, boolean useLegacySql) { + @Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig, + @Nullable String projectId, Bigquery client) { this.ref = ref; - this.query = query; + this.queryConfig = queryConfig; this.projectId = projectId; this.client = checkNotNull(client, "client"); - this.flattenResults = flattenResults; - this.useLegacySql = useLegacySql; } /** * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table. */ - public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) { + static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) { checkNotNull(ref, "ref"); checkNotNull(client, "client"); - return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true); + return new BigQueryTableRowIterator(ref, /* queryConfig */null, ref.getProjectId(), client); } /** * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the * specified query in the specified project. */ - public static BigQueryTableRowIterator fromQuery( - String query, String projectId, Bigquery client, @Nullable Boolean flattenResults, - @Nullable Boolean useLegacySql) { - checkNotNull(query, "query"); + static BigQueryTableRowIterator fromQuery( + JobConfigurationQuery queryConfig, String projectId, Bigquery client) { + checkNotNull(queryConfig, "queryConfig"); checkNotNull(projectId, "projectId"); checkNotNull(client, "client"); - return new BigQueryTableRowIterator(null, query, projectId, client, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), - MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); + return new BigQueryTableRowIterator(/* ref */null, queryConfig, projectId, client); } /** * Opens the table for read. * @throws IOException on failure */ - public void open() throws IOException, InterruptedException { - if (query != null) { + void open() throws IOException, InterruptedException { + if (queryConfig != null) { ref = executeQueryAndWaitForCompletion(); } // Get table schema. schema = getTable(ref).getSchema(); } - public boolean advance() throws IOException, InterruptedException { + boolean advance() throws IOException, InterruptedException { while (true) { if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) { // Embed schema information into the raw row, so that values have an @@ -183,7 +173,7 @@ class BigQueryTableRowIterator implements AutoCloseable { } } - public TableRow getCurrent() { + TableRow getCurrent() { if (current == null) { throw new NoSuchElementException(); } @@ -193,7 +183,7 @@ class BigQueryTableRowIterator implements AutoCloseable { /** * Adjusts a field returned from the BigQuery API to match what we will receive when running * BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation - * used for batch jobs executed on the Cloud Dataflow service. + * used for batch jobs executed on the Beam Runners that perform initial splitting. * * <p>The following is the relationship between BigQuery schema and Java types: * @@ -254,7 +244,7 @@ class BigQueryTableRowIterator implements AutoCloseable { } /** - * A list of the field names that cannot be used in BigQuery tables processed by Dataflow, + * A list of the field names that cannot be used in BigQuery tables processed by Apache Beam, * because they are reserved keywords in {@link TableRow}. */ // TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does @@ -388,15 +378,17 @@ class BigQueryTableRowIterator implements AutoCloseable { */ private TableReference executeQueryAndWaitForCompletion() throws IOException, InterruptedException { + checkState(projectId != null, "Unable to execute a query without a configured project id"); + checkState(queryConfig != null, "Unable to execute a query without a configured query"); // Dry run query to get source table location Job dryRunJob = new Job() .setConfiguration(new JobConfiguration() - .setQuery(new JobConfigurationQuery() - .setQuery(query)) + .setQuery(queryConfig) .setDryRun(true)); JobStatistics jobStats = executeWithBackOff( client.jobs().insert(projectId, dryRunJob), - String.format("Error when trying to dry run query %s.", query)).getStatistics(); + String.format("Error when trying to dry run query %s.", + queryConfig.toPrettyString())).getStatistics(); // Let BigQuery to pick default location if the query does not read any tables. String location = null; @@ -409,35 +401,33 @@ class BigQueryTableRowIterator implements AutoCloseable { // Create a temporary dataset to store results. // Starting dataset name with an "_" so that it is hidden. Random rnd = new Random(System.currentTimeMillis()); - temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000); - temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000); + temporaryDatasetId = "_beam_temporary_dataset_" + rnd.nextInt(1000000); + temporaryTableId = "beam_temporary_table_" + rnd.nextInt(1000000); createDataset(temporaryDatasetId, location); Job job = new Job(); JobConfiguration config = new JobConfiguration(); - JobConfigurationQuery queryConfig = new JobConfigurationQuery(); config.setQuery(queryConfig); job.setConfiguration(config); - queryConfig.setQuery(query); - queryConfig.setAllowLargeResults(true); - queryConfig.setFlattenResults(flattenResults); - queryConfig.setUseLegacySql(useLegacySql); TableReference destinationTable = new TableReference(); destinationTable.setProjectId(projectId); destinationTable.setDatasetId(temporaryDatasetId); destinationTable.setTableId(temporaryTableId); queryConfig.setDestinationTable(destinationTable); + queryConfig.setAllowLargeResults(true); Job queryJob = executeWithBackOff( client.jobs().insert(projectId, job), - String.format("Error when trying to execute the job for query %s.", query)); + String.format("Error when trying to execute the job for query %s.", + queryConfig.toPrettyString())); JobReference jobId = queryJob.getJobReference(); while (true) { Job pollJob = executeWithBackOff( client.jobs().get(projectId, jobId.getJobId()), - String.format("Error when trying to get status of the job for query %s.", query)); + String.format("Error when trying to get status of the job for query %s.", + queryConfig.toPrettyString())); JobStatus status = pollJob.getStatus(); if (status.getState().equals("DONE")) { // Job is DONE, but did not necessarily succeed. @@ -447,7 +437,8 @@ class BigQueryTableRowIterator implements AutoCloseable { } else { // There will be no temporary table to delete, so null out the reference. temporaryTableId = null; - throw new IOException("Executing query " + query + " failed: " + error.getMessage()); + throw new IOException(String.format( + "Executing query %s failed: %s", queryConfig.toPrettyString(), error.getMessage())); } } Uninterruptibles.sleepUninterruptibly( @@ -455,22 +446,11 @@ class BigQueryTableRowIterator implements AutoCloseable { } } - /** - * Execute a BQ request with exponential backoff and return the result. - * - * @deprecated use {@link #executeWithBackOff(AbstractGoogleClientRequest, String)}. - */ - @Deprecated - public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error, - Object... errorArgs) throws IOException, InterruptedException { - return executeWithBackOff(client, String.format(error, errorArgs)); - } - // Execute a BQ request with exponential backoff and return the result. // client - BQ request to be executed // error - Formatted message to log if when a request fails. Takes exception message as a // formatter parameter. - public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error) + private static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error) throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backOff = http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index bbfc2ce..c0ce027 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -226,8 +226,7 @@ public class BigQueryIOTest implements Serializable { @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql) { + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { return new FakeBigQueryReader(jsonTableRowReturns); } http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java index a41b455..f84d412 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java @@ -175,14 +175,16 @@ public class BigQueryTableRowIteratorTest { // Mock job polling. JobStatus status = new JobStatus().setState("DONE"); - TableReference tableRef = - new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef); + JobConfigurationQuery resultQueryConfig = new JobConfigurationQuery() + .setDestinationTable(new TableReference() + .setProjectId("project") + .setDatasetId("tempdataset") + .setTableId("temptable")); Job getJob = new Job() .setJobReference(new JobReference()) .setStatus(status) - .setConfiguration(new JobConfiguration().setQuery(queryConfig)); + .setConfiguration(new JobConfiguration().setQuery(resultQueryConfig)); when(mockJobsGet.execute()).thenReturn(getJob); // Mock table schema fetch. @@ -198,8 +200,9 @@ public class BigQueryTableRowIteratorTest { // Run query and verify String query = "SELECT name, count, photo, anniversary_date, " + "anniversary_datetime, anniversary_time from table"; + JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query); try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { + BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) { iterator.open(); assertTrue(iterator.advance()); TableRow row = iterator.getCurrent(); @@ -240,7 +243,7 @@ public class BigQueryTableRowIteratorTest { verify(mockTablesDelete).execute(); // Table data read. verify(mockClient).tabledata(); - verify(mockTabledata).list("project", "dataset", "table"); + verify(mockTabledata).list("project", "tempdataset", "temptable"); verify(mockTabledataList).execute(); } @@ -257,14 +260,16 @@ public class BigQueryTableRowIteratorTest { // Mock job polling. JobStatus status = new JobStatus().setState("DONE"); - TableReference tableRef = - new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef); + JobConfigurationQuery resultQueryConfig = new JobConfigurationQuery() + .setDestinationTable(new TableReference() + .setProjectId("project") + .setDatasetId("tempdataset") + .setTableId("temptable")); Job getJob = new Job() .setJobReference(new JobReference()) .setStatus(status) - .setConfiguration(new JobConfiguration().setQuery(queryConfig)); + .setConfiguration(new JobConfiguration().setQuery(resultQueryConfig)); when(mockJobsGet.execute()).thenReturn(getJob); // Mock table schema fetch. @@ -280,8 +285,9 @@ public class BigQueryTableRowIteratorTest { String query = String.format( "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo", photoBytesEncoded); + JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query); try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { + BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) { iterator.open(); assertTrue(iterator.advance()); TableRow row = iterator.getCurrent(); @@ -316,7 +322,7 @@ public class BigQueryTableRowIteratorTest { verify(mockTablesDelete).execute(); // Table data read. verify(mockClient).tabledata(); - verify(mockTabledata).list("project", "dataset", "table"); + verify(mockTabledata).list("project", "tempdataset", "temptable"); verify(mockTabledataList).execute(); } @@ -332,19 +338,16 @@ public class BigQueryTableRowIteratorTest { Exception exception = new IOException(errorReason); when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception); - String query = "NOT A QUERY"; + JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery("NOT A QUERY"); try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { - - try { - iterator.open(); - fail(); - } catch (Exception expected) { - // Verify message explains cause and reports the query. - assertThat(expected.getMessage(), containsString("Error")); - assertThat(expected.getMessage(), containsString(query)); - assertThat(expected.getCause().getMessage(), containsString(errorReason)); - } + BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) { + iterator.open(); + fail(); + } catch (Exception expected) { + // Verify message explains cause and reports the query. + assertThat(expected.getMessage(), containsString("Error")); + assertThat(expected.getMessage(), containsString("NOT A QUERY")); + assertThat(expected.getCause().getMessage(), containsString(errorReason)); } // Job inserted to run the query, then polled once.