Repository: incubator-beam Updated Branches: refs/heads/master 2492604e4 -> 570de74da
BigQueryIO: port trivial fixes from Dataflow version. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/39b9de5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/39b9de5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/39b9de5f Branch: refs/heads/master Commit: 39b9de5feab7be37f88e44e99784375a8ae82bc7 Parents: 2492604 Author: Pei He <pe...@google.com> Authored: Mon Oct 3 21:19:37 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Mon Oct 10 09:01:29 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 +++++++------------- 1 file changed, 7 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/39b9de5f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 716fe39..3d1aba6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -268,12 +268,6 @@ public class BigQueryIO { private static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP); - // TODO: make this private and remove improper access from BigQueryIOTranslator. - public static final String SET_PROJECT_FROM_OPTIONS_WARNING = - "No project specified for BigQuery table \"%1$s.%2$s\". Assuming it is in \"%3$s\". If the" - + " table is in a different project please specify it as a part of the BigQuery table" - + " definition."; - private static final String RESOURCE_NOT_FOUND_ERROR = "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline" + " execution. If the %1$s is created by an earlier stage of the pipeline, this" @@ -614,6 +608,7 @@ public class BigQueryIO { JobReference jobRef = new JobReference() .setProjectId(executingProject) .setJobId(getExtractJobId(jobIdToken)); + Job extractJob = bqServices.getJobService(bqOptions) .getJob(jobRef); @@ -805,8 +800,7 @@ public class BigQueryIO { BigQueryServices bqServices, String executingProject) { super(jobIdToken, extractDestinationDir, bqServices, executingProject); - checkNotNull(table, "table"); - this.jsonTable = toJsonString(table); + this.jsonTable = toJsonString(checkNotNull(table, "table")); this.tableSizeBytes = new AtomicReference<>(); } @@ -960,6 +954,7 @@ public class BigQueryIO { super.populateDisplayData(builder); builder.add(DisplayData.item("query", query)); } + private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { @@ -1755,10 +1750,8 @@ public class BigQueryIO { new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices)); } - TableReference table = fromJsonString(jsonTableRef, TableReference.class); - if (Strings.isNullOrEmpty(table.getProjectId())) { - table.setProjectId(options.getProject()); - } + TableReference table = getTableWithDefaultProject(options); + String jobIdToken = "beam_job_" + randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; @@ -2653,7 +2646,7 @@ public class BigQueryIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.addIfNotNull(DisplayData.item("tableSpec", tableSpec)); + builder.addIfNotNull(DisplayData.item("table", tableSpec)); if (tableRefFunction != null) { builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) .withLabel("Table Reference Function")); @@ -2745,7 +2738,7 @@ public class BigQueryIO { UNKNOWN, } - private static Status parseStatus(Job job) { + private static Status parseStatus(@Nullable Job job) { if (job == null) { return Status.UNKNOWN; }