Repository: incubator-beam
Updated Branches:
  refs/heads/master 7d46698f2 -> b237e2f05


Forward port PR-411 and PR-420 from Dataflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8b4550d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8b4550d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8b4550d2

Branch: refs/heads/master
Commit: 8b4550d27920b53d04291dc383f28c9f7f77ca32
Parents: 7d46698
Author: Pei He <pe...@google.com>
Authored: Thu Sep 29 15:13:28 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Sep 30 14:13:46 2016 -0700

----------------------------------------------------------------------
 .../gcp/bigquery/BigQueryTableRowIterator.java  | 114 ++++++++++-----
 .../bigquery/BigQueryTableRowIteratorTest.java  | 143 +++++++++++++++----
 2 files changed, 190 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b4550d2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 0ee01d9..64b1dc6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -28,7 +28,6 @@ import com.google.api.client.util.ClassInfo;
 import com.google.api.client.util.Data;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.ErrorProto;
@@ -36,6 +35,7 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfiguration;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableCell;
@@ -137,16 +137,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
       ref = executeQueryAndWaitForCompletion();
     }
     // Get table schema.
-    Bigquery.Tables.Get get =
-        client.tables().get(ref.getProjectId(), ref.getDatasetId(), 
ref.getTableId());
-
-    Table table =
-        executeWithBackOff(
-            get,
-            "Error opening BigQuery table  %s of dataset %s  : {}",
-            ref.getTableId(),
-            ref.getDatasetId());
-    schema = table.getSchema();
+    schema = getTable(ref).getSchema();
   }
 
   public boolean advance() throws IOException, InterruptedException {
@@ -168,12 +159,11 @@ class BigQueryTableRowIterator implements AutoCloseable {
         list.setPageToken(pageToken);
       }
 
-      TableDataList result =
-          executeWithBackOff(
-              list,
-              "Error reading from BigQuery table %s of dataset %s : {}",
-              ref.getTableId(),
-              ref.getDatasetId());
+      TableDataList result = executeWithBackOff(
+          list,
+          String.format(
+              "Error reading from BigQuery table %s of dataset %s.",
+              ref.getTableId(), ref.getDatasetId()));
 
       pageToken = result.getPageToken();
       iteratorOverCurrentBatch =
@@ -332,19 +322,36 @@ class BigQueryTableRowIterator implements AutoCloseable {
     return row;
   }
 
+  // Get the BigQuery table.
+  private Table getTable(TableReference ref) throws IOException, 
InterruptedException {
+    Bigquery.Tables.Get get =
+        client.tables().get(ref.getProjectId(), ref.getDatasetId(), 
ref.getTableId());
+
+    return executeWithBackOff(
+        get,
+        String.format(
+            "Error opening BigQuery table %s of dataset %s.",
+            ref.getTableId(),
+            ref.getDatasetId()));
+  }
+
   // Create a new BigQuery dataset
-  private void createDataset(String datasetId) throws IOException, 
InterruptedException {
+  private void createDataset(String datasetId, @Nullable String location)
+      throws IOException, InterruptedException {
     Dataset dataset = new Dataset();
     DatasetReference reference = new DatasetReference();
     reference.setProjectId(projectId);
     reference.setDatasetId(datasetId);
     dataset.setDatasetReference(reference);
+    if (location != null) {
+      dataset.setLocation(location);
+    }
 
-    String createDatasetError =
-        "Error when trying to create the temporary dataset " + datasetId + " 
in project "
-        + projectId;
     executeWithBackOff(
-        client.datasets().insert(projectId, dataset), createDatasetError + " 
:{}");
+        client.datasets().insert(projectId, dataset),
+        String.format(
+            "Error when trying to create the temporary dataset %s in project 
%s.",
+            datasetId, projectId));
   }
 
   // Delete the given table that is available in the given dataset.
@@ -352,16 +359,20 @@ class BigQueryTableRowIterator implements AutoCloseable {
       throws IOException, InterruptedException {
     executeWithBackOff(
         client.tables().delete(projectId, datasetId, tableId),
-        "Error when trying to delete the temporary table " + datasetId + " in 
dataset " + datasetId
-        + " of project " + projectId + ". Manual deletion may be required. 
Error message : {}");
+        String.format(
+            "Error when trying to delete the temporary table %s in dataset %s 
of project %s. "
+            + "Manual deletion may be required.",
+            tableId, datasetId, projectId));
   }
 
   // Delete the given dataset. This will fail if the given dataset has any 
tables.
   private void deleteDataset(String datasetId) throws IOException, 
InterruptedException {
     executeWithBackOff(
         client.datasets().delete(projectId, datasetId),
-        "Error when trying to delete the temporary dataset " + datasetId + " 
in project "
-        + projectId + ". Manual deletion may be required. Error message : {}");
+        String.format(
+            "Error when trying to delete the temporary dataset %s in project 
%s. "
+            + "Manual deletion may be required.",
+            datasetId, projectId));
   }
 
   /**
@@ -372,13 +383,31 @@ class BigQueryTableRowIterator implements AutoCloseable {
    */
   private TableReference executeQueryAndWaitForCompletion()
       throws IOException, InterruptedException {
+    // Dry run query to get source table location
+    Job dryRunJob = new Job()
+        .setConfiguration(new JobConfiguration()
+            .setQuery(new JobConfigurationQuery()
+                .setQuery(query))
+            .setDryRun(true));
+    JobStatistics jobStats = executeWithBackOff(
+        client.jobs().insert(projectId, dryRunJob),
+        String.format("Error when trying to dry run query %s.", 
query)).getStatistics();
+
+    // Let BigQuery pick the default location if the query does not read any 
tables.
+    String location = null;
+    @Nullable List<TableReference> tables = 
jobStats.getQuery().getReferencedTables();
+    if (tables != null && !tables.isEmpty()) {
+      Table table = getTable(tables.get(0));
+      location = table.getLocation();
+    }
+
     // Create a temporary dataset to store results.
     // Starting dataset name with an "_" so that it is hidden.
     Random rnd = new Random(System.currentTimeMillis());
     temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
     temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
 
-    createDataset(temporaryDatasetId);
+    createDataset(temporaryDatasetId, location);
     Job job = new Job();
     JobConfiguration config = new JobConfiguration();
     JobConfigurationQuery queryConfig = new JobConfigurationQuery();
@@ -394,15 +423,15 @@ class BigQueryTableRowIterator implements AutoCloseable {
     destinationTable.setTableId(temporaryTableId);
     queryConfig.setDestinationTable(destinationTable);
 
-    Insert insert = client.jobs().insert(projectId, job);
     Job queryJob = executeWithBackOff(
-        insert, "Error when trying to execute the job for query " + query + " 
:{}");
+        client.jobs().insert(projectId, job),
+        String.format("Error when trying to execute the job for query %s.", 
query));
     JobReference jobId = queryJob.getJobReference();
 
     while (true) {
       Job pollJob = executeWithBackOff(
           client.jobs().get(projectId, jobId.getJobId()),
-          "Error when trying to get status of the job for query " + query + " 
:{}");
+          String.format("Error when trying to get status of the job for query 
%s.", query));
       JobStatus status = pollJob.getStatus();
       if (status.getState().equals("DONE")) {
         // Job is DONE, but did not necessarily succeed.
@@ -420,12 +449,23 @@ class BigQueryTableRowIterator implements AutoCloseable {
     }
   }
 
+  /**
+   * Execute a BQ request with exponential backoff and return the result.
+   *
+   * @deprecated use {@link #executeWithBackOff(AbstractGoogleClientRequest, 
String)}.
+   */
+  @Deprecated
+  public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> 
client, String error,
+      Object... errorArgs) throws IOException, InterruptedException {
+    return executeWithBackOff(client, String.format(error, errorArgs));
+  }
+
   // Execute a BQ request with exponential backoff and return the result.
   // client - BQ request to be executed
   // error - Formatted message to log if when a request fails. Takes exception 
message as a
   // formatter parameter.
-  public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> 
client, String error,
-      Object... errorArgs) throws IOException, InterruptedException {
+  public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> 
client, String error)
+      throws IOException, InterruptedException {
     Sleeper sleeper = Sleeper.DEFAULT;
     BackOff backOff =
         FluentBackoff.DEFAULT
@@ -437,15 +477,15 @@ class BigQueryTableRowIterator implements AutoCloseable {
         result = client.execute();
         break;
       } catch (IOException e) {
-        LOG.error(String.format(error, errorArgs), e.getMessage());
+        LOG.error("{}", error, e);
         if (!BackOffUtils.next(sleeper, backOff)) {
-          LOG.error(
-              String.format(error, errorArgs), "Failing after retrying " + 
MAX_RETRIES + " times.");
-          throw e;
+          String errorMessage = String.format(
+              "%s Failing to execute job after %d attempts.", error, 
MAX_RETRIES + 1);
+          LOG.error("{}", errorMessage, e);
+          throw new IOException(errorMessage, e);
         }
       }
     }
-
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b4550d2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
index 040f884..29a1704 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
@@ -32,11 +32,12 @@ import static org.mockito.Mockito.when;
 
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfiguration;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.JobStatistics2;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableCell;
@@ -45,6 +46,7 @@ import 
com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
 import com.google.common.io.BaseEncoding;
 import java.io.IOException;
 import java.util.Arrays;
@@ -130,6 +132,22 @@ public class BigQueryTableRowIteratorTest {
                         new 
TableFieldSchema().setName("anniversary_time").setType("TIME"))));
   }
 
+  private static Table noTableQuerySchema() {
+    return new Table()
+        .setSchema(
+            new TableSchema()
+                .setFields(
+                    Arrays.asList(
+                        new 
TableFieldSchema().setName("name").setType("STRING"),
+                        new 
TableFieldSchema().setName("count").setType("INTEGER"),
+                        new 
TableFieldSchema().setName("photo").setType("BYTES"))));
+  }
+
+  private static Table tableWithLocation() {
+    return new Table()
+        .setLocation("EU");
+  }
+
   private TableRow rawRow(Object... args) {
     List<TableCell> cells = new LinkedList<>();
     for (Object a : args) {
@@ -149,8 +167,11 @@ public class BigQueryTableRowIteratorTest {
   @Test
   public void testReadFromQuery() throws IOException, InterruptedException {
     // Mock job inserting.
+    Job dryRunJob = new Job().setStatistics(
+        new JobStatistics().setQuery(new JobStatistics2().setReferencedTables(
+            ImmutableList.of(new TableReference()))));
     Job insertedJob = new Job().setJobReference(new JobReference());
-    when(mockJobsInsert.execute()).thenReturn(insertedJob);
+    when(mockJobsInsert.execute()).thenReturn(dryRunJob, insertedJob);
 
     // Mock job polling.
     JobStatus status = new JobStatus().setState("DONE");
@@ -165,7 +186,7 @@ public class BigQueryTableRowIteratorTest {
     when(mockJobsGet.execute()).thenReturn(getJob);
 
     // Mock table schema fetch.
-    when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema());
+    when(mockTablesGet.execute()).thenReturn(tableWithLocation(), 
tableWithBasicSchema());
 
     byte[] photoBytes = "photograph".getBytes();
     String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes);
@@ -206,15 +227,91 @@ public class BigQueryTableRowIteratorTest {
     verify(mockDatasets).delete(anyString(), anyString());
     verify(mockDatasetsDelete).execute();
     // Job inserted to run the query, polled once.
-    verify(mockClient, times(2)).jobs();
-    verify(mockJobs).insert(anyString(), any(Job.class));
-    verify(mockJobsInsert).execute();
+    verify(mockClient, times(3)).jobs();
+    verify(mockJobs, times(2)).insert(anyString(), any(Job.class));
+    verify(mockJobsInsert, times(2)).execute();
+    verify(mockJobs).get(anyString(), anyString());
+    verify(mockJobsGet).execute();
+    // Temp table get after query finish, deleted after reading.
+    verify(mockClient, times(3)).tables();
+    verify(mockTables, times(2)).get(anyString(), anyString(), anyString());
+    verify(mockTablesGet, times(2)).execute();
+    verify(mockTables).delete(anyString(), anyString(), anyString());
+    verify(mockTablesDelete).execute();
+    // Table data read.
+    verify(mockClient).tabledata();
+    verify(mockTabledata).list("project", "dataset", "table");
+    verify(mockTabledataList).execute();
+  }
+
+  /**
+   * Verifies that queries that reference no data can be read.
+   */
+  @Test
+  public void testReadFromQueryNoTables() throws IOException, 
InterruptedException {
+    // Mock job inserting.
+    Job dryRunJob = new Job().setStatistics(
+        new JobStatistics().setQuery(new JobStatistics2()));
+    Job insertedJob = new Job().setJobReference(new JobReference());
+    when(mockJobsInsert.execute()).thenReturn(dryRunJob, insertedJob);
+
+    // Mock job polling.
+    JobStatus status = new JobStatus().setState("DONE");
+    TableReference tableRef =
+        new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    JobConfigurationQuery queryConfig = new 
JobConfigurationQuery().setDestinationTable(tableRef);
+    Job getJob =
+        new Job()
+            .setJobReference(new JobReference())
+            .setStatus(status)
+            .setConfiguration(new JobConfiguration().setQuery(queryConfig));
+    when(mockJobsGet.execute()).thenReturn(getJob);
+
+    // Mock table schema fetch.
+    when(mockTablesGet.execute()).thenReturn(noTableQuerySchema());
+
+    byte[] photoBytes = "photograph".getBytes();
+    String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes);
+    // Mock table data fetch.
+    when(mockTabledataList.execute()).thenReturn(
+        rawDataList(rawRow("Arthur", 42, photoBytesEncoded)));
+
+    // Run query and verify
+    String query = String.format(
+        "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo",
+        photoBytesEncoded);
+    try (BigQueryTableRowIterator iterator =
+        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null)) {
+      iterator.open();
+      assertTrue(iterator.advance());
+      TableRow row = iterator.getCurrent();
+
+      assertTrue(row.containsKey("name"));
+      assertTrue(row.containsKey("count"));
+      assertTrue(row.containsKey("photo"));
+      assertEquals("Arthur", row.get("name"));
+      assertEquals(42, row.get("count"));
+      assertEquals(photoBytesEncoded, row.get("photo"));
+
+      assertFalse(iterator.advance());
+    }
+
+    // Temp dataset created and later deleted.
+    verify(mockClient, times(2)).datasets();
+    verify(mockDatasets).insert(anyString(), any(Dataset.class));
+    verify(mockDatasetsInsert).execute();
+    verify(mockDatasets).delete(anyString(), anyString());
+    verify(mockDatasetsDelete).execute();
+    // Job inserted to run the query, polled once.
+    verify(mockClient, times(3)).jobs();
+    verify(mockJobs, times(2)).insert(anyString(), any(Job.class));
+    verify(mockJobsInsert, times(2)).execute();
     verify(mockJobs).get(anyString(), anyString());
     verify(mockJobsGet).execute();
     // Temp table get after query finish, deleted after reading.
     verify(mockClient, times(2)).tables();
-    verify(mockTables).get("project", "dataset", "table");
-    verify(mockTablesGet).execute();
+    verify(mockTables, times(1)).get(anyString(), anyString(), anyString());
+    verify(mockTablesGet, times(1)).execute();
     verify(mockTables).delete(anyString(), anyString(), anyString());
     verify(mockTablesDelete).execute();
     // Table data read.
@@ -230,43 +327,29 @@ public class BigQueryTableRowIteratorTest {
    */
   @Test
   public void testQueryFailed() throws IOException {
-    // Job can be created.
-    JobReference ref = new JobReference();
-    Job insertedJob = new Job().setJobReference(ref);
-    when(mockJobsInsert.execute()).thenReturn(insertedJob);
-
     // Job state polled with an error.
     String errorReason = "bad query";
-    JobStatus status =
-        new JobStatus().setState("DONE").setErrorResult(new 
ErrorProto().setMessage(errorReason));
-    Job getJob = new Job().setJobReference(ref).setStatus(status);
-    when(mockJobsGet.execute()).thenReturn(getJob);
+    Exception exception = new IOException(errorReason);
+    when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, 
exception);
 
     String query = "NOT A QUERY";
     try (BigQueryTableRowIterator iterator =
-            BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null)) {
+        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null)) {
+
       try {
         iterator.open();
         fail();
       } catch (Exception expected) {
         // Verify message explains cause and reports the query.
-        assertThat(expected.getMessage(), containsString("failed"));
-        assertThat(expected.getMessage(), containsString(errorReason));
+        assertThat(expected.getMessage(), containsString("Error"));
         assertThat(expected.getMessage(), containsString(query));
+        assertThat(expected.getCause().getMessage(), 
containsString(errorReason));
       }
     }
 
-    // Temp dataset created and then later deleted.
-    verify(mockClient, times(2)).datasets();
-    verify(mockDatasets).insert(anyString(), any(Dataset.class));
-    verify(mockDatasetsInsert).execute();
-    verify(mockDatasets).delete(anyString(), anyString());
-    verify(mockDatasetsDelete).execute();
     // Job inserted to run the query, then polled once.
-    verify(mockClient, times(2)).jobs();
+    verify(mockClient, times(1)).jobs();
     verify(mockJobs).insert(anyString(), any(Job.class));
-    verify(mockJobsInsert).execute();
-    verify(mockJobs).get(anyString(), anyString());
-    verify(mockJobsGet).execute();
+    verify(mockJobsInsert, times(4)).execute();
   }
 }

Reply via email to