Repository: beam Updated Branches: refs/heads/master 66b4a1be0 -> 889776fca
Implement retries in the read connector. Respect non-retryable error codes from Datastore. Add unit tests to check that retryable errors are retried. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/016baf34 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/016baf34 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/016baf34 Branch: refs/heads/master Commit: 016baf3465bbccbc9d3df6999b38b1b2533aee8c Parents: 66b4a1b Author: Colin Phipps <fi...@google.com> Authored: Mon Jul 10 16:09:23 2017 +0000 Committer: Colin Phipps <fi...@google.com> Committed: Thu Jul 13 11:11:21 2017 +0000 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 45 ++++++++++++++++- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 51 +++++++++++++++++++- 2 files changed, 94 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/016baf34/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 5f65428..1ed6430 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.datastore.v1.CommitRequest; 
import com.google.datastore.v1.Entity; import com.google.datastore.v1.EntityResult; @@ -65,6 +66,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; +import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; @@ -238,6 +240,14 @@ public class DatastoreV1 { static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000; /** + * Non-retryable errors: an RPC failing with one of these codes is rethrown without retrying. + * See https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes . + */ + private static final Set<Code> NON_RETRYABLE_ERRORS = + ImmutableSet.of(Code.FAILED_PRECONDITION, Code.INVALID_ARGUMENT, Code.PERMISSION_DENIED, + Code.UNAUTHENTICATED); + + /** * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId}, * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using * {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery}, @@ -840,6 +850,14 @@ public class DatastoreV1 { private final V1DatastoreFactory datastoreFactory; // Datastore client private transient Datastore datastore; + private final Counter rpcErrors = + Metrics.counter(ReadFn.class, "datastoreRpcErrors"); + private final Counter rpcSuccesses = + Metrics.counter(ReadFn.class, "datastoreRpcSuccesses"); + private static final int MAX_RETRIES = 5; + private static final FluentBackoff RUNQUERY_BACKOFF = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5)); public ReadFn(V1Options options) { this(options, new V1DatastoreFactory()); @@ -857,6 +875,28 @@ public class DatastoreV1 { options.getLocalhost()); } + private RunQueryResponse runQueryWithRetries(RunQueryRequest request) throws Exception { + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = RUNQUERY_BACKOFF.backoff(); + while (true) { + try { + RunQueryResponse response =
datastore.runQuery(request); + rpcSuccesses.inc(); + return response; + } catch (DatastoreException exception) { + rpcErrors.inc(); + + if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) { + throw exception; + } + if (!BackOffUtils.next(sleeper, backoff)) { + LOG.error("Aborting after {} retries.", MAX_RETRIES); + throw exception; + } + } + } + } + /** Read and output entities for the given query. */ @ProcessElement public void processElement(ProcessContext context) throws Exception { @@ -878,7 +918,7 @@ public class DatastoreV1 { } RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); - RunQueryResponse response = datastore.runQuery(request); + RunQueryResponse response = runQueryWithRetries(request); currentBatch = response.getBatch(); @@ -1328,6 +1368,9 @@ public class DatastoreV1 { exception.getCode(), exception.getMessage()); rpcErrors.inc(); + if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) { + throw exception; + } if (!BackOffUtils.next(sleeper, backoff)) { LOG.error("Aborting after {} retries.", MAX_RETRIES); throw exception; http://git-wip-us.apache.org/repos/asf/beam/blob/016baf34/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index a3f5d38..cfba1d6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -51,6 +51,7 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import com.google.datastore.v1.CommitRequest; +import
com.google.datastore.v1.CommitResponse; import com.google.datastore.v1.Entity; import com.google.datastore.v1.EntityResult; import com.google.datastore.v1.GqlQuery; @@ -682,6 +683,29 @@ public class DatastoreV1Test { } } + /** Tests {@link DatastoreWriterFn} with a failed request which is retried. */ + @Test + public void testDatastoreWriterFnRetriesErrors() throws Exception { + List<Mutation> mutations = new ArrayList<>(); + int numRpcs = 2; + for (int i = 0; i < DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) { + mutations.add( + makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); + } + + CommitResponse successfulCommit = CommitResponse.getDefaultInstance(); + when(mockDatastore.commit(any(CommitRequest.class))).thenReturn(successfulCommit) + .thenThrow( + new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null)) + .thenReturn(successfulCommit); + + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID), + null, mockDatastoreFactory, new FakeWriteBatcher()); + DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + doFnTester.processBundle(mutations); + } + /** * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a * query. @@ -816,6 +840,31 @@ public class DatastoreV1Test { readFnTest(5 * QUERY_BATCH_LIMIT); } + /** Tests that {@link ReadFn} retries after an error. */ + @Test + public void testReadFnRetriesErrors() throws Exception { + // A query with a limit of 1, so exactly one entity is expected back. + Query query = Query.newBuilder().setLimit( + Int32Value.newBuilder().setValue(1)).build(); + + // Use mockResponseForQuery to generate results.
+ when(mockDatastore.runQuery(any(RunQueryRequest.class))) + .thenThrow( + new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null)) + .thenAnswer(new Answer<RunQueryResponse>() { + @Override + public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable { + Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery(); + return mockResponseForQuery(q); + } + }); + + ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory); + DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + assertEquals(1, doFnTester.processBundle(query).size()); + } + @Test public void testTranslateGqlQueryWithLimit() throws Exception { String gql = "SELECT * from DummyKind LIMIT 10"; @@ -1096,7 +1145,7 @@ public class DatastoreV1Test { } @Override public int nextBatchSize(long timeSinceEpochMillis) { - return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START; + return DATASTORE_BATCH_UPDATE_ENTITIES_START; } } }