Repository: incubator-beam Updated Branches: refs/heads/master 3c4b6930e -> 3e1a62815
[BEAM-1022] Add testing coverage for BigQuery streaming writes Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51900830 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51900830 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51900830 Branch: refs/heads/master Commit: 519008303f9cefd3f8f4a8a7a98a9a79717f57ff Parents: 3c4b693 Author: Reuven Lax <re...@google.com> Authored: Thu Nov 17 10:57:41 2016 -0800 Committer: Dan Halperin <dhalp...@google.com> Committed: Thu Dec 15 11:45:45 2016 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 48 +- .../sdk/io/gcp/bigquery/BigQueryServices.java | 7 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 121 ++++- .../io/gcp/bigquery/BigQueryTableInserter.java | 217 --------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 456 +++++++++++++++---- .../gcp/bigquery/BigQueryServicesImplTest.java | 139 +++++- .../gcp/bigquery/BigQueryTableInserterTest.java | 245 ---------- .../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 50 +- 8 files changed, 655 insertions(+), 628 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51900830/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 0be8567..28049ed 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -24,7 
+24,6 @@ import static com.google.common.base.Preconditions.checkState; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.json.JsonFactory; -import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; @@ -33,6 +32,7 @@ import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -1796,8 +1796,8 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Bound withCreateDisposition(CreateDisposition createDisposition) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, bigQueryServices); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, + createDisposition, writeDisposition, validate, bigQueryServices); } /** @@ -1806,8 +1806,8 @@ public class BigQueryIO { * <p>Does not modify this object. */ public Bound withWriteDisposition(WriteDisposition writeDisposition) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, bigQueryServices); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, + createDisposition, writeDisposition, validate, bigQueryServices); } /** @@ -2136,7 +2136,8 @@ public class BigQueryIO { /** Returns the table reference, or {@code null}. 
*/ @Nullable public ValueProvider<TableReference> getTable() { - return NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); + return jsonTableRef == null ? null : + NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); } /** Returns {@code true} if table validation is enabled. */ @@ -2550,6 +2551,13 @@ public class BigQueryIO { } } + /** + * Clear the cached map of created tables. Used for testing. + */ + @VisibleForTesting + static void clearCreatedTables() { + StreamingWriteFn.clearCreatedTables(); + } ///////////////////////////////////////////////////////////////////////////// /** @@ -2585,6 +2593,15 @@ public class BigQueryIO { this.bqServices = checkNotNull(bqServices, "bqServices"); } + /** + * Clear the cached map of created tables. Used for testing. + */ + private static void clearCreatedTables() { + synchronized (createdTables) { + createdTables.clear(); + } + } + /** Prepares a target BigQuery table. */ @StartBundle public void startBundle(Context context) { @@ -2626,20 +2643,25 @@ public class BigQueryIO { } public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) - throws IOException { + throws InterruptedException, IOException { TableReference tableReference = parseTableSpec(tableSpec); if (!createdTables.contains(tableSpec)) { synchronized (createdTables) { // Another thread may have succeeded in creating the table in the meanwhile, so // check again. This check isn't needed for correctness, but we add it to prevent // every thread from attempting a create and overwhelming our BigQuery quota. 
+ DatasetService datasetService = bqServices.getDatasetService(options); if (!createdTables.contains(tableSpec)) { - TableSchema tableSchema = JSON_FACTORY.fromString( - jsonTableSchema.get(), TableSchema.class); - Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); - inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND, - Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema); + Table table = datasetService.getTable( + tableReference.getProjectId(), + tableReference.getDatasetId(), + tableReference.getTableId()); + if (table == null) { + TableSchema tableSchema = JSON_FACTORY.fromString( + jsonTableSchema.get(), TableSchema.class); + datasetService.createTable( + new Table().setTableReference(tableReference).setSchema(tableSchema)); + } createdTables.add(tableSpec); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51900830/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 07dc06e..8ca473d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -114,12 +114,17 @@ interface BigQueryServices extends Serializable { */ interface DatasetService { /** - * Gets the specified {@link Table} resource by table ID. + * Gets the specified {@link Table} resource by table ID or {@code null} if no table exists. 
*/ Table getTable(String projectId, String datasetId, String tableId) throws InterruptedException, IOException; /** + * Creates the specified table if it does not exist. + */ + void createTable(Table table) throws InterruptedException, IOException; + + /** * Deletes the table specified by tableId from the dataset. * If the table contains data, all the data will be deleted. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51900830/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 61f1a1a..4eb8e7b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -23,6 +23,7 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.ExponentialBackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Dataset; @@ -53,10 +54,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.GcsOptions; import 
org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Transport; import org.joda.time.Duration; @@ -281,7 +284,8 @@ class BigQueryServicesImpl implements BigQueryServices { "Unable to dry run query: %s, aborting after %d retries.", queryConfig, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff).getStatistics(); + backoff, + ALWAYS_RETRY).getStatistics(); } /** @@ -400,7 +404,80 @@ class BigQueryServicesImpl implements BigQueryServices { "Unable to get table: %s, aborting after %d retries.", tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, + DONT_RETRY_NOT_FOUND); + } + + /** + * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the + * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is + * configured with a table spec function to use different tables for each window. + */ + private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = + (int) TimeUnit.MINUTES.toMillis(5); + + /** + * {@inheritDoc} + * + * <p>If a table with the same name already exists in the dataset, the function simply + * returns. In such a case, + * the existing table doesn't necessarily have the same schema as specified + * by the parameter. + * + * @throws IOException if other error than already existing table occurs. 
+ */ + @Override + public void createTable(Table table) throws InterruptedException, IOException { + LOG.info("Trying to create BigQuery table: {}", + BigQueryIO.toTableSpec(table.getTableReference())); + BackOff backoff = + new ExponentialBackOff.Builder() + .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS) + .build(); + + tryCreateTable(table, backoff, Sleeper.DEFAULT); + } + + @VisibleForTesting + @Nullable + Table tryCreateTable(Table table, BackOff backoff, Sleeper sleeper) + throws IOException { + boolean retry = false; + while (true) { + try { + return client.tables().insert( + table.getTableReference().getProjectId(), + table.getTableReference().getDatasetId(), + table).execute(); + } catch (IOException e) { + ApiErrorExtractor extractor = new ApiErrorExtractor(); + if (extractor.itemAlreadyExists(e)) { + // The table already exists, nothing to return. + return null; + } else if (extractor.rateLimited(e)) { + // The request failed because we hit a temporary quota. Back off and try again. + try { + if (BackOffUtils.next(sleeper, backoff)) { + if (!retry) { + LOG.info( + "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes", + table.getTableReference().getProjectId(), + table.getTableReference().getDatasetId(), + table.getTableReference().getTableId(), + TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0); + retry = true; + } + continue; + } + } catch (InterruptedException e1) { + // Restore interrupted state and throw the last failure. 
+ Thread.currentThread().interrupt(); + throw e; + } + } + throw e; + } + } } /** @@ -422,7 +499,8 @@ class BigQueryServicesImpl implements BigQueryServices { "Unable to delete table: %s, aborting after %d retries.", tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, + ALWAYS_RETRY); } @Override @@ -437,7 +515,8 @@ class BigQueryServicesImpl implements BigQueryServices { "Unable to list table data: %s, aborting after %d retries.", tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, + ALWAYS_RETRY); return dataList.getRows() == null || dataList.getRows().isEmpty(); } @@ -460,7 +539,8 @@ class BigQueryServicesImpl implements BigQueryServices { "Unable to get dataset: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, + DONT_RETRY_NOT_FOUND); } /** @@ -543,7 +623,8 @@ class BigQueryServicesImpl implements BigQueryServices { "Unable to delete table: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, + ALWAYS_RETRY); } @VisibleForTesting @@ -684,8 +765,8 @@ class BigQueryServicesImpl implements BigQueryServices { public long insertAll( TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) throws IOException, InterruptedException { - return insertAll( - ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + return insertAll( + ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); } } @@ -747,12 +828,31 @@ class BigQueryServicesImpl implements BigQueryServices { } } + static final SerializableFunction<IOException, Boolean> DONT_RETRY_NOT_FOUND = + new SerializableFunction<IOException, Boolean>() { + @Override + public Boolean apply(IOException input) { + ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + return !errorExtractor.itemNotFound(input); + } + }; + + static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY = + new 
SerializableFunction<IOException, Boolean>() { + @Override + public Boolean apply(IOException input) { + return true; + } + }; + + @VisibleForTesting static <T> T executeWithRetries( AbstractGoogleClientRequest<T> request, String errorMessage, Sleeper sleeper, - BackOff backoff) + BackOff backoff, + SerializableFunction<IOException, Boolean> shouldRetry) throws IOException, InterruptedException { Exception lastException = null; do { @@ -761,6 +861,9 @@ class BigQueryServicesImpl implements BigQueryServices { } catch (IOException e) { LOG.warn("Ignore the error and retry the request.", e); lastException = e; + if (!shouldRetry.apply(e)) { + break; + } } } while (nextBackOff(sleeper, backoff)); throw new IOException( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51900830/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java deleted file mode 100644 index a64dc9f..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.ExponentialBackOff; -import com.google.api.client.util.Sleeper; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableDataList; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.hadoop.util.ApiErrorExtractor; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.options.PipelineOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Inserts rows into BigQuery. - */ -class BigQueryTableInserter { - private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class); - - private final Bigquery client; - - /** - * Constructs a new row inserter. - * - * @param client a BigQuery client - * @param options a PipelineOptions object - */ - BigQueryTableInserter(Bigquery client, PipelineOptions options) { - this.client = client; - } - - /** - * Retrieves or creates the table. 
- * - * <p>The table is checked to conform to insertion requirements as specified - * by WriteDisposition and CreateDisposition. - * - * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then - * this will re-create the table if necessary to ensure it is empty. - * - * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this - * will fail if the table exists and is not empty. - * - * <p>When constructing a table, a {@code TableSchema} must be available. If a - * schema is provided, then it will be used. If no schema is provided, but - * an existing table is being cleared (WRITE_TRUNCATE option above), then - * the existing schema will be re-used. If no schema is available, then an - * {@code IOException} is thrown. - */ - Table getOrCreateTable( - TableReference ref, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - @Nullable TableSchema schema) throws IOException { - // Check if table already exists. - Bigquery.Tables.Get get = client.tables() - .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); - Table table = null; - try { - table = get.execute(); - } catch (IOException e) { - ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - if (!errorExtractor.itemNotFound(e) - || createDisposition != CreateDisposition.CREATE_IF_NEEDED) { - // Rethrow. - throw e; - } - } - - // If we want an empty table, and it isn't, then delete it first. - if (table != null) { - if (writeDisposition == WriteDisposition.WRITE_APPEND) { - return table; - } - - boolean empty = isEmpty(ref); - if (empty) { - if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) { - LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref)); - } - return table; - - } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) { - throw new IOException("WriteDisposition is WRITE_EMPTY, " - + "but table is not empty"); - } - - // Reuse the existing schema if none was provided. 
- if (schema == null) { - schema = table.getSchema(); - } - - // Delete table and fall through to re-creating it below. - LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref)); - Bigquery.Tables.Delete delete = client.tables() - .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); - delete.execute(); - } - - if (schema == null) { - throw new IllegalArgumentException( - "Table schema required for new table."); - } - - // Create the table. - return tryCreateTable(ref, schema); - } - - /** - * Checks if a table is empty. - */ - private boolean isEmpty(TableReference ref) throws IOException { - Bigquery.Tabledata.List list = client.tabledata() - .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); - list.setMaxResults(1L); - TableDataList dataList = list.execute(); - - return dataList.getRows() == null || dataList.getRows().isEmpty(); - } - - /** - * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the - * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is - * configured with a table spec function to use different tables for each window. - */ - private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5); - - /** - * Tries to create the BigQuery table. - * If a table with the same name already exists in the dataset, the table - * creation fails, and the function returns null. In such a case, - * the existing table doesn't necessarily have the same schema as specified - * by the parameter. - * - * @param schema Schema of the new BigQuery table. - * @return The newly created BigQuery table information, or null if the table - * with the same name already exists. - * @throws IOException if other error than already existing table occurs. 
- */ - @Nullable - private Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException { - LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref)); - BackOff backoff = - new ExponentialBackOff.Builder() - .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS) - .build(); - - Table table = new Table().setTableReference(ref).setSchema(schema); - return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT); - } - - @VisibleForTesting - @Nullable - Table tryCreateTable( - Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper) - throws IOException { - boolean retry = false; - while (true) { - try { - return client.tables().insert(projectId, datasetId, table).execute(); - } catch (IOException e) { - ApiErrorExtractor extractor = new ApiErrorExtractor(); - if (extractor.itemAlreadyExists(e)) { - // The table already exists, nothing to return. - return null; - } else if (extractor.rateLimited(e)) { - // The request failed because we hit a temporary quota. Back off and try again. - try { - if (BackOffUtils.next(sleeper, backoff)) { - if (!retry) { - LOG.info( - "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes", - projectId, - datasetId, - table.getTableReference().getTableId(), - TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0); - retry = true; - } - continue; - } - } catch (InterruptedException e1) { - // Restore interrupted state and throw the last failure. 
- Thread.currentThread().interrupt(); - throw e; - } - } - throw e; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51900830/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 54ec2bb..b78316f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -23,11 +23,13 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; @@ -58,18 +60,20 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Table.Cell; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInputStream; 
import java.io.ObjectOutputStream; +import java.io.OutputStream; import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -85,6 +89,8 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -133,6 +139,9 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; @@ -146,6 +155,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; +import org.joda.time.Instant; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -478,39 +488,81 @@ public class BigQueryIOTest implements Serializable { } } - /** A fake dataset service that can be serialized, for use in testReadFromTable. 
*/ - private static class FakeDatasetService implements DatasetService, Serializable { - private com.google.common.collect.Table<String, String, Map<String, Table>> tables = - HashBasedTable.create(); - - public FakeDatasetService withTable( - String projectId, String datasetId, String tableId, Table table) throws IOException { - Map<String, Table> dataset = tables.get(projectId, datasetId); - if (dataset == null) { - dataset = new HashMap<>(); - tables.put(projectId, datasetId, dataset); - } - dataset.put(tableId, table); + private static class TableContainer { + Table table; + List<TableRow> rows; + List<String> ids; + + TableContainer(Table table) { + this.table = table; + this.rows = new ArrayList<>(); + this.ids = new ArrayList<>(); + } + + TableContainer addRow(TableRow row, String id) { + rows.add(row); + ids.add(id); return this; } + Table getTable() { + return table; + } + + List<TableRow> getRows() { + return rows; + } + } + + // Table information must be static, as each ParDo will get a separate instance of + // FakeDatasetServices, and they must all modify the same storage. + private static com.google.common.collect.Table<String, String, Map<String, TableContainer>> + tables = HashBasedTable.create(); + + /** A fake dataset service that can be serialized, for use in testReadFromTable. 
*/ + private static class FakeDatasetService implements DatasetService, Serializable { + @Override public Table getTable(String projectId, String datasetId, String tableId) throws InterruptedException, IOException { - Map<String, Table> dataset = - checkNotNull( - tables.get(projectId, datasetId), - "Tried to get a table %s:%s.%s from %s, but no such table was set", - projectId, - datasetId, - tableId, - FakeDatasetService.class.getSimpleName()); - return checkNotNull(dataset.get(tableId), - "Tried to get a table %s:%s.%s from %s, but no such table was set", - projectId, - datasetId, - tableId, - FakeDatasetService.class.getSimpleName()); + synchronized (tables) { + Map<String, TableContainer> dataset = + checkNotNull( + tables.get(projectId, datasetId), + "Tried to get a dataset %s:%s from %s, but no such dataset was set", + projectId, + datasetId, + tableId, + FakeDatasetService.class.getSimpleName()); + TableContainer tableContainer = dataset.get(tableId); + return tableContainer == null ? 
null : tableContainer.getTable(); + } + } + + public List<TableRow> getAllRows(String projectId, String datasetId, String tableId) + throws InterruptedException, IOException { + synchronized (tables) { + return getTableContainer(projectId, datasetId, tableId).getRows(); + } + } + + private TableContainer getTableContainer(String projectId, String datasetId, String tableId) + throws InterruptedException, IOException { + synchronized (tables) { + Map<String, TableContainer> dataset = + checkNotNull( + tables.get(projectId, datasetId), + "Tried to get a dataset %s:%s from %s, but no such dataset was set", + projectId, + datasetId, + FakeDatasetService.class.getSimpleName()); + return checkNotNull(dataset.get(tableId), + "Tried to get a table %s:%s.%s from %s, but no such table was set", + projectId, + datasetId, + tableId, + FakeDatasetService.class.getSimpleName()); + } } @Override @@ -519,6 +571,26 @@ public class BigQueryIOTest implements Serializable { throw new UnsupportedOperationException("Unsupported"); } + + @Override + public void createTable(Table table) throws IOException { + TableReference tableReference = table.getTableReference(); + synchronized (tables) { + Map<String, TableContainer> dataset = + checkNotNull( + tables.get(tableReference.getProjectId(), tableReference.getDatasetId()), + "Tried to get a dataset %s:%s from %s, but no such table was set", + tableReference.getProjectId(), + tableReference.getDatasetId(), + FakeDatasetService.class.getSimpleName()); + TableContainer tableContainer = dataset.get(tableReference.getTableId()); + if (tableContainer == null) { + tableContainer = new TableContainer(table); + dataset.put(tableReference.getTableId(), tableContainer); + } + } + } + @Override public boolean isTableEmpty(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { @@ -536,7 +608,13 @@ public class BigQueryIOTest implements Serializable { public void createDataset( String projectId, String datasetId, 
String location, String description) throws IOException, InterruptedException { - throw new UnsupportedOperationException("Unsupported"); + synchronized (tables) { + Map<String, TableContainer> dataset = tables.get(projectId, datasetId); + if (dataset == null) { + dataset = new HashMap<>(); + tables.put(projectId, datasetId, dataset); + } + } } @Override @@ -549,55 +627,18 @@ public class BigQueryIOTest implements Serializable { public long insertAll( TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) throws IOException, InterruptedException { - throw new UnsupportedOperationException("Unsupported"); - } - - ////////////////////////////////// SERIALIZATION METHODS //////////////////////////////////// - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeObject(replaceTablesWithBytes(this.tables)); - } - - private com.google.common.collect.Table<String, String, Map<String, byte[]>> - replaceTablesWithBytes( - com.google.common.collect.Table<String, String, Map<String, Table>> toCopy) - throws IOException { - com.google.common.collect.Table<String, String, Map<String, byte[]>> copy = - HashBasedTable.create(); - for (Cell<String, String, Map<String, Table>> cell : toCopy.cellSet()) { - HashMap<String, byte[]> dataset = new HashMap<>(); - copy.put(cell.getRowKey(), cell.getColumnKey(), dataset); - for (Map.Entry<String, Table> dsTables : cell.getValue().entrySet()) { - dataset.put( - dsTables.getKey(), Transport.getJsonFactory().toByteArray(dsTables.getValue())); + synchronized (tables) { + assertEquals(rowList.size(), insertIdList.size()); + + long dataSize = 0; + TableContainer tableContainer = getTableContainer( + ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); + for (int i = 0; i < rowList.size(); ++i) { + tableContainer.addRow(rowList.get(i), insertIdList.get(i)); + dataSize += rowList.get(i).toString().length(); } + return dataSize; } - return copy; - } - - private void 
readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - com.google.common.collect.Table<String, String, Map<String, byte[]>> tablesTable = - (com.google.common.collect.Table<String, String, Map<String, byte[]>>) in.readObject(); - this.tables = replaceBytesWithTables(tablesTable); - } - - private com.google.common.collect.Table<String, String, Map<String, Table>> - replaceBytesWithTables( - com.google.common.collect.Table<String, String, Map<String, byte[]>> tablesTable) - throws IOException { - com.google.common.collect.Table<String, String, Map<String, Table>> copy = - HashBasedTable.create(); - for (Cell<String, String, Map<String, byte[]>> cell : tablesTable.cellSet()) { - HashMap<String, Table> dataset = new HashMap<>(); - copy.put(cell.getRowKey(), cell.getColumnKey(), dataset); - for (Map.Entry<String, byte[]> dsTables : cell.getValue().entrySet()) { - Table table = - Transport.getJsonFactory() - .createJsonParser(new ByteArrayInputStream(dsTables.getValue())) - .parse(Table.class); - dataset.put(dsTables.getKey(), table); - } - } - return copy; } } @@ -658,6 +699,8 @@ public class BigQueryIOTest implements Serializable { @Before public void setUp() throws IOException { MockitoAnnotations.initMocks(this); + tables = HashBasedTable.create(); + BigQueryIO.clearCreatedTables(); } @Test @@ -716,8 +759,11 @@ public class BigQueryIOTest implements Serializable { bqOptions.setProject(projectId); bqOptions.setTempLocation("gs://testbucket/testdir"); - FakeDatasetService fakeDatasetService = - new FakeDatasetService().withTable(projectId, datasetId, tableId, null); + FakeDatasetService fakeDatasetService = new FakeDatasetService(); + fakeDatasetService.createDataset(projectId, datasetId, "", ""); + TableReference tableReference = + new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId); + fakeDatasetService.createTable(new Table().setTableReference(tableReference)); FakeBigQueryServices fakeBqServices = new 
FakeBigQueryServices() .withJobService(new FakeJobService()) @@ -765,7 +811,7 @@ public class BigQueryIOTest implements Serializable { p.apply("ReadMyTable", BigQueryIO.Read .from("foo.com:project:somedataset.sometable") - .fromQuery("query")); + .fromQuery("query")); p.run(); } @@ -829,7 +875,7 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) - public void testReadFromTable() throws IOException { + public void testReadFromTable() throws IOException, InterruptedException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultProject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); @@ -850,10 +896,15 @@ public class BigQueryIOTest implements Serializable { ImmutableList.of( new TableFieldSchema().setName("name").setType("STRING"), new TableFieldSchema().setName("number").setType("INTEGER")))); + sometable.setTableReference( + new TableReference() + .setProjectId("non-executing-project") + .setDatasetId("somedataset") + .setTableId("sometable")); sometable.setNumBytes(1024L * 1024L); - FakeDatasetService fakeDatasetService = - new FakeDatasetService() - .withTable("non-executing-project", "somedataset", "sometable", sometable); + FakeDatasetService fakeDatasetService = new FakeDatasetService(); + fakeDatasetService.createDataset("non-executing-project", "somedataset", "", ""); + fakeDatasetService.createTable(sometable); SerializableFunction<Void, Schema> schemaGenerator = new SerializableFunction<Void, Schema>() { @Override @@ -945,6 +996,216 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) + public void testStreamingWrite() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + 
FakeDatasetService datasetService = new FakeDatasetService(); + datasetService.createDataset("project-id", "dataset-id", "", ""); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withDatasetService(datasetService); + + Pipeline p = TestPipeline.create(bqOptions); + p.apply(Create.of( + new TableRow().set("name", "a").set("number", 1), + new TableRow().set("name", "b").set("number", 2), + new TableRow().set("name", "c").set("number", 3), + new TableRow().set("name", "d").set("number", 4)) + .withCoder(TableRowJsonCoder.of())) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id") + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withoutValidation()); + p.run(); + + + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"), + containsInAnyOrder( + new TableRow().set("name", "a").set("number", 1), + new TableRow().set("name", "b").set("number", 2), + new TableRow().set("name", "c").set("number", 3), + new TableRow().set("name", "d").set("number", 4))); + } + + /** + * A generic window function that allows partitioning data into windows by a string value. + * + * <p>Logically, creates multiple global windows, and the user provides a function that + * decides which global window a value should go into. 
+ */ + private static class PartitionedGlobalWindows extends + + NonMergingWindowFn<TableRow, PartitionedGlobalWindow> { + private SerializableFunction<TableRow, String> extractPartition; + + public PartitionedGlobalWindows(SerializableFunction<TableRow, String> extractPartition) { + this.extractPartition = extractPartition; + } + + @Override + public Collection<PartitionedGlobalWindow> assignWindows(AssignContext c) { + return Collections.singletonList(new PartitionedGlobalWindow( + extractPartition.apply(c.element()))); + } + + @Override + public boolean isCompatible(WindowFn<?, ?> o) { + return o instanceof PartitionedGlobalWindows; + } + + @Override + public Coder<PartitionedGlobalWindow> windowCoder() { + return new PartitionedGlobalWindowCoder(); + } + + @Override + public PartitionedGlobalWindow getSideInputWindow(BoundedWindow window) { + throw new UnsupportedOperationException( + "PartitionedGlobalWindows is not allowed in side inputs"); + } + + @Override + public Instant getOutputTime(Instant inputTimestamp, PartitionedGlobalWindow window) { + return inputTimestamp; + } + } + + /** + * Custom Window object that encodes a String value. + */ + private static class PartitionedGlobalWindow extends BoundedWindow { + String value; + + public PartitionedGlobalWindow(String value) { + this.value = value; + } + + @Override + public Instant maxTimestamp() { + return GlobalWindow.INSTANCE.maxTimestamp(); + } + + // The following methods are only needed due to BEAM-1022. Once this issue is fixed, we will + // no longer need these. + @Override + public boolean equals(Object other) { + if (other instanceof PartitionedGlobalWindow) { + return value.equals(((PartitionedGlobalWindow) other).value); + } + return false; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + } + + /** + * Coder for {@link PartitionedGlobalWindow}. 
+ */ + private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> { + @Override + public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context) + throws IOException, CoderException { + StringUtf8Coder.of().encode(window.value, outStream, context); + } + + @Override + public PartitionedGlobalWindow decode(InputStream inStream, Context context) + throws IOException, CoderException { + return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context)); + } + } + + @Test + @Category(NeedsRunner.class) + public void testStreamingWriteWithWindowFn() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeDatasetService datasetService = new FakeDatasetService(); + datasetService.createDataset("project-id", "dataset-id", "", ""); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withDatasetService(datasetService); + + List<TableRow> inserts = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + inserts.add(new TableRow().set("name", "number" + i).set("number", i)); + } + + // Create a windowing strategy that puts the input into five different windows depending on + // record value. 
+ WindowFn<TableRow, PartitionedGlobalWindow> window = new PartitionedGlobalWindows( + new SerializableFunction<TableRow, String>() { + @Override + public String apply(TableRow value) { + try { + int intValue = (Integer) value.get("number") % 5; + return Integer.toString(intValue); + } catch (NumberFormatException e) { + fail(e.toString()); + } + return value.toString(); + } + } + ); + + SerializableFunction<BoundedWindow, String> tableFunction = + new SerializableFunction<BoundedWindow, String>() { + @Override + public String apply(BoundedWindow input) { + return "project-id:dataset-id.table-id-" + ((PartitionedGlobalWindow) input).value; + } + }; + + Pipeline p = TestPipeline.create(bqOptions); + p.apply(Create.of(inserts).withCoder(TableRowJsonCoder.of())) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply(Window.<TableRow>into(window)) + .apply(BigQueryIO.Write + .to(tableFunction) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withoutValidation()); + p.run(); + + + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-0"), + containsInAnyOrder( + new TableRow().set("name", "number0").set("number", 0), + new TableRow().set("name", "number5").set("number", 5))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-1"), + containsInAnyOrder( + new TableRow().set("name", "number1").set("number", 1), + new TableRow().set("name", "number6").set("number", 6))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-2"), + containsInAnyOrder( + new TableRow().set("name", "number2").set("number", 2), + new TableRow().set("name", "number7").set("number", 7))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-3"), + 
containsInAnyOrder( + new TableRow().set("name", "number3").set("number", 3), + new TableRow().set("name", "number8").set("number", 8))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-4"), + containsInAnyOrder( + new TableRow().set("name", "number4").set("number", 4), + new TableRow().set("name", "number9").set("number", 9))); + } + + @Test + @Category(NeedsRunner.class) public void testWriteUnknown() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultProject"); @@ -1031,7 +1292,8 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWrite() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable"); + BigQueryIO.Write.Bound bound = + BigQueryIO.Write.to("foo.com:project:somedataset.sometable"); checkWriteObject( bound, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -1980,32 +2242,42 @@ public class BigQueryIOTest implements Serializable { } @Test - public void testRuntimeOptionsNotCalledInApplyInputTable() { + public void testRuntimeOptionsNotCalledInApplyInputTable() throws IOException { RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - bqOptions.setTempLocation("gs://testbucket/testdir"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService()); Pipeline pipeline = TestPipeline.create(options); pipeline - .apply(BigQueryIO.Read.from(options.getInputTable()).withoutValidation()) - .apply(BigQueryIO.Write + .apply(BigQueryIO.Read + .from(options.getInputTable()).withoutValidation() + .withTestServices(fakeBqServices)) + .apply(BigQueryIO.Write .to(options.getOutputTable()) 
.withSchema(NestedValueProvider.of( options.getOutputSchema(), new JsonSchemaToTableSchema())) + .withTestServices(fakeBqServices) .withoutValidation()); } @Test - public void testRuntimeOptionsNotCalledInApplyInputQuery() { + public void testRuntimeOptionsNotCalledInApplyInputQuery() throws IOException { RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - bqOptions.setTempLocation("gs://testbucket/testdir"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService()); Pipeline pipeline = TestPipeline.create(options); pipeline - .apply(BigQueryIO.Read.fromQuery(options.getInputQuery()).withoutValidation()) - .apply(BigQueryIO.Write + .apply(BigQueryIO.Read + .fromQuery(options.getInputQuery()).withoutValidation() + .withTestServices(fakeBqServices)) + .apply(BigQueryIO.Write .to(options.getOutputTable()) .withSchema(NestedValueProvider.of( options.getOutputSchema(), new JsonSchemaToTableSchema())) + .withTestServices(fakeBqServices) .withoutValidation()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51900830/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 0e76660..10ed8bd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -17,9 +17,11 @@ */ 
package org.apache.beam.sdk.io.gcp.bigquery; +import static com.google.common.base.Verify.verifyNotNull; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Mockito.times; @@ -47,9 +49,12 @@ import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableDataInsertAllResponse; import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors; +import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.cloud.hadoop.util.RetryBoundedBackOff; import com.google.common.collect.ImmutableList; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -64,6 +69,7 @@ import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.util.Transport; +import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -325,7 +331,8 @@ public class BigQueryServicesImplTest { bigquery.tables().get("projectId", "datasetId", "tableId"), "Failed to get table.", Sleeper.DEFAULT, - BackOff.STOP_BACKOFF); + BackOff.STOP_BACKOFF, + BigQueryServicesImpl.ALWAYS_RETRY); assertEquals(testTable, table); verify(response, times(1)).getStatusCode(); @@ -358,6 +365,11 @@ public class BigQueryServicesImplTest { verify(response, times(2)).getContentType(); expectedLogs.verifyInfo("BigQuery insertAll exceeded rate 
limit, retrying"); } + // A BackOff that makes a total of 4 attempts + private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT + .withInitialBackoff(Duration.millis(1)) + .withExponent(1) + .withMaxRetries(3); /** * Tests that {@link DatasetServiceImpl#insertAll} retries selected rows on failure. @@ -371,7 +383,8 @@ public class BigQueryServicesImplTest { List<String> insertIds = ImmutableList.of("a", "b"); final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse() - .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L))); + .setInsertErrors(ImmutableList.of( + new InsertErrors().setIndex(1L).setErrors(ImmutableList.of(new ErrorProto())))); final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse(); @@ -389,9 +402,6 @@ public class BigQueryServicesImplTest { expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery"); } - // A BackOff that makes a total of 4 attempts - private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(3); - /** * Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when persistent issues. */ @@ -490,9 +500,128 @@ public class BigQueryServicesImplTest { GoogleJsonError error = new GoogleJsonError(); error.setErrors(ImmutableList.of(info)); error.setCode(status); + error.setMessage(reason); // The actual JSON response is an error container. 
GoogleJsonErrorContainer container = new GoogleJsonErrorContainer(); container.setError(error); return container; } + + @Test + public void testCreateTableSucceeds() throws IOException { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + Table testTable = new Table().setTableReference(ref); + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testTable)); + + BigQueryServicesImpl.DatasetServiceImpl services = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + Table ret = + services.tryCreateTable( + testTable, + new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), + Sleeper.DEFAULT); + assertEquals(testTable, ret); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl} does not retry non-rate-limited attempts. + */ + @Test + public void testCreateTableDoesNotRetry() throws IOException { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + Table testTable = new Table().setTableReference(ref); + // First response is 403 not-rate-limited, second response has valid payload but should not + // be invoked. 
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403))) + .thenReturn(toStream(testTable)); + + thrown.expect(GoogleJsonResponseException.class); + thrown.expectMessage("actually forbidden"); + + BigQueryServicesImpl.DatasetServiceImpl services = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + try { + services.tryCreateTable( + testTable, + new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), + Sleeper.DEFAULT); + fail(); + } catch (IOException e) { + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + throw e; + } + } + + /** + * Tests that table creation succeeds when the table already exists. + */ + @Test + public void testCreateTableSucceedsAlreadyExists() throws IOException { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + TableSchema schema = new TableSchema().setFields(ImmutableList.of( + new TableFieldSchema().setName("column1").setType("String"), + new TableFieldSchema().setName("column2").setType("Integer"))); + Table testTable = new Table().setTableReference(ref).setSchema(schema); + + when(response.getStatusCode()).thenReturn(409); // 409 means already exists + + BigQueryServicesImpl.DatasetServiceImpl services = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + Table ret = + services.tryCreateTable( + testTable, + new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), + Sleeper.DEFAULT); + + assertNull(ret); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl} retries quota rate limited attempts. 
+ */ + @Test + public void testCreateTableRetry() throws IOException { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + Table testTable = new Table().setTableReference(ref); + + // First response is 403 rate limited, second response has valid payload. + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) + .thenReturn(toStream(testTable)); + + BigQueryServicesImpl.DatasetServiceImpl services = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + Table ret = + services.tryCreateTable( + testTable, + new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), + Sleeper.DEFAULT); + assertEquals(testTable, ret); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + verifyNotNull(ret.getTableReference()); + expectedLogs.verifyInfo( + "Quota limit reached when creating table project:dataset.table, " + + "retrying up to 5.0 minutes"); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51900830/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java deleted file mode 100644 index fb79c74..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import static com.google.common.base.Verify.verifyNotNull; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import com.google.api.client.googleapis.json.GoogleJsonError; -import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; -import com.google.api.client.googleapis.json.GoogleJsonErrorContainer; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.json.GenericJson; -import com.google.api.client.json.Json; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.Sleeper; -import com.google.api.services.bigquery.Bigquery; -import 
com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableReference; -import com.google.cloud.hadoop.util.RetryBoundedBackOff; -import com.google.common.collect.ImmutableList; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.ExpectedLogs; -import org.apache.beam.sdk.util.RetryHttpRequestInitializer; -import org.apache.beam.sdk.util.Transport; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests of {@link BigQueryTableInserter}. - */ -@RunWith(JUnit4.class) -public class BigQueryTableInserterTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class); - @Mock private LowLevelHttpResponse response; - private Bigquery bigquery; - private PipelineOptions options; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - // A mock transport that lets us mock the API responses. - MockHttpTransport transport = - new MockHttpTransport.Builder() - .setLowLevelHttpRequest( - new MockLowLevelHttpRequest() { - @Override - public LowLevelHttpResponse execute() throws IOException { - return response; - } - }) - .build(); - - // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer. 
- bigquery = - new Bigquery.Builder( - transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()) - .build(); - - options = PipelineOptionsFactory.create(); - } - - @After - public void tearDown() throws IOException { - // These three interactions happen for every request in the normal response parsing. - verify(response, atLeastOnce()).getContentEncoding(); - verify(response, atLeastOnce()).getHeaderCount(); - verify(response, atLeastOnce()).getReasonPhrase(); - verifyNoMoreInteractions(response); - } - - /** A helper to wrap a {@link GenericJson} object in a content stream. */ - private static InputStream toStream(GenericJson content) throws IOException { - return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content)); - } - - /** A helper that generates the error JSON payload that Google APIs produce. */ - private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) { - ErrorInfo info = new ErrorInfo(); - info.setReason(reason); - info.setDomain("global"); - // GoogleJsonError contains one or more ErrorInfo objects; our utiities read the first one. - GoogleJsonError error = new GoogleJsonError(); - error.setErrors(ImmutableList.of(info)); - error.setCode(status); - // The actual JSON response is an error container. - GoogleJsonErrorContainer container = new GoogleJsonErrorContainer(); - container.setError(error); - return container; - } - - /** - * Tests that {@link BigQueryTableInserter} succeeds on the first try. 
- */ - @Test - public void testCreateTableSucceeds() throws IOException { - Table testTable = new Table().setDescription("a table"); - - when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); - when(response.getStatusCode()).thenReturn(200); - when(response.getContent()).thenReturn(toStream(testTable)); - - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); - Table ret = - inserter.tryCreateTable( - new Table(), - "project", - "dataset", - new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), - Sleeper.DEFAULT); - assertEquals(testTable, ret); - verify(response, times(1)).getStatusCode(); - verify(response, times(1)).getContent(); - verify(response, times(1)).getContentType(); - } - - /** - * Tests that {@link BigQueryTableInserter} succeeds when the table already exists. - */ - @Test - public void testCreateTableSucceedsAlreadyExists() throws IOException { - when(response.getStatusCode()).thenReturn(409); // 409 means already exists - - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); - Table ret = - inserter.tryCreateTable( - new Table(), - "project", - "dataset", - new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), - Sleeper.DEFAULT); - - assertNull(ret); - verify(response, times(1)).getStatusCode(); - verify(response, times(1)).getContent(); - verify(response, times(1)).getContentType(); - } - - /** - * Tests that {@link BigQueryTableInserter} retries quota rate limited attempts. - */ - @Test - public void testCreateTableRetry() throws IOException { - TableReference ref = - new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - Table testTable = new Table().setTableReference(ref); - - // First response is 403 rate limited, second response has valid payload. 
- when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); - when(response.getStatusCode()).thenReturn(403).thenReturn(200); - when(response.getContent()) - .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) - .thenReturn(toStream(testTable)); - - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); - Table ret = - inserter.tryCreateTable( - testTable, - "project", - "dataset", - new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), - Sleeper.DEFAULT); - assertEquals(testTable, ret); - verify(response, times(2)).getStatusCode(); - verify(response, times(2)).getContent(); - verify(response, times(2)).getContentType(); - verifyNotNull(ret.getTableReference()); - expectedLogs.verifyInfo( - "Quota limit reached when creating table project:dataset.table, " - + "retrying up to 5.0 minutes"); - } - - /** - * Tests that {@link BigQueryTableInserter} does not retry non-rate-limited attempts. - */ - @Test - public void testCreateTableDoesNotRetry() throws IOException { - Table testTable = new Table().setDescription("a table"); - - // First response is 403 not-rate-limited, second response has valid payload but should not - // be invoked. 
- when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); - when(response.getStatusCode()).thenReturn(403).thenReturn(200); - when(response.getContent()) - .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403))) - .thenReturn(toStream(testTable)); - - thrown.expect(GoogleJsonResponseException.class); - thrown.expectMessage("actually forbidden"); - - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); - try { - inserter.tryCreateTable( - new Table(), - "project", - "dataset", - new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), - Sleeper.DEFAULT); - fail(); - } catch (IOException e) { - verify(response, times(1)).getStatusCode(); - verify(response, times(1)).getContent(); - verify(response, times(1)).getContentType(); - throw e; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51900830/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java index e539b33..8130238 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java @@ -361,60 +361,18 @@ public class BigQueryUtilTest { } @Test - public void testWriteAppend() throws IOException { - onTableGet(basicTableSchema()); - - TableReference ref = BigQueryIO - .parseTableSpec("project:dataset.table"); - - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); - - inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND, - BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null); - - 
verifyTableGet(); - } - - @Test - public void testWriteEmpty() throws IOException { + public void testTableGet() throws InterruptedException, IOException { onTableGet(basicTableSchema()); TableDataList dataList = new TableDataList().setTotalRows(0L); onTableList(dataList); - TableReference ref = BigQueryIO - .parseTableSpec("project:dataset.table"); - - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); + BigQueryServicesImpl.DatasetServiceImpl services = + new BigQueryServicesImpl.DatasetServiceImpl(mockClient, options); - inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, - BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null); + services.getTable("project", "dataset", "table"); verifyTableGet(); - verifyTabledataList(); - } - - @Test - public void testWriteEmptyFail() throws IOException { - thrown.expect(IOException.class); - - onTableGet(basicTableSchema()); - - TableDataList dataList = rawDataList(rawRow("Arthur", 42)); - onTableList(dataList); - - TableReference ref = BigQueryIO - .parseTableSpec("project:dataset.table"); - - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); - - try { - inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, - BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null); - } finally { - verifyTableGet(); - verifyTabledataList(); - } } @Test