Repository: beam
Updated Branches:
  refs/heads/master e9cd41165 -> 77b136603


BigQuery: refactor services so that all queryConfig happens in BigQueryIO

By putting all the configuration in the same place, we can avoid
bugs that happen from mismatching code across files.

Also made a few unnecessarily-public APIs package-private.

Also improved tests and removed a few Dataflow references.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21e2cf6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21e2cf6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21e2cf6b

Branch: refs/heads/master
Commit: 21e2cf6b0a3905cd2768948287115b69f4b6bd6c
Parents: e9cd411
Author: Dan Halperin <dhalp...@google.com>
Authored: Mon Jan 30 14:04:32 2017 -0800
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue Jan 31 17:11:18 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  4 +-
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  3 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 17 ++--
 .../gcp/bigquery/BigQueryTableRowIterator.java  | 82 ++++++++------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  3 +-
 .../bigquery/BigQueryTableRowIteratorTest.java  | 51 ++++++------
 6 files changed, 67 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 4ace985..b15807e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1074,7 +1074,7 @@ public class BigQueryIO {
     public BoundedReader<TableRow> createReader(PipelineOptions options) 
throws IOException {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       return new BigQueryReader(this, bqServices.getReaderFromQuery(
-          bqOptions, query.get(), executingProject.get(), flattenResults, 
useLegacySql));
+          bqOptions, executingProject.get(), createBasicQueryConfig()));
     }
 
     @Override
@@ -1162,8 +1162,8 @@ public class BigQueryIO {
 
     private JobConfigurationQuery createBasicQueryConfig() {
       return new JobConfigurationQuery()
-          .setQuery(query.get())
           .setFlattenResults(flattenResults)
+          .setQuery(query.get())
           .setUseLegacySql(useLegacySql);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 03e4391..a85d16d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -57,8 +57,7 @@ interface BigQueryServices extends Serializable {
    * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
    */
   BigQueryJsonReader getReaderFromQuery(
-      BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten,
-      @Nullable Boolean useLegacySql);
+      BigQueryOptions bqOptions, String projectId, JobConfigurationQuery 
queryConfig);
 
   /**
    * An interface for the Cloud BigQuery load service.

http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 7c3edbe..b958c8d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -100,9 +100,8 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   @Override
   public BigQueryJsonReader getReaderFromQuery(
-      BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten,
-      @Nullable Boolean useLegacySql) {
-    return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, 
flatten, useLegacySql);
+      BigQueryOptions bqOptions, String projectId, JobConfigurationQuery 
queryConfig) {
+    return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig);
   }
 
   @VisibleForTesting
@@ -800,20 +799,14 @@ class BigQueryServicesImpl implements BigQueryServices {
     }
 
     private static BigQueryJsonReader fromQuery(
-        BigQueryOptions bqOptions,
-        String query,
-        String projectId,
-        @Nullable Boolean flattenResults,
-        @Nullable Boolean useLegacySql) {
+        BigQueryOptions bqOptions, String projectId, JobConfigurationQuery 
queryConfig) {
       return new BigQueryJsonReaderImpl(
           BigQueryTableRowIterator.fromQuery(
-              query, projectId, 
Transport.newBigQueryClient(bqOptions).build(), flattenResults,
-              useLegacySql));
+              queryConfig, projectId, 
Transport.newBigQueryClient(bqOptions).build()));
     }
 
     private static BigQueryJsonReader fromTable(
-        BigQueryOptions bqOptions,
-        TableReference tableRef) {
+        BigQueryOptions bqOptions, TableReference tableRef) {
       return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
           tableRef, Transport.newBigQueryClient(bqOptions).build()));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 92f7542..5edc78c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -44,7 +44,6 @@ import 
com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
@@ -72,6 +71,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
   @Nullable private TableReference ref;
   @Nullable private final String projectId;
   @Nullable private TableSchema schema;
+  @Nullable private JobConfigurationQuery queryConfig;
   private final Bigquery client;
   private String pageToken;
   private Iterator<TableRow> iteratorOverCurrentBatch;
@@ -88,64 +88,54 @@ class BigQueryTableRowIterator implements AutoCloseable {
   // following interval to check the status of query execution job
   private static final Duration QUERY_COMPLETION_POLL_TIME = 
Duration.standardSeconds(1);
 
-  private final String query;
-  // Whether to flatten query results.
-  private final boolean flattenResults;
-  // Whether to use the BigQuery legacy SQL dialect..
-  private final boolean useLegacySql;
   // Temporary dataset used to store query results.
   private String temporaryDatasetId = null;
   // Temporary table used to store query results.
   private String temporaryTableId = null;
 
   private BigQueryTableRowIterator(
-      @Nullable TableReference ref, @Nullable String query, @Nullable String 
projectId,
-      Bigquery client, boolean flattenResults, boolean useLegacySql) {
+      @Nullable TableReference ref, @Nullable JobConfigurationQuery 
queryConfig,
+      @Nullable String projectId, Bigquery client) {
     this.ref = ref;
-    this.query = query;
+    this.queryConfig = queryConfig;
     this.projectId = projectId;
     this.client = checkNotNull(client, "client");
-    this.flattenResults = flattenResults;
-    this.useLegacySql = useLegacySql;
   }
 
   /**
    * Constructs a {@code BigQueryTableRowIterator} that reads from the 
specified table.
    */
-  public static BigQueryTableRowIterator fromTable(TableReference ref, 
Bigquery client) {
+  static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery 
client) {
     checkNotNull(ref, "ref");
     checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, 
true, true);
+    return new BigQueryTableRowIterator(ref, /* queryConfig */null, 
ref.getProjectId(), client);
   }
 
   /**
    * Constructs a {@code BigQueryTableRowIterator} that reads from the results 
of executing the
    * specified query in the specified project.
    */
-  public static BigQueryTableRowIterator fromQuery(
-      String query, String projectId, Bigquery client, @Nullable Boolean 
flattenResults,
-      @Nullable Boolean useLegacySql) {
-    checkNotNull(query, "query");
+  static BigQueryTableRowIterator fromQuery(
+      JobConfigurationQuery queryConfig, String projectId, Bigquery client) {
+    checkNotNull(queryConfig, "queryConfig");
     checkNotNull(projectId, "projectId");
     checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(null, query, projectId, client,
-        MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
-        MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
+    return new BigQueryTableRowIterator(/* ref */null, queryConfig, projectId, 
client);
   }
 
   /**
    * Opens the table for read.
    * @throws IOException on failure
    */
-  public void open() throws IOException, InterruptedException {
-    if (query != null) {
+  void open() throws IOException, InterruptedException {
+    if (queryConfig != null) {
       ref = executeQueryAndWaitForCompletion();
     }
     // Get table schema.
     schema = getTable(ref).getSchema();
   }
 
-  public boolean advance() throws IOException, InterruptedException {
+  boolean advance() throws IOException, InterruptedException {
     while (true) {
       if (iteratorOverCurrentBatch != null && 
iteratorOverCurrentBatch.hasNext()) {
         // Embed schema information into the raw row, so that values have an
@@ -183,7 +173,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
     }
   }
 
-  public TableRow getCurrent() {
+  TableRow getCurrent() {
     if (current == null) {
       throw new NoSuchElementException();
     }
@@ -193,7 +183,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
   /**
    * Adjusts a field returned from the BigQuery API to match what we will 
receive when running
    * BigQuery's export-to-GCS and parallel read, which is the efficient 
parallel implementation
-   * used for batch jobs executed on the Cloud Dataflow service.
+   * used for batch jobs executed on the Beam Runners that perform initial 
splitting.
    *
    * <p>The following is the relationship between BigQuery schema and Java 
types:
    *
@@ -254,7 +244,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
   }
 
   /**
-   * A list of the field names that cannot be used in BigQuery tables 
processed by Dataflow,
+   * A list of the field names that cannot be used in BigQuery tables 
processed by Apache Beam,
    * because they are reserved keywords in {@link TableRow}.
    */
   // TODO: This limitation is unfortunate. We need to give users a way to use 
BigQueryIO that does
@@ -388,15 +378,17 @@ class BigQueryTableRowIterator implements AutoCloseable {
    */
   private TableReference executeQueryAndWaitForCompletion()
       throws IOException, InterruptedException {
+    checkState(projectId != null, "Unable to execute a query without a 
configured project id");
+    checkState(queryConfig != null, "Unable to execute a query without a 
configured query");
     // Dry run query to get source table location
     Job dryRunJob = new Job()
         .setConfiguration(new JobConfiguration()
-            .setQuery(new JobConfigurationQuery()
-                .setQuery(query))
+            .setQuery(queryConfig)
             .setDryRun(true));
     JobStatistics jobStats = executeWithBackOff(
         client.jobs().insert(projectId, dryRunJob),
-        String.format("Error when trying to dry run query %s.", 
query)).getStatistics();
+        String.format("Error when trying to dry run query %s.",
+            queryConfig.toPrettyString())).getStatistics();
 
     // Let BigQuery to pick default location if the query does not read any 
tables.
     String location = null;
@@ -409,35 +401,33 @@ class BigQueryTableRowIterator implements AutoCloseable {
     // Create a temporary dataset to store results.
     // Starting dataset name with an "_" so that it is hidden.
     Random rnd = new Random(System.currentTimeMillis());
-    temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
-    temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
+    temporaryDatasetId = "_beam_temporary_dataset_" + rnd.nextInt(1000000);
+    temporaryTableId = "beam_temporary_table_" + rnd.nextInt(1000000);
 
     createDataset(temporaryDatasetId, location);
     Job job = new Job();
     JobConfiguration config = new JobConfiguration();
-    JobConfigurationQuery queryConfig = new JobConfigurationQuery();
     config.setQuery(queryConfig);
     job.setConfiguration(config);
-    queryConfig.setQuery(query);
-    queryConfig.setAllowLargeResults(true);
-    queryConfig.setFlattenResults(flattenResults);
-    queryConfig.setUseLegacySql(useLegacySql);
 
     TableReference destinationTable = new TableReference();
     destinationTable.setProjectId(projectId);
     destinationTable.setDatasetId(temporaryDatasetId);
     destinationTable.setTableId(temporaryTableId);
     queryConfig.setDestinationTable(destinationTable);
+    queryConfig.setAllowLargeResults(true);
 
     Job queryJob = executeWithBackOff(
         client.jobs().insert(projectId, job),
-        String.format("Error when trying to execute the job for query %s.", 
query));
+        String.format("Error when trying to execute the job for query %s.",
+            queryConfig.toPrettyString()));
     JobReference jobId = queryJob.getJobReference();
 
     while (true) {
       Job pollJob = executeWithBackOff(
           client.jobs().get(projectId, jobId.getJobId()),
-          String.format("Error when trying to get status of the job for query 
%s.", query));
+          String.format("Error when trying to get status of the job for query 
%s.",
+              queryConfig.toPrettyString()));
       JobStatus status = pollJob.getStatus();
       if (status.getState().equals("DONE")) {
         // Job is DONE, but did not necessarily succeed.
@@ -447,7 +437,8 @@ class BigQueryTableRowIterator implements AutoCloseable {
         } else {
           // There will be no temporary table to delete, so null out the 
reference.
           temporaryTableId = null;
-          throw new IOException("Executing query " + query + " failed: " + 
error.getMessage());
+          throw new IOException(String.format(
+              "Executing query %s failed: %s", queryConfig.toPrettyString(), 
error.getMessage()));
         }
       }
       Uninterruptibles.sleepUninterruptibly(
@@ -455,22 +446,11 @@ class BigQueryTableRowIterator implements AutoCloseable {
     }
   }
 
-  /**
-   * Execute a BQ request with exponential backoff and return the result.
-   *
-   * @deprecated use {@link #executeWithBackOff(AbstractGoogleClientRequest, 
String)}.
-   */
-  @Deprecated
-  public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> 
client, String error,
-      Object... errorArgs) throws IOException, InterruptedException {
-    return executeWithBackOff(client, String.format(error, errorArgs));
-  }
-
   // Execute a BQ request with exponential backoff and return the result.
   // client - BQ request to be executed
   // error - Formatted message to log if when a request fails. Takes exception 
message as a
   // formatter parameter.
-  public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> 
client, String error)
+  private static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> 
client, String error)
       throws IOException, InterruptedException {
     Sleeper sleeper = Sleeper.DEFAULT;
     BackOff backOff =

http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index bbfc2ce..c0ce027 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -226,8 +226,7 @@ public class BigQueryIOTest implements Serializable {
 
     @Override
     public BigQueryJsonReader getReaderFromQuery(
-        BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten,
-        @Nullable Boolean useLegacySql) {
+        BigQueryOptions bqOptions, String projectId, JobConfigurationQuery 
queryConfig) {
       return new FakeBigQueryReader(jsonTableRowReturns);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/21e2cf6b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
index a41b455..f84d412 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
@@ -175,14 +175,16 @@ public class BigQueryTableRowIteratorTest {
 
     // Mock job polling.
     JobStatus status = new JobStatus().setState("DONE");
-    TableReference tableRef =
-        new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    JobConfigurationQuery queryConfig = new 
JobConfigurationQuery().setDestinationTable(tableRef);
+    JobConfigurationQuery resultQueryConfig = new JobConfigurationQuery()
+        .setDestinationTable(new TableReference()
+            .setProjectId("project")
+            .setDatasetId("tempdataset")
+            .setTableId("temptable"));
     Job getJob =
         new Job()
             .setJobReference(new JobReference())
             .setStatus(status)
-            .setConfiguration(new JobConfiguration().setQuery(queryConfig));
+            .setConfiguration(new 
JobConfiguration().setQuery(resultQueryConfig));
     when(mockJobsGet.execute()).thenReturn(getJob);
 
     // Mock table schema fetch.
@@ -198,8 +200,9 @@ public class BigQueryTableRowIteratorTest {
     // Run query and verify
     String query = "SELECT name, count, photo, anniversary_date, "
         + "anniversary_datetime, anniversary_time from table";
+    JobConfigurationQuery queryConfig = new 
JobConfigurationQuery().setQuery(query);
     try (BigQueryTableRowIterator iterator =
-            BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null, null)) {
+            BigQueryTableRowIterator.fromQuery(queryConfig, "project", 
mockClient)) {
       iterator.open();
       assertTrue(iterator.advance());
       TableRow row = iterator.getCurrent();
@@ -240,7 +243,7 @@ public class BigQueryTableRowIteratorTest {
     verify(mockTablesDelete).execute();
     // Table data read.
     verify(mockClient).tabledata();
-    verify(mockTabledata).list("project", "dataset", "table");
+    verify(mockTabledata).list("project", "tempdataset", "temptable");
     verify(mockTabledataList).execute();
   }
 
@@ -257,14 +260,16 @@ public class BigQueryTableRowIteratorTest {
 
     // Mock job polling.
     JobStatus status = new JobStatus().setState("DONE");
-    TableReference tableRef =
-        new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    JobConfigurationQuery queryConfig = new 
JobConfigurationQuery().setDestinationTable(tableRef);
+    JobConfigurationQuery resultQueryConfig = new JobConfigurationQuery()
+        .setDestinationTable(new TableReference()
+            .setProjectId("project")
+            .setDatasetId("tempdataset")
+            .setTableId("temptable"));
     Job getJob =
         new Job()
             .setJobReference(new JobReference())
             .setStatus(status)
-            .setConfiguration(new JobConfiguration().setQuery(queryConfig));
+            .setConfiguration(new 
JobConfiguration().setQuery(resultQueryConfig));
     when(mockJobsGet.execute()).thenReturn(getJob);
 
     // Mock table schema fetch.
@@ -280,8 +285,9 @@ public class BigQueryTableRowIteratorTest {
     String query = String.format(
         "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo",
         photoBytesEncoded);
+    JobConfigurationQuery queryConfig = new 
JobConfigurationQuery().setQuery(query);
     try (BigQueryTableRowIterator iterator =
-        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, 
null)) {
+        BigQueryTableRowIterator.fromQuery(queryConfig, "project", 
mockClient)) {
       iterator.open();
       assertTrue(iterator.advance());
       TableRow row = iterator.getCurrent();
@@ -316,7 +322,7 @@ public class BigQueryTableRowIteratorTest {
     verify(mockTablesDelete).execute();
     // Table data read.
     verify(mockClient).tabledata();
-    verify(mockTabledata).list("project", "dataset", "table");
+    verify(mockTabledata).list("project", "tempdataset", "temptable");
     verify(mockTabledataList).execute();
   }
 
@@ -332,19 +338,16 @@ public class BigQueryTableRowIteratorTest {
     Exception exception = new IOException(errorReason);
     when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, 
exception);
 
-    String query = "NOT A QUERY";
+    JobConfigurationQuery queryConfig = new 
JobConfigurationQuery().setQuery("NOT A QUERY");
     try (BigQueryTableRowIterator iterator =
-        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, 
null)) {
-
-      try {
-        iterator.open();
-        fail();
-      } catch (Exception expected) {
-        // Verify message explains cause and reports the query.
-        assertThat(expected.getMessage(), containsString("Error"));
-        assertThat(expected.getMessage(), containsString(query));
-        assertThat(expected.getCause().getMessage(), 
containsString(errorReason));
-      }
+        BigQueryTableRowIterator.fromQuery(queryConfig, "project", 
mockClient)) {
+      iterator.open();
+      fail();
+    } catch (Exception expected) {
+      // Verify message explains cause and reports the query.
+      assertThat(expected.getMessage(), containsString("Error"));
+      assertThat(expected.getMessage(), containsString("NOT A QUERY"));
+      assertThat(expected.getCause().getMessage(), 
containsString(errorReason));
     }
 
     // Job inserted to run the query, then polled once.

Reply via email to