Comply with byte limit for Datastore Commit.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c4d14f8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c4d14f8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c4d14f8b

Branch: refs/heads/master
Commit: c4d14f8be7bb72fd653f1ab8e8080fc2b65f6672
Parents: ce00d24
Author: Colin Phipps <c...@moria.org.uk>
Authored: Tue Apr 25 15:28:28 2017 +0000
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 16 +++++++++
 .../sdk/io/gcp/datastore/DatastoreV1Test.java | 34 ++++++++++++++++++++
 2 files changed, 50 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 16bb1b4..4cfb801 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -208,6 +208,13 @@ public class DatastoreV1 {
   static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
 
   /**
+   * Cloud Datastore has a limit of 10MB per RPC, so we also flush if the total size of mutations
+   * exceeds this limit. This is set lower than the 10MB limit on the RPC, as this only accounts for
+   * the mutations themselves and not the CommitRequest wrapper around them.
+   */
+  static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
+
+  /**
    * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
    * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
    * {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery},
@@ -1123,6 +1130,7 @@ public class DatastoreV1 {
     private final V1DatastoreFactory datastoreFactory;
     // Current batch of mutations to be written.
     private final List<Mutation> mutations = new ArrayList<>();
+    private int mutationsSize = 0; // Accumulated size of protos in mutations.
 
     private static final int MAX_RETRIES = 5;
     private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1152,7 +1160,14 @@ public class DatastoreV1 {
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
+      Mutation write = c.element();
+      int size = write.getSerializedSize();
+      if (mutations.size() > 0
+          && mutationsSize + size >= DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT) {
+        flushBatch();
+      }
       mutations.add(c.element());
+      mutationsSize += size;
       if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
         flushBatch();
       }
@@ -1203,6 +1218,7 @@ public class DatastoreV1 {
       }
       LOG.debug("Successfully wrote {} mutations", mutations.size());
       mutations.clear();
+      mutationsSize = 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index ba8ac84..3597b54 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -60,6 +60,7 @@ import com.google.datastore.v1.Query;
 import com.google.datastore.v1.QueryResultBatch;
 import com.google.datastore.v1.RunQueryRequest;
 import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.Value;
 import com.google.datastore.v1.client.Datastore;
 import com.google.datastore.v1.client.DatastoreException;
 import com.google.datastore.v1.client.QuerySplitter;
@@ -645,6 +646,39 @@ public class DatastoreV1Test {
   }
 
   /**
+   * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches.
+   */
+  @Test
+  public void testDatatoreWriterFnWithLargeEntities() throws Exception {
+    List<Mutation> mutations = new ArrayList<>();
+    for (int i = 0; i < 12; ++i) {
+      Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
+      entity.putProperties("long", Value.newBuilder().setStringValue(new String(new char[1_000_000])
+        ).setExcludeFromIndexes(true).build());
+      mutations.add(makeUpsert(entity.build()).build());
+    }
+
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
+        null, mockDatastoreFactory);
+    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    doFnTester.processBundle(mutations);
+
+    // This test is over-specific currently; it requires that we split the 12 entity writes into 3
+    // requests, but we only need each CommitRequest to be less than 10MB in size.
+    int start = 0;
+    while (start < mutations.size()) {
+      int end = Math.min(mutations.size(), start + 4);
+      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+      commitRequest.addAllMutations(mutations.subList(start, end));
+      // Verify all the batch requests were made with the expected mutations.
+      verify(mockDatastore).commit(commitRequest.build());
+      start = end;
+    }
+  }
+
+  /**
    * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
    * query.
    */