Repository: incubator-beam Updated Branches: refs/heads/master 7d46698f2 -> b237e2f05
Forward port PR-411 and PR-420 from Dataflow Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8b4550d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8b4550d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8b4550d2 Branch: refs/heads/master Commit: 8b4550d27920b53d04291dc383f28c9f7f77ca32 Parents: 7d46698 Author: Pei He <pe...@google.com> Authored: Thu Sep 29 15:13:28 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Sep 30 14:13:46 2016 -0700 ---------------------------------------------------------------------- .../gcp/bigquery/BigQueryTableRowIterator.java | 114 ++++++++++----- .../bigquery/BigQueryTableRowIteratorTest.java | 143 +++++++++++++++---- 2 files changed, 190 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b4550d2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 0ee01d9..64b1dc6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -28,7 +28,6 @@ import com.google.api.client.util.ClassInfo; import com.google.api.client.util.Data; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Jobs.Insert; import 
com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.DatasetReference; import com.google.api.services.bigquery.model.ErrorProto; @@ -36,6 +35,7 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableCell; @@ -137,16 +137,7 @@ class BigQueryTableRowIterator implements AutoCloseable { ref = executeQueryAndWaitForCompletion(); } // Get table schema. - Bigquery.Tables.Get get = - client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); - - Table table = - executeWithBackOff( - get, - "Error opening BigQuery table %s of dataset %s : {}", - ref.getTableId(), - ref.getDatasetId()); - schema = table.getSchema(); + schema = getTable(ref).getSchema(); } public boolean advance() throws IOException, InterruptedException { @@ -168,12 +159,11 @@ class BigQueryTableRowIterator implements AutoCloseable { list.setPageToken(pageToken); } - TableDataList result = - executeWithBackOff( - list, - "Error reading from BigQuery table %s of dataset %s : {}", - ref.getTableId(), - ref.getDatasetId()); + TableDataList result = executeWithBackOff( + list, + String.format( + "Error reading from BigQuery table %s of dataset %s.", + ref.getTableId(), ref.getDatasetId())); pageToken = result.getPageToken(); iteratorOverCurrentBatch = @@ -332,19 +322,36 @@ class BigQueryTableRowIterator implements AutoCloseable { return row; } + // Get the BigQuery table. 
+ private Table getTable(TableReference ref) throws IOException, InterruptedException { + Bigquery.Tables.Get get = + client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); + + return executeWithBackOff( + get, + String.format( + "Error opening BigQuery table %s of dataset %s.", + ref.getTableId(), + ref.getDatasetId())); + } + // Create a new BigQuery dataset - private void createDataset(String datasetId) throws IOException, InterruptedException { + private void createDataset(String datasetId, @Nullable String location) + throws IOException, InterruptedException { Dataset dataset = new Dataset(); DatasetReference reference = new DatasetReference(); reference.setProjectId(projectId); reference.setDatasetId(datasetId); dataset.setDatasetReference(reference); + if (location != null) { + dataset.setLocation(location); + } - String createDatasetError = - "Error when trying to create the temporary dataset " + datasetId + " in project " - + projectId; executeWithBackOff( - client.datasets().insert(projectId, dataset), createDatasetError + " :{}"); + client.datasets().insert(projectId, dataset), + String.format( + "Error when trying to create the temporary dataset %s in project %s.", + datasetId, projectId)); } // Delete the given table that is available in the given dataset. @@ -352,16 +359,20 @@ class BigQueryTableRowIterator implements AutoCloseable { throws IOException, InterruptedException { executeWithBackOff( client.tables().delete(projectId, datasetId, tableId), - "Error when trying to delete the temporary table " + datasetId + " in dataset " + datasetId - + " of project " + projectId + ". Manual deletion may be required. Error message : {}"); + String.format( + "Error when trying to delete the temporary table %s in dataset %s of project %s. " + + "Manual deletion may be required.", + tableId, datasetId, projectId)); } // Delete the given dataset. This will fail if the given dataset has any tables. 
private void deleteDataset(String datasetId) throws IOException, InterruptedException { executeWithBackOff( client.datasets().delete(projectId, datasetId), - "Error when trying to delete the temporary dataset " + datasetId + " in project " - + projectId + ". Manual deletion may be required. Error message : {}"); + String.format( + "Error when trying to delete the temporary dataset %s in project %s. " + + "Manual deletion may be required.", + datasetId, projectId)); } /** @@ -372,13 +383,31 @@ class BigQueryTableRowIterator implements AutoCloseable { */ private TableReference executeQueryAndWaitForCompletion() throws IOException, InterruptedException { + // Dry run query to get source table location + Job dryRunJob = new Job() + .setConfiguration(new JobConfiguration() + .setQuery(new JobConfigurationQuery() + .setQuery(query)) + .setDryRun(true)); + JobStatistics jobStats = executeWithBackOff( + client.jobs().insert(projectId, dryRunJob), + String.format("Error when trying to dry run query %s.", query)).getStatistics(); + + // Let BigQuery pick the default location if the query does not read any tables. + String location = null; + @Nullable List<TableReference> tables = jobStats.getQuery().getReferencedTables(); + if (tables != null && !tables.isEmpty()) { + Table table = getTable(tables.get(0)); + location = table.getLocation(); + } + + // Create a temporary dataset to store results. + // Starting dataset name with an "_" so that it is hidden. 
Random rnd = new Random(System.currentTimeMillis()); temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000); temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000); - createDataset(temporaryDatasetId); + createDataset(temporaryDatasetId, location); Job job = new Job(); JobConfiguration config = new JobConfiguration(); JobConfigurationQuery queryConfig = new JobConfigurationQuery(); @@ -394,15 +423,15 @@ class BigQueryTableRowIterator implements AutoCloseable { destinationTable.setTableId(temporaryTableId); queryConfig.setDestinationTable(destinationTable); - Insert insert = client.jobs().insert(projectId, job); Job queryJob = executeWithBackOff( - insert, "Error when trying to execute the job for query " + query + " :{}"); + client.jobs().insert(projectId, job), + String.format("Error when trying to execute the job for query %s.", query)); JobReference jobId = queryJob.getJobReference(); while (true) { Job pollJob = executeWithBackOff( client.jobs().get(projectId, jobId.getJobId()), - "Error when trying to get status of the job for query " + query + " :{}"); + String.format("Error when trying to get status of the job for query %s.", query)); JobStatus status = pollJob.getStatus(); if (status.getState().equals("DONE")) { // Job is DONE, but did not necessarily succeed. @@ -420,12 +449,23 @@ class BigQueryTableRowIterator implements AutoCloseable { } } + /** + * Execute a BQ request with exponential backoff and return the result. + * + * @deprecated use {@link #executeWithBackOff(AbstractGoogleClientRequest, String)}. + */ + @Deprecated + public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error, + Object... errorArgs) throws IOException, InterruptedException { + return executeWithBackOff(client, String.format(error, errorArgs)); + } + // Execute a BQ request with exponential backoff and return the result. 
// client - BQ request to be executed // error - Formatted message to log when a request fails. Takes exception message as a // formatter parameter. - public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error, - Object... errorArgs) throws IOException, InterruptedException { + public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error) + throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backOff = FluentBackoff.DEFAULT @@ -437,15 +477,15 @@ class BigQueryTableRowIterator implements AutoCloseable { result = client.execute(); break; } catch (IOException e) { - LOG.error(String.format(error, errorArgs), e.getMessage()); + LOG.error("{}", error, e); if (!BackOffUtils.next(sleeper, backOff)) { - LOG.error( - String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times."); - throw e; + String errorMessage = String.format( + "%s Failing to execute job after %d attempts.", error, MAX_RETRIES + 1); + LOG.error("{}", errorMessage, e); + throw new IOException(errorMessage, e); } } } - return result; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b4550d2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java index 040f884..29a1704 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java @@ -32,11 +32,12 @@ import static org.mockito.Mockito.when; import 
com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Dataset; -import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; +import com.google.api.services.bigquery.model.JobStatistics2; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableCell; @@ -45,6 +46,7 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.collect.ImmutableList; import com.google.common.io.BaseEncoding; import java.io.IOException; import java.util.Arrays; @@ -130,6 +132,22 @@ public class BigQueryTableRowIteratorTest { new TableFieldSchema().setName("anniversary_time").setType("TIME")))); } + private static Table noTableQuerySchema() { + return new Table() + .setSchema( + new TableSchema() + .setFields( + Arrays.asList( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("count").setType("INTEGER"), + new TableFieldSchema().setName("photo").setType("BYTES")))); + } + + private static Table tableWithLocation() { + return new Table() + .setLocation("EU"); + } + private TableRow rawRow(Object... args) { List<TableCell> cells = new LinkedList<>(); for (Object a : args) { @@ -149,8 +167,11 @@ public class BigQueryTableRowIteratorTest { @Test public void testReadFromQuery() throws IOException, InterruptedException { // Mock job inserting. 
+ Job dryRunJob = new Job().setStatistics( + new JobStatistics().setQuery(new JobStatistics2().setReferencedTables( + ImmutableList.of(new TableReference())))); Job insertedJob = new Job().setJobReference(new JobReference()); - when(mockJobsInsert.execute()).thenReturn(insertedJob); + when(mockJobsInsert.execute()).thenReturn(dryRunJob, insertedJob); // Mock job polling. JobStatus status = new JobStatus().setState("DONE"); @@ -165,7 +186,7 @@ public class BigQueryTableRowIteratorTest { when(mockJobsGet.execute()).thenReturn(getJob); // Mock table schema fetch. - when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema()); + when(mockTablesGet.execute()).thenReturn(tableWithLocation(), tableWithBasicSchema()); byte[] photoBytes = "photograph".getBytes(); String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes); @@ -206,15 +227,91 @@ public class BigQueryTableRowIteratorTest { verify(mockDatasets).delete(anyString(), anyString()); verify(mockDatasetsDelete).execute(); // Job inserted to run the query, polled once. - verify(mockClient, times(2)).jobs(); - verify(mockJobs).insert(anyString(), any(Job.class)); - verify(mockJobsInsert).execute(); + verify(mockClient, times(3)).jobs(); + verify(mockJobs, times(2)).insert(anyString(), any(Job.class)); + verify(mockJobsInsert, times(2)).execute(); + verify(mockJobs).get(anyString(), anyString()); + verify(mockJobsGet).execute(); + // Temp table get after query finish, deleted after reading. + verify(mockClient, times(3)).tables(); + verify(mockTables, times(2)).get(anyString(), anyString(), anyString()); + verify(mockTablesGet, times(2)).execute(); + verify(mockTables).delete(anyString(), anyString(), anyString()); + verify(mockTablesDelete).execute(); + // Table data read. + verify(mockClient).tabledata(); + verify(mockTabledata).list("project", "dataset", "table"); + verify(mockTabledataList).execute(); + } + + /** + * Verifies that queries that reference no data can be read. 
+ */ + @Test + public void testReadFromQueryNoTables() throws IOException, InterruptedException { + // Mock job inserting. + Job dryRunJob = new Job().setStatistics( + new JobStatistics().setQuery(new JobStatistics2())); + Job insertedJob = new Job().setJobReference(new JobReference()); + when(mockJobsInsert.execute()).thenReturn(dryRunJob, insertedJob); + + // Mock job polling. + JobStatus status = new JobStatus().setState("DONE"); + TableReference tableRef = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef); + Job getJob = + new Job() + .setJobReference(new JobReference()) + .setStatus(status) + .setConfiguration(new JobConfiguration().setQuery(queryConfig)); + when(mockJobsGet.execute()).thenReturn(getJob); + + // Mock table schema fetch. + when(mockTablesGet.execute()).thenReturn(noTableQuerySchema()); + + byte[] photoBytes = "photograph".getBytes(); + String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes); + // Mock table data fetch. + when(mockTabledataList.execute()).thenReturn( + rawDataList(rawRow("Arthur", 42, photoBytesEncoded))); + + // Run query and verify + String query = String.format( + "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo", + photoBytesEncoded); + try (BigQueryTableRowIterator iterator = + BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { + iterator.open(); + assertTrue(iterator.advance()); + TableRow row = iterator.getCurrent(); + + assertTrue(row.containsKey("name")); + assertTrue(row.containsKey("count")); + assertTrue(row.containsKey("photo")); + assertEquals("Arthur", row.get("name")); + assertEquals(42, row.get("count")); + assertEquals(photoBytesEncoded, row.get("photo")); + + assertFalse(iterator.advance()); + } + + // Temp dataset created and later deleted. 
+ verify(mockClient, times(2)).datasets(); + verify(mockDatasets).insert(anyString(), any(Dataset.class)); + verify(mockDatasetsInsert).execute(); + verify(mockDatasets).delete(anyString(), anyString()); + verify(mockDatasetsDelete).execute(); + // Job inserted to run the query, polled once. + verify(mockClient, times(3)).jobs(); + verify(mockJobs, times(2)).insert(anyString(), any(Job.class)); + verify(mockJobsInsert, times(2)).execute(); verify(mockJobs).get(anyString(), anyString()); verify(mockJobsGet).execute(); // Temp table get after query finish, deleted after reading. verify(mockClient, times(2)).tables(); - verify(mockTables).get("project", "dataset", "table"); - verify(mockTablesGet).execute(); + verify(mockTables, times(1)).get(anyString(), anyString(), anyString()); + verify(mockTablesGet, times(1)).execute(); verify(mockTables).delete(anyString(), anyString(), anyString()); verify(mockTablesDelete).execute(); // Table data read. @@ -230,43 +327,29 @@ public class BigQueryTableRowIteratorTest { */ @Test public void testQueryFailed() throws IOException { - // Job can be created. - JobReference ref = new JobReference(); - Job insertedJob = new Job().setJobReference(ref); - when(mockJobsInsert.execute()).thenReturn(insertedJob); - // Job state polled with an error. 
String errorReason = "bad query"; - JobStatus status = - new JobStatus().setState("DONE").setErrorResult(new ErrorProto().setMessage(errorReason)); - Job getJob = new Job().setJobReference(ref).setStatus(status); - when(mockJobsGet.execute()).thenReturn(getJob); + Exception exception = new IOException(errorReason); + when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception); String query = "NOT A QUERY"; try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { + BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { + try { iterator.open(); fail(); } catch (Exception expected) { // Verify message explains cause and reports the query. - assertThat(expected.getMessage(), containsString("failed")); - assertThat(expected.getMessage(), containsString(errorReason)); + assertThat(expected.getMessage(), containsString("Error")); assertThat(expected.getMessage(), containsString(query)); + assertThat(expected.getCause().getMessage(), containsString(errorReason)); } } - // Temp dataset created and then later deleted. - verify(mockClient, times(2)).datasets(); - verify(mockDatasets).insert(anyString(), any(Dataset.class)); - verify(mockDatasetsInsert).execute(); - verify(mockDatasets).delete(anyString(), anyString()); - verify(mockDatasetsDelete).execute(); // Job inserted to run the query, then polled once. - verify(mockClient, times(2)).jobs(); + verify(mockClient, times(1)).jobs(); verify(mockJobs).insert(anyString(), any(Job.class)); - verify(mockJobsInsert).execute(); - verify(mockJobs).get(anyString(), anyString()); - verify(mockJobsGet).execute(); + verify(mockJobsInsert, times(4)).execute(); } }