Repository: incubator-beam Updated Branches: refs/heads/master 437393712 -> 321547fb1
BigQueryIO.Write: support runtime schema and table Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd6d09c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd6d09c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd6d09c3 Branch: refs/heads/master Commit: fd6d09c32f6bcf67c63ec74548373ee90d67f2bd Parents: 4373937 Author: Sam McVeety <s...@google.com> Authored: Sun Dec 4 14:16:23 2016 -0800 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Dec 12 11:14:20 2016 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 217 +++++++++++++------ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 60 ++++- 2 files changed, 206 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd6d09c3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index f99ca78..0be8567 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -321,6 +321,23 @@ public class BigQueryIO { return sb.toString(); } + @VisibleForTesting + static class JsonSchemaToTableSchema + implements SerializableFunction<String, TableSchema> { + @Override + public TableSchema apply(String from) { + return fromJsonString(from, TableSchema.class); + } + } + + private static class TableSchemaToJsonSchema + implements SerializableFunction<TableSchema, String> { + @Override + public String apply(TableSchema from) { + return toJsonString(from); + } + } + private static class JsonTableRefToTableRef implements SerializableFunction<String, TableReference> { @Override @@ -329,6 +346,14 @@ public class BigQueryIO { } } + private static class TableRefToTableSpec + implements SerializableFunction<TableReference, String> { + @Override + public String apply(TableReference from) { + return toTableSpec(from); + } + } + private static class TableRefToJson implements SerializableFunction<TableReference, String> { @Override @@ -353,6 +378,15 @@ public class BigQueryIO { } } + @Nullable + private static ValueProvider<String> displayTable( + @Nullable ValueProvider<TableReference> table) { + if (table == null) { + return null; + } + return NestedValueProvider.of(table, new TableRefToTableSpec()); + } + /** * A {@link PTransform} that reads from a BigQuery table and returns a * {@link PCollection} of {@link TableRow TableRows} containing each of the rows of the table. @@ -659,11 +693,11 @@ public class BigQueryIO { .setProjectId(executingProject) .setDatasetId(queryTempDatasetId) .setTableId(queryTempTableId); + String jsonTableRef = toJsonString(queryTempTableRef); source = BigQueryQuerySource.create( jobIdToken, query, NestedValueProvider.of( - StaticValueProvider.of( - toJsonString(queryTempTableRef)), new JsonTableRefToTableRef()), + StaticValueProvider.of(jsonTableRef), new JsonTableRefToTableRef()), flattenResults, useLegacySql, extractDestinationDir, bqServices); } else { ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions); @@ -712,17 +746,10 @@ public class BigQueryIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - TableReference table = getTable(); - - if (table != null) { - builder.add(DisplayData.item("table", toTableSpec(table)) - .withLabel("Table")); - } - String queryString = query == null - ? null : query.isAccessible() - ? query.get() : query.toString(); builder - .addIfNotNull(DisplayData.item("query", queryString) + .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider())) + .withLabel("Table")) + .addIfNotNull(DisplayData.item("query", query) .withLabel("Query")) .addIfNotNull(DisplayData.item("flattenResults", flattenResults) .withLabel("Flatten Query Results")) @@ -752,10 +779,10 @@ public class BigQueryIO { if (Strings.isNullOrEmpty(table.get().getProjectId())) { // If user does not specify a project we assume the table to be located in // the default project. - TableReference ref = table.get(); - ref.setProjectId(bqOptions.getProject()); + TableReference tableRef = table.get(); + tableRef.setProjectId(bqOptions.getProject()); return NestedValueProvider.of(StaticValueProvider.of( - toJsonString(ref)), new JsonTableRefToTableRef()); + toJsonString(tableRef)), new JsonTableRefToTableRef()); } return table; } @@ -941,8 +968,7 @@ public class BigQueryIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String table = jsonTable.isAccessible() ? jsonTable.get() : jsonTable.toString(); - builder.add(DisplayData.item("table", table)); + builder.add(DisplayData.item("table", jsonTable)); } } @@ -1060,7 +1086,7 @@ public class BigQueryIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("query", query.get())); + builder.add(DisplayData.item("query", query)); } private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) @@ -1516,6 +1542,11 @@ public class BigQueryIO { } /** Creates a write transformation for the given table. */ + public static Bound to(ValueProvider<String> tableSpec) { + return new Bound().to(tableSpec); + } + + /** Creates a write transformation for the given table. */ public static Bound to(TableReference table) { return new Bound().to(table); } @@ -1558,6 +1589,13 @@ public class BigQueryIO { return new Bound().withSchema(schema); } + /** + * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}. + */ + public static Bound withSchema(ValueProvider<TableSchema> schema) { + return new Bound().withSchema(schema); + } + /** Creates a write transformation with the specified options for creating the table. */ public static Bound withCreateDisposition(CreateDisposition disposition) { return new Bound().withCreateDisposition(disposition); @@ -1593,12 +1631,12 @@ public class BigQueryIO { // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - @Nullable final String jsonTableRef; + @Nullable final ValueProvider<String> jsonTableRef; @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; // Table schema. The schema is required only if the table does not exist. - @Nullable final String jsonSchema; + @Nullable final ValueProvider<String> jsonSchema; // Options for creating the table. Valid values are CREATE_IF_NEEDED and // CREATE_NEVER. @@ -1645,9 +1683,9 @@ public class BigQueryIO { null /* bigQueryServices */); } - private Bound(String name, @Nullable String jsonTableRef, + private Bound(String name, @Nullable ValueProvider<String> jsonTableRef, @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, - @Nullable String jsonSchema, + @Nullable ValueProvider<String> jsonSchema, CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate, @Nullable BigQueryServices bigQueryServices) { super(name); @@ -1667,7 +1705,8 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Bound to(String tableSpec) { - return to(parseTableSpec(tableSpec)); + return toTableRef(NestedValueProvider.of( + StaticValueProvider.of(tableSpec), new TableSpecToTableRef())); } /** @@ -1676,7 +1715,28 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Bound to(TableReference table) { - return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition, + return to(StaticValueProvider.of(toTableSpec(table))); + } + + /** + * Returns a copy of this write transformation, but writing to the specified table. Refer to + * {@link #parseTableSpec(String)} for the specification format. + * + * <p>Does not modify this object. + */ + public Bound to(ValueProvider<String> tableSpec) { + return toTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef())); + } + + /** + * Returns a copy of this write transformation, but writing to the specified table. + * + * <p>Does not modify this object. + */ + private Bound toTableRef(ValueProvider<TableReference> table) { + return new Bound(name, + NestedValueProvider.of(table, new TableRefToJson()), + tableRefFunction, jsonSchema, createDisposition, writeDisposition, validate, bigQueryServices); } @@ -1716,7 +1776,17 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Bound withSchema(TableSchema schema) { - return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema), + return new Bound(name, jsonTableRef, tableRefFunction, + StaticValueProvider.of(toJsonString(schema)), + createDisposition, writeDisposition, validate, bigQueryServices); + } + + /** + * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}. + */ + public Bound withSchema(ValueProvider<TableSchema> schema) { + return new Bound(name, jsonTableRef, tableRefFunction, + NestedValueProvider.of(schema, new TableSchemaToJsonSchema()), createDisposition, writeDisposition, validate, bigQueryServices); } @@ -1798,7 +1868,7 @@ public class BigQueryIO { // The user specified a table. if (jsonTableRef != null && validate) { - TableReference table = getTableWithDefaultProject(options); + TableReference table = getTableWithDefaultProject(options).get(); DatasetService datasetService = getBigQueryServices().getDatasetService(options); // Check for destination table presence and emptiness for early failure notification. @@ -1855,10 +1925,11 @@ public class BigQueryIO { // StreamWithDeDup and BigQuery's streaming import API. if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) { return input.apply( - new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices)); + new StreamWithDeDup(getTable(), tableRefFunction, + NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices)); } - TableReference table = getTableWithDefaultProject(options); + ValueProvider<TableReference> table = getTableWithDefaultProject(options); String jobIdToken = "beam_job_" + randomUUIDString(); String tempLocation = options.getTempLocation(); @@ -1909,7 +1980,7 @@ public class BigQueryIO { bqServices, jobIdToken, tempFilePrefix, - toJsonString(table), + NestedValueProvider.of(table, new TableRefToJson()), jsonSchema, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED))); @@ -1920,7 +1991,7 @@ public class BigQueryIO { .of(new WriteRename( bqServices, jobIdToken, - toJsonString(table), + NestedValueProvider.of(table, new TableRefToJson()), writeDisposition, createDisposition, tempTablesView)) @@ -1934,7 +2005,7 @@ public class BigQueryIO { bqServices, jobIdToken, tempFilePrefix, - toJsonString(table), + NestedValueProvider.of(table, new TableRefToJson()), jsonSchema, writeDisposition, createDisposition))); @@ -2031,7 +2102,8 @@ public class BigQueryIO { /** Returns the table schema. */ public TableSchema getSchema() { - return fromJsonString(jsonSchema, TableSchema.class); + return fromJsonString( + jsonSchema == null ? null : jsonSchema.get(), TableSchema.class); } /** @@ -2039,20 +2111,32 @@ public class BigQueryIO { * * <p>If the table's project is not specified, use the executing project. */ - @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) { - TableReference table = getTable(); - if (table != null && Strings.isNullOrEmpty(table.getProjectId())) { + @Nullable private ValueProvider<TableReference> getTableWithDefaultProject( + BigQueryOptions bqOptions) { + ValueProvider<TableReference> table = getTable(); + if (table == null) { + return table; + } + if (!table.isAccessible()) { + LOG.info("Using a dynamic value for table input. This must contain a project" + + " in the table reference: {}", table); + return table; + } + if (Strings.isNullOrEmpty(table.get().getProjectId())) { // If user does not specify a project we assume the table to be located in // the default project. - table.setProjectId(bqOptions.getProject()); + TableReference tableRef = table.get(); + tableRef.setProjectId(bqOptions.getProject()); + return NestedValueProvider.of(StaticValueProvider.of( + toJsonString(tableRef)), new JsonTableRefToTableRef()); } return table; } /** Returns the table reference, or {@code null}. */ @Nullable - public TableReference getTable() { - return fromJsonString(jsonTableRef, TableReference.class); + public ValueProvider<TableReference> getTable() { + return NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); } /** Returns {@code true} if table validation is enabled. */ @@ -2172,8 +2256,8 @@ public class BigQueryIO { private final BigQueryServices bqServices; private final String jobIdToken; private final String tempFilePrefix; - private final String jsonTableRef; - private final String jsonSchema; + private final ValueProvider<String> jsonTableRef; + private final ValueProvider<String> jsonSchema; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; @@ -2182,8 +2266,8 @@ public class BigQueryIO { BigQueryServices bqServices, String jobIdToken, String tempFilePrefix, - String jsonTableRef, - String jsonSchema, + ValueProvider<String> jsonTableRef, + ValueProvider<String> jsonSchema, WriteDisposition writeDisposition, CreateDisposition createDisposition) { this.singlePartition = singlePartition; @@ -2200,7 +2284,7 @@ public class BigQueryIO { public void processElement(ProcessContext c) throws Exception { List<String> partition = Lists.newArrayList(c.element().getValue()).get(0); String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey()); - TableReference ref = fromJsonString(jsonTableRef, TableReference.class); + TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class); if (!singlePartition) { ref.setTableId(jobIdPrefix); } @@ -2209,7 +2293,8 @@ public class BigQueryIO { bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, ref, - fromJsonString(jsonSchema, TableSchema.class), + fromJsonString( + jsonSchema == null ? null : jsonSchema.get(), TableSchema.class), partition, writeDisposition, createDisposition); @@ -2242,16 +2327,15 @@ public class BigQueryIO { .setProjectId(projectId) .setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); - Job job = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES); - Status jobStatus = parseStatus(job); + Status jobStatus = + parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); switch (jobStatus) { case SUCCEEDED: return; case UNKNOWN: throw new RuntimeException("Failed to poll the load job status of job " + jobId); case FAILED: - LOG.info("BigQuery load job failed. Status: {} Details: {}", - jobId, job.getStatus()); + LOG.info("BigQuery load job failed: {}", jobId); continue; default: throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", @@ -2306,7 +2390,7 @@ public class BigQueryIO { static class WriteRename extends DoFn<String, Void> { private final BigQueryServices bqServices; private final String jobIdToken; - private final String jsonTableRef; + private final ValueProvider<String> jsonTableRef; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; private final PCollectionView<Iterable<String>> tempTablesView; @@ -2314,7 +2398,7 @@ public class BigQueryIO { public WriteRename( BigQueryServices bqServices, String jobIdToken, - String jsonTableRef, + ValueProvider<String> jsonTableRef, WriteDisposition writeDisposition, CreateDisposition createDisposition, PCollectionView<Iterable<String>> tempTablesView) { @@ -2342,7 +2426,7 @@ public class BigQueryIO { copy( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdToken, - fromJsonString(jsonTableRef, TableReference.class), + fromJsonString(jsonTableRef.get(), TableReference.class), tempTables, writeDisposition, createDisposition); @@ -2475,7 +2559,7 @@ public class BigQueryIO { private static class StreamingWriteFn extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { /** TableSchema in JSON. Use String to make the class Serializable. */ - private final String jsonTableSchema; + private final ValueProvider<String> jsonTableSchema; private final BigQueryServices bqServices; @@ -2495,8 +2579,9 @@ public class BigQueryIO { createAggregator("ByteCount", new Sum.SumLongFn()); /** Constructor. */ - StreamingWriteFn(TableSchema schema, BigQueryServices bqServices) { - this.jsonTableSchema = toJsonString(schema); + StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) { + this.jsonTableSchema = + NestedValueProvider.of(schema, new TableSchemaToJsonSchema()); this.bqServices = checkNotNull(bqServices, "bqServices"); } @@ -2549,7 +2634,8 @@ public class BigQueryIO { // check again. This check isn't needed for correctness, but we add it to prevent // every thread from attempting a create and overwhelming our BigQuery quota. if (!createdTables.contains(tableSpec)) { - TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class); + TableSchema tableSchema = JSON_FACTORY.fromString( + jsonTableSchema.get(), TableSchema.class); Bigquery client = Transport.newBigQueryClient(options).build(); BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND, @@ -2708,7 +2794,7 @@ public class BigQueryIO { private static class TagWithUniqueIdsAndTable extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> { /** TableSpec to write to. */ - private final String tableSpec; + private final ValueProvider<String> tableSpec; /** User function mapping windows to {@link TableReference} in JSON. */ private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; @@ -2716,15 +2802,16 @@ public class BigQueryIO { private transient String randomUUID; private transient long sequenceNo = 0L; - TagWithUniqueIdsAndTable(BigQueryOptions options, TableReference table, + TagWithUniqueIdsAndTable(BigQueryOptions options, + ValueProvider<TableReference> table, SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { checkArgument(table == null ^ tableRefFunction == null, "Exactly one of table or tableRefFunction should be set"); if (table != null) { - if (table.getProjectId() == null) { - table.setProjectId(options.as(BigQueryOptions.class).getProject()); + if (table.isAccessible() && table.get().getProjectId() == null) { + table.get().setProjectId(options.as(BigQueryOptions.class).getProject()); } - this.tableSpec = toTableSpec(table); + this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec()); } else { tableSpec = null; } @@ -2763,7 +2850,7 @@ public class BigQueryIO { private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) { if (tableSpec != null) { - return tableSpec; + return tableSpec.get(); } else { TableReference table = tableRefFunction.apply(window); if (table.getProjectId() == null) { @@ -2781,15 +2868,15 @@ public class BigQueryIO { * it leverages BigQuery best effort de-dup mechanism. */ private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> { - private final transient TableReference tableReference; + private final transient ValueProvider<TableReference> tableReference; private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; - private final transient TableSchema tableSchema; + private final transient ValueProvider<TableSchema> tableSchema; private final BigQueryServices bqServices; /** Constructor. */ - StreamWithDeDup(TableReference tableReference, + StreamWithDeDup(ValueProvider<TableReference> tableReference, SerializableFunction<BoundedWindow, TableReference> tableRefFunction, - TableSchema tableSchema, + ValueProvider<TableSchema> tableSchema, BigQueryServices bqServices) { this.tableReference = tableReference; this.tableRefFunction = tableRefFunction; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd6d09c3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 25caf63..54ec2bb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -95,6 +95,7 @@ import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.JsonSchemaToTableSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; @@ -111,6 +112,8 @@ import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; @@ -643,9 +646,9 @@ public class BigQueryIOTest implements Serializable { BigQueryIO.Write.Bound bound, String project, String dataset, String table, TableSchema schema, CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate) { - assertEquals(project, bound.getTable().getProjectId()); - assertEquals(dataset, bound.getTable().getDatasetId()); - assertEquals(table, bound.getTable().getTableId()); + assertEquals(project, bound.getTable().get().getProjectId()); + assertEquals(dataset, bound.getTable().get().getDatasetId()); + assertEquals(table, bound.getTable().get().getTableId()); assertEquals(schema, bound.getSchema()); assertEquals(createDisposition, bound.createDisposition); assertEquals(writeDisposition, bound.writeDisposition); @@ -1845,8 +1848,8 @@ public class BigQueryIOTest implements Serializable { fakeBqServices, jobIdToken, tempFilePrefix, - jsonTable, - jsonSchema, + StaticValueProvider.of(jsonTable), + StaticValueProvider.of(jsonSchema), WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED); @@ -1920,7 +1923,7 @@ public class BigQueryIOTest implements Serializable { WriteRename writeRename = new WriteRename( fakeBqServices, jobIdToken, - jsonTable, + StaticValueProvider.of(jsonTable), WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, tempTablesView); @@ -1961,6 +1964,51 @@ public class BigQueryIOTest implements Serializable { logged.verifyNotLogged("Failed to delete the table " + toJsonString(tableRefs.get(2))); } + /** Test options. **/ + public interface RuntimeTestOptions extends PipelineOptions { + ValueProvider<String> getInputTable(); + void setInputTable(ValueProvider<String> value); + + ValueProvider<String> getInputQuery(); + void setInputQuery(ValueProvider<String> value); + + ValueProvider<String> getOutputTable(); + void setOutputTable(ValueProvider<String> value); + + ValueProvider<String> getOutputSchema(); + void setOutputSchema(ValueProvider<String> value); + } + + @Test + public void testRuntimeOptionsNotCalledInApplyInputTable() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + bqOptions.setTempLocation("gs://testbucket/testdir"); + Pipeline pipeline = TestPipeline.create(options); + pipeline + .apply(BigQueryIO.Read.from(options.getInputTable()).withoutValidation()) + .apply(BigQueryIO.Write + .to(options.getOutputTable()) + .withSchema(NestedValueProvider.of( + options.getOutputSchema(), new JsonSchemaToTableSchema())) + .withoutValidation()); + } + + @Test + public void testRuntimeOptionsNotCalledInApplyInputQuery() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + bqOptions.setTempLocation("gs://testbucket/testdir"); + Pipeline pipeline = TestPipeline.create(options); + pipeline + .apply(BigQueryIO.Read.fromQuery(options.getInputQuery()).withoutValidation()) + .apply(BigQueryIO.Write + .to(options.getOutputTable()) + .withSchema(NestedValueProvider.of( + options.getOutputSchema(), new JsonSchemaToTableSchema())) + .withoutValidation()); + } + private static void testNumFiles(File tempDir, int expectedNumFiles) { assertEquals(expectedNumFiles, tempDir.listFiles(new FileFilter() { @Override