This is an automated email from the ASF dual-hosted git repository. chamikara pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 20405c3 [BEAM-3774] Adds support for reading from/writing to more BQ geographical locations (#5001) 20405c3 is described below commit 20405c3eb5d5a58176ab93e62fa730f76758e208 Author: Chamikara Jayalath <chamik...@google.com> AuthorDate: Wed Apr 4 14:20:23 2018 -0700 [BEAM-3774] Adds support for reading from/writing to more BQ geographical locations (#5001) * Adds support for reading from/writing to BigQuery datasets that are not in US or EU locations. * Addressing reviewer comments. --- .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 21 ++++++++- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 28 ++++++++++- .../sdk/io/gcp/bigquery/BigQueryQuerySource.java | 54 +++++++++++++++------- .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 2 +- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 10 ++-- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 17 ++++--- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 11 +++-- .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 6 ++- .../beam/sdk/io/gcp/bigquery/FakeJobService.java | 2 +- 9 files changed, 115 insertions(+), 36 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 96a0622..29b405b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkState; +import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.TableReference; @@ -203,12 +204,28 @@ public class BigQueryHelpers { } else { throw new RuntimeException( String.format( - UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)), - e); + UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)), e); } } } + static String getDatasetLocation( + DatasetService datasetService, String projectId, String datasetId) { + Dataset dataset; + try { + dataset = datasetService.getDataset(projectId, datasetId); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException( + String.format( + "unable to obtain dataset for dataset %s in project %s", datasetId, projectId), + e); + } + return dataset.getLocation(); + } + static void verifyTablePresence(DatasetService datasetService, TableReference table) { try { datasetService.getTable(table); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 88de9b4..fab238c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -170,6 +170,13 @@ import org.slf4j.LoggerFactory; * .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]")); * }</pre> * + * <p>Users can optionally specify a query priority using {@link TypedRead#withQueryPriority( + * TypedRead.QueryPriority)} and a geographic location where the query will be executed using {@link + * TypedRead#withQueryLocation(String)}. Query location must be specified for jobs that are not + * executed in US or EU. See <a + * href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs: + * query</a>. + * * <h3>Writing</h3> * * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes a @@ -549,6 +556,7 @@ public class BigQueryIO { abstract Builder<T> setWithTemplateCompatibility(Boolean useTemplateCompatibility); abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices); abstract Builder<T> setQueryPriority(QueryPriority priority); + abstract Builder<T> setQueryLocation(String location); abstract TypedRead<T> build(); abstract Builder<T> setParseFn( @@ -570,6 +578,8 @@ public class BigQueryIO { @Nullable abstract QueryPriority getQueryPriority(); + @Nullable abstract String getQueryLocation(); + @Nullable abstract Coder<T> getCoder(); /** @@ -632,7 +642,8 @@ public class BigQueryIO { getBigQueryServices(), coder, getParseFn(), - MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH)); + MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH), + getQueryLocation()); } return source; } @@ -687,7 +698,8 @@ public class BigQueryIO { new JobConfigurationQuery() .setQuery(getQuery().get()) .setFlattenResults(getFlattenResults()) - .setUseLegacySql(getUseLegacySql())); + .setUseLegacySql(getUseLegacySql()), + getQueryLocation()); } catch (Exception e) { throw new IllegalArgumentException( String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e); @@ -939,6 +951,18 @@ public class BigQueryIO { return toBuilder().setQueryPriority(priority).build(); } + /** + * BigQuery geographic location where the query <a + * href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs">job</a> will be + * executed. If not specified, Beam tries to determine the location by examining the tables + * referenced by the query. Location must be specified for queries not executed in US or EU. See + * <a href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs: + * query</a>. + */ + public TypedRead<T> withQueryLocation(String location) { + return toBuilder().setQueryLocation(location).build(); + } + @Experimental(Experimental.Kind.SOURCE_SINK) public TypedRead<T> withTemplateCompatibility() { return toBuilder().setWithTemplateCompatibility(true).build(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java index 34d7c68..f380b7d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -59,9 +59,18 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { BigQueryServices bqServices, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn, - QueryPriority priority) { + QueryPriority priority, + String location) { return new BigQueryQuerySource<>( - stepUuid, query, flattenResults, useLegacySql, bqServices, coder, parseFn, priority); + stepUuid, + query, + flattenResults, + useLegacySql, + bqServices, + coder, + parseFn, + priority, + location); } private final ValueProvider<String> query; @@ -69,6 +78,7 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { private final Boolean useLegacySql; private transient AtomicReference<JobStatistics> dryRunJobStats; private final QueryPriority priority; + private final String location; private BigQueryQuerySource( String stepUuid, @@ -78,13 +88,15 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { BigQueryServices bqServices, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn, - QueryPriority priority) { + QueryPriority priority, + String location) { super(stepUuid, bqServices, coder, parseFn); this.query = checkNotNull(query, "query"); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); this.dryRunJobStats = new AtomicReference<>(); this.priority = priority; + this.location = location; } @Override @@ -97,13 +109,17 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException, InterruptedException { // 1. Find the location of the query. - String location = null; - List<TableReference> referencedTables = - dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables(); + String location = this.location; DatasetService tableService = bqServices.getDatasetService(bqOptions); - if (referencedTables != null && !referencedTables.isEmpty()) { - TableReference queryTable = referencedTables.get(0); - location = tableService.getTable(queryTable).getLocation(); + if (location == null) { + // If location was not provided we try to determine it from the tables referenced by the + // Query. This will only work for BQ locations US and EU. + List<TableReference> referencedTables = + dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables(); + if (referencedTables != null && !referencedTables.isEmpty()) { + TableReference queryTable = referencedTables.get(0); + location = tableService.getTable(queryTable).getLocation(); + } } String jobIdToken = createJobIdToken(bqOptions.getJobName(), stepUuid); @@ -125,7 +141,8 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { // 3. Execute the query. executeQuery( - jobIdToken, bqOptions.getProject(), tableToExtract, bqServices.getJobService(bqOptions)); + jobIdToken, bqOptions.getProject(), tableToExtract, bqServices.getJobService(bqOptions), + location); return tableToExtract; } @@ -151,8 +168,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { - JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery( - bqOptions.getProject(), createBasicQueryConfig()); + JobStatistics jobStats = + bqServices + .getJobService(bqOptions) + .dryRunQuery(bqOptions.getProject(), createBasicQueryConfig(), this.location); dryRunJobStats.compareAndSet(null, jobStats); } return dryRunJobStats.get(); @@ -162,7 +181,8 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { String jobIdToken, String executingProject, TableReference destinationTable, - JobService jobService) throws IOException, InterruptedException { + JobService jobService, + String bqLocation) throws IOException, InterruptedException { // Generate a transient (random) query job ID, because this code may be retried after the // temporary dataset and table have already been deleted by a previous attempt - // in that case we want to re-generate the temporary dataset and table, and we'll need @@ -174,9 +194,11 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { destinationTable, queryJobId); - JobReference jobRef = new JobReference() - .setProjectId(executingProject) - .setJobId(queryJobId); + JobReference jobRef = + new JobReference() + .setProjectId(executingProject) + .setLocation(bqLocation) + .setJobId(queryJobId); JobConfigurationQuery queryConfig = createBasicQueryConfig() .setAllowLargeResults(true) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index dde005d..1295cc0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -85,7 +85,7 @@ interface BigQueryServices extends Serializable { /** * Dry runs the query in the given project. */ - JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig) + JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig, String location) throws InterruptedException, IOException; /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 6c76688..9771733 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -254,7 +254,9 @@ class BigQueryServicesImpl implements BigQueryServices { BackOff backoff) throws InterruptedException { do { try { - Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute(); + Job job = client.jobs().get( + jobRef.getProjectId(), jobRef.getJobId()).setLocation( + jobRef.getLocation()).execute(); JobStatus status = job.getStatus(); if (status != null && status.getState() != null && status.getState().equals("DONE")) { LOG.info("BigQuery job {} completed in state DONE", jobRef); @@ -281,9 +283,11 @@ class BigQueryServicesImpl implements BigQueryServices { } @Override - public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig) + public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig, + String location) throws InterruptedException, IOException { - Job job = new Job() + JobReference jobRef = new JobReference().setLocation(location).setProjectId(projectId); + Job job = new Job().setJobReference(jobRef) .setConfiguration(new JobConfiguration() .setQuery(queryConfig) .setDryRun(true)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index a15afed..6d82c56 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -101,7 +101,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> { protected ExtractResult extractFiles(PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); - Table table = bqServices.getDatasetService(bqOptions).getTable(tableToExtract); + BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions); + Table table = datasetService.getTable(tableToExtract); if (table == null) { throw new IOException(String.format( "Cannot start an export job since table %s does not exist", @@ -113,13 +114,17 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> { String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); final String extractDestinationDir = resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid); + String bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId()); List<ResourceId> tempFiles = executeExtract( extractJobId, tableToExtract, jobService, bqOptions.getProject(), - extractDestinationDir); + extractDestinationDir, + bqLocation); return new ExtractResult(schema, tempFiles); } @@ -160,11 +165,11 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> { private List<ResourceId> executeExtract( String jobId, TableReference table, JobService jobService, String executingProject, - String extractDestinationDir) + String extractDestinationDir, String bqLocation) throws InterruptedException, IOException { - JobReference jobRef = new JobReference() - .setProjectId(executingProject) - .setJobId(jobId); + + JobReference jobRef = + new JobReference().setProjectId(executingProject).setLocation(bqLocation).setJobId(jobId); String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir); JobConfigurationExtract extract = new JobConfigurationExtract() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 339003d..cd128a1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -228,8 +228,6 @@ class WriteTables<DestinationT> return writeTablesOutputs.get(mainOutputTag); } - - private void load( JobService jobService, DatasetService datasetService, @@ -255,13 +253,20 @@ class WriteTables<DestinationT> } String projectId = ref.getProjectId(); Job lastFailedLoadJob = null; + String bqLocation = + BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId); + + JobReference jobRef = + new JobReference().setProjectId(projectId).setJobId(jobId).setLocation(bqLocation); + LOG.info("Loading {} files into {} using job {}, attempt {}", gcsUris.size(), ref, jobRef, i); jobService.startLoadJob(jobRef, loadConfig); LOG.info("Load job {} started", jobRef); + Job loadJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); + Status jobStatus = BigQueryHelpers.parseStatus(loadJob); switch (jobStatus) { case SUCCEEDED: diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java index 6b4cf53..b6fbe49 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java @@ -614,7 +614,8 @@ public class BigQueryIOReadTest implements Serializable { fakeBqServices, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, - QueryPriority.BATCH); + QueryPriority.BATCH, + null); options.setTempLocation(testFolder.getRoot().getAbsolutePath()); TableReference queryTable = new TableReference() @@ -693,7 +694,8 @@ public class BigQueryIOReadTest implements Serializable { fakeBqServices, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, - QueryPriority.BATCH); + QueryPriority.BATCH, + null); options.setTempLocation(testFolder.getRoot().getAbsolutePath()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index e1edd83..ac715a3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -228,7 +228,7 @@ class FakeJobService implements JobService, Serializable { } @Override - public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query) + public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query, String location) throws InterruptedException, IOException { synchronized (dryRunQueryResults) { JobStatistics result = dryRunQueryResults.get(projectId, query.getQuery()); -- To stop receiving notification emails like this one, please contact chamik...@apache.org.