Repository: beam
Updated Branches:
  refs/heads/master 1cd87e325 -> 097aec7a3
[BEAM-2753] Fixes translation of WriteFiles side inputs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/783f26f3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/783f26f3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/783f26f3 Branch: refs/heads/master Commit: 783f26f3a80a3f2a9d5a0fafc33778e046fe6b36 Parents: 1cd87e3 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Aug 25 14:49:07 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Wed Aug 30 16:29:05 2017 -0700 ---------------------------------------------------------------------- .../core/construction/PipelineTranslation.java | 55 ++++++---- .../direct/WriteWithShardingFactory.java | 13 +-- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 106 +++++++++++++------ 3 files changed, 112 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java index d928338..8a2faf3 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -152,30 +152,24 @@ public class PipelineTranslation { RunnerApi.FunctionSpec transformSpec = transformProto.getSpec(); // By default, no "additional" inputs, since that is an SDK-specific thing. 
- // Only ParDo really separates main from side inputs + // Only ParDo and WriteFiles really separate main from side inputs Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap(); - // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674 + // TODO: ParDoTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674 if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) { - RunnerApi.ParDoPayload payload = - RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload()); - - List<PCollectionView<?>> views = new ArrayList<>(); - for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry : - payload.getSideInputsMap().entrySet()) { - String localName = sideInputEntry.getKey(); - RunnerApi.SideInput sideInput = sideInputEntry.getValue(); - PCollection<?> pCollection = - (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName))); - views.add( - ParDoTranslation.viewFromProto( - sideInputEntry.getValue(), - sideInputEntry.getKey(), - pCollection, - transformProto, - rehydratedComponents)); - } - additionalInputs = PCollectionViews.toAdditionalInputs(views); + RunnerApi.ParDoPayload payload = RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload()); + additionalInputs = + sideInputMapToAdditionalInputs( + transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap()); + } + + // TODO: WriteFilesTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674 + if (transformSpec.getUrn().equals(PTransformTranslation.WRITE_FILES_TRANSFORM_URN)) { + RunnerApi.WriteFilesPayload payload = + RunnerApi.WriteFilesPayload.parseFrom(transformSpec.getPayload()); + additionalInputs = + sideInputMapToAdditionalInputs( + transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap()); } // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674 @@ -216,6 +210,25 @@ public class PipelineTranslation { } 
} + private static Map<TupleTag<?>, PValue> sideInputMapToAdditionalInputs( + RunnerApi.PTransform transformProto, + RehydratedComponents rehydratedComponents, + Map<TupleTag<?>, PValue> rehydratedInputs, + Map<String, RunnerApi.SideInput> sideInputsMap) + throws IOException { + List<PCollectionView<?>> views = new ArrayList<>(); + for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry : sideInputsMap.entrySet()) { + String localName = sideInputEntry.getKey(); + RunnerApi.SideInput sideInput = sideInputEntry.getValue(); + PCollection<?> pCollection = + (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName))); + views.add( + ParDoTranslation.viewFromProto( + sideInput, localName, pCollection, transformProto, rehydratedComponents)); + } + return PCollectionViews.toAdditionalInputs(views); + } + // A primitive transform is one with outputs that are not in its input and also // not produced by a subtransform. private static boolean isPrimitive(RunnerApi.PTransform transformProto) { http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index 3557c5d..605ef64 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -24,12 +24,10 @@ import com.google.common.base.Suppliers; import java.io.IOException; import java.io.Serializable; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.runners.core.construction.PTransformReplacements; 
import org.apache.beam.runners.core.construction.WriteFilesTranslation; -import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -63,16 +61,15 @@ class WriteWithShardingFactory<InputT> AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>> transform) { try { - List<PCollectionView<?>> sideInputs = - WriteFilesTranslation.getDynamicDestinationSideInputs(transform); - FileBasedSink sink = WriteFilesTranslation.getSink(transform); - WriteFiles<InputT, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs); + WriteFiles<InputT, ?, ?> replacement = + WriteFiles.to(WriteFilesTranslation.getSink(transform)) + .withSideInputs(WriteFilesTranslation.getDynamicDestinationSideInputs(transform)) + .withSharding(new LogElementShardsWithDrift<InputT>()); if (WriteFilesTranslation.isWindowedWrites(transform)) { replacement = replacement.withWindowedWrites(); } return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), - replacement.withSharding(new LogElementShardsWithDrift<InputT>())); + PTransformReplacements.getSingletonMainInput(transform), replacement); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 8870dd8..58af1d1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.MoreObjects.firstNonNull; import static 
org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; +import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; @@ -28,13 +29,14 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.common.base.MoreObjects; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -63,7 +65,6 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; @@ -525,7 +526,7 @@ public class AvroIOTest { outputFileHints.getSuggestedFilenameSuffix()); return outputFilePrefix .getCurrentDirectory() - .resolve(filename, StandardResolveOptions.RESOLVE_FILE); + .resolve(filename, RESOLVE_FILE); } @Override @@ -709,16 +710,20 @@ public class AvroIOTest { public FilenamePolicy getFilenamePolicy(String destination) { return DefaultFilenamePolicy.fromStandardParameters( StaticValueProvider.of( - baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)), + baseDir.resolve("file_" + 
destination + ".txt", RESOLVE_FILE)), null, null, false); } } - @Test - @Category(NeedsRunner.class) - public void testDynamicDestinations() throws Exception { + private enum Sharding { + RUNNER_DETERMINED, + WITHOUT_SHARDING, + FIXED_3_SHARDS + } + + private void testDynamicDestinationsWithSharding(Sharding sharding) throws Exception { ResourceId baseDir = FileSystems.matchNewResource( Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testDynamicDestinations") @@ -726,13 +731,14 @@ public class AvroIOTest { true); List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab"); - List<GenericRecord> expectedElements = Lists.newArrayListWithExpectedSize(elements.size()); + Multimap<String, GenericRecord> expectedElements = ArrayListMultimap.create(); Map<String, String> schemaMap = Maps.newHashMap(); for (String element : elements) { String prefix = element.substring(0, 1); String jsonSchema = schemaFromPrefix(prefix); schemaMap.put(prefix, jsonSchema); - expectedElements.add(createRecord(element, prefix, new Schema.Parser().parse(jsonSchema))); + expectedElements.put( + prefix, createRecord(element, prefix, new Schema.Parser().parse(jsonSchema))); } PCollectionView<Map<String, String>> schemaView = writePipeline @@ -741,38 +747,72 @@ public class AvroIOTest { PCollection<String> input = writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of())); - input.apply( + AvroIO.TypedWrite<String, GenericRecord> write = AvroIO.<String>writeCustomTypeToGenericRecords() .to(new TestDynamicDestinations(baseDir, schemaView)) - .withoutSharding() - .withTempDirectory(baseDir)); + .withTempDirectory(baseDir); + + switch (sharding) { + case RUNNER_DETERMINED: + break; + case WITHOUT_SHARDING: + write = write.withoutSharding(); + break; + case FIXED_3_SHARDS: + write = write.withNumShards(3); + break; + default: + throw new IllegalArgumentException("Unknown sharding " + sharding); + } + + input.apply(write); 
writePipeline.run(); // Validate that the data written matches the expected elements in the expected order. - List<String> prefixes = Lists.newArrayList(); - for (String element : elements) { - prefixes.add(element.substring(0, 1)); - } - prefixes = ImmutableSet.copyOf(prefixes).asList(); - - List<GenericRecord> actualElements = new ArrayList<>(); - for (String prefix : prefixes) { - File expectedFile = - new File( - baseDir - .resolve( - "file_" + prefix + ".txt-00000-of-00001", StandardResolveOptions.RESOLVE_FILE) - .toString()); - assertTrue("Expected output file " + expectedFile.getAbsolutePath(), expectedFile.exists()); - Schema schema = new Schema.Parser().parse(schemaFromPrefix(prefix)); - try (DataFileReader<GenericRecord> reader = - new DataFileReader<>(expectedFile, new GenericDatumReader<GenericRecord>(schema))) { - Iterators.addAll(actualElements, reader); + for (String prefix : expectedElements.keySet()) { + String shardPattern; + switch (sharding) { + case RUNNER_DETERMINED: + shardPattern = "*"; + break; + case WITHOUT_SHARDING: + shardPattern = "00000-of-00001"; + break; + case FIXED_3_SHARDS: + shardPattern = "*-of-00003"; + break; + default: + throw new IllegalArgumentException("Unknown sharding " + sharding); } - expectedFile.delete(); + String expectedFilepattern = + baseDir.resolve("file_" + prefix + ".txt-" + shardPattern, RESOLVE_FILE).toString(); + + PCollection<GenericRecord> records = + readPipeline.apply( + "read_" + prefix, + AvroIO.readGenericRecords(schemaFromPrefix(prefix)).from(expectedFilepattern)); + PAssert.that(records).containsInAnyOrder(expectedElements.get(prefix)); } - assertThat(actualElements, containsInAnyOrder(expectedElements.toArray())); + readPipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testDynamicDestinationsRunnerDeterminedSharding() throws Exception { + testDynamicDestinationsWithSharding(Sharding.RUNNER_DETERMINED); + } + + @Test + @Category(NeedsRunner.class) + public void 
testDynamicDestinationsWithoutSharding() throws Exception { + testDynamicDestinationsWithSharding(Sharding.WITHOUT_SHARDING); + } + + @Test + @Category(NeedsRunner.class) + public void testDynamicDestinationsWithNumShards() throws Exception { + testDynamicDestinationsWithSharding(Sharding.FIXED_3_SHARDS); } @Test