Repository: beam Updated Branches: refs/heads/master 9d48bd5e8 -> c14a3184e
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 9468893..8797ff7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -42,7 +42,9 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.common.base.Function; +import com.google.common.base.Functions; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -69,22 +71,31 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; +import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.TextIO.CompressionType; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.CoderUtils; @@ -205,7 +216,7 @@ public class TextIOTest { }); } - private <T> void runTestRead(String[] expected) throws Exception { + private void runTestRead(String[] expected) throws Exception { File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile(); String filename = tmpFile.getPath(); @@ -274,6 +285,213 @@ public class TextIOTest { displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); } + static class TestDynamicDestinations extends DynamicDestinations<String, String> { + ResourceId baseDir; + + TestDynamicDestinations(ResourceId baseDir) { + this.baseDir = baseDir; + } + + @Override + public String getDestination(String element) { + // Destination is based on first character of string. + return element.substring(0, 1); + } + + @Override + public String getDefaultDestination() { + return ""; + } + + @Nullable + @Override + public Coder<String> getDestinationCoder() { + return StringUtf8Coder.of(); + } + + @Override + public FilenamePolicy getFilenamePolicy(String destination) { + return DefaultFilenamePolicy.fromStandardParameters( + StaticValueProvider.of( + baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)), + null, + null, + false); + } + } + + class StartsWith implements Predicate<String> { + String prefix; + + StartsWith(String prefix) { + this.prefix = prefix; + } + + @Override + public boolean apply(@Nullable String input) { + return input.startsWith(prefix); + } + } + + @Test + @Category(NeedsRunner.class) + public void testDynamicDestinations() throws Exception { + ResourceId baseDir = + FileSystems.matchNewResource( + Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true); + + List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab"); + PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of())); + input.apply( + TextIO.write() + .to(new TestDynamicDestinations(baseDir)) + .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true))); + p.run(); + + assertOutputFiles( + Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class), + null, + null, + 0, + baseDir.resolve("file_a.txt", StandardResolveOptions.RESOLVE_FILE), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); + assertOutputFiles( + Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class), + null, + null, + 0, + baseDir.resolve("file_b.txt", StandardResolveOptions.RESOLVE_FILE), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); + assertOutputFiles( + Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class), + null, + null, + 0, + baseDir.resolve("file_c.txt", StandardResolveOptions.RESOLVE_FILE), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); + } + + @DefaultCoder(AvroCoder.class) + private static class UserWriteType { + String destination; + String metadata; + + UserWriteType() { + this.destination = ""; + this.metadata = ""; + } + + UserWriteType(String destination, String metadata) { + this.destination = destination; + this.metadata = metadata; + } + + @Override + public String toString() { + return String.format("destination: %s metadata : %s", destination, metadata); + } + } + + private static class SerializeUserWrite implements SerializableFunction<UserWriteType, String> { + @Override + public String apply(UserWriteType input) { + return input.toString(); + } + } + + private static class UserWriteDestination implements SerializableFunction<UserWriteType, Params> { + private ResourceId baseDir; + + UserWriteDestination(ResourceId baseDir) { + this.baseDir = baseDir; + } + + @Override + public Params apply(UserWriteType input) { + return new Params() + .withBaseFilename( + baseDir.resolve( + "file_" + input.destination.substring(0, 1) + ".txt", + StandardResolveOptions.RESOLVE_FILE)); + } + } + + private static class ExtractWriteDestination implements Function<UserWriteType, String> { + @Override + public String apply(@Nullable UserWriteType input) { + return input.destination; + } + } + + @Test + @Category(NeedsRunner.class) + public void testDynamicDefaultFilenamePolicy() throws Exception { + ResourceId baseDir = + FileSystems.matchNewResource( + Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true); + + List<UserWriteType> elements = + Lists.newArrayList( + new UserWriteType("aaaa", "first"), + new UserWriteType("aaab", "second"), + new UserWriteType("baaa", "third"), + new UserWriteType("baab", "fourth"), + new UserWriteType("caaa", "fifth"), + new UserWriteType("caab", "sixth")); + PCollection<UserWriteType> input = p.apply(Create.of(elements)); + input.apply( + TextIO.writeCustomType(new SerializeUserWrite()) + .to(new UserWriteDestination(baseDir), new Params()) + .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true))); + p.run(); + + String[] aElements = + Iterables.toArray( + Iterables.transform( + Iterables.filter( + elements, + Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())), + Functions.toStringFunction()), + String.class); + String[] bElements = + Iterables.toArray( + Iterables.transform( + Iterables.filter( + elements, + Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())), + Functions.toStringFunction()), + String.class); + String[] cElements = + Iterables.toArray( + Iterables.transform( + Iterables.filter( + elements, + Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())), + Functions.toStringFunction()), + String.class); + assertOutputFiles( + aElements, + null, + null, + 0, + baseDir.resolve("file_a.txt", StandardResolveOptions.RESOLVE_FILE), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); + assertOutputFiles( + bElements, + null, + null, + 0, + baseDir.resolve("file_b.txt", StandardResolveOptions.RESOLVE_FILE), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); + assertOutputFiles( + cElements, + null, + null, + 0, + baseDir.resolve("file_c.txt", StandardResolveOptions.RESOLVE_FILE), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); + } + private void runTestWrite(String[] elems) throws Exception { runTestWrite(elems, null, null, 1); } @@ -291,7 +509,8 @@ public class TextIOTest { String[] elems, String header, String footer, int numShards) throws Exception { String outputName = "file.txt"; Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); - String baseFilename = baseDir.resolve(outputName).toString(); + ResourceId baseFilename = + FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString()); PCollection<String> input = p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of())); @@ -311,8 +530,14 @@ public class TextIOTest { p.run(); - assertOutputFiles(elems, header, footer, numShards, baseDir, outputName, - firstNonNull(write.getShardTemplate(), + assertOutputFiles( + elems, + header, + footer, + numShards, + baseFilename, + firstNonNull( + write.inner.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE)); } @@ -321,13 +546,12 @@ public class TextIOTest { final String header, final String footer, int numShards, - Path rootLocation, - String outputName, + ResourceId outputPrefix, String shardNameTemplate) throws Exception { List<File> expectedFiles = new ArrayList<>(); if (numShards == 0) { - String pattern = rootLocation.toAbsolutePath().resolve(outputName + "*").toString(); + String pattern = outputPrefix.toString() + "*"; List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern)); for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) { expectedFiles.add(new File(expectedFile.resourceId().toString())); @@ -336,9 +560,9 @@ public class TextIOTest { for (int i = 0; i < numShards; i++) { expectedFiles.add( new File( - rootLocation.toString(), DefaultFilenamePolicy.constructName( - outputName, shardNameTemplate, "", i, numShards, null, null))); + outputPrefix, shardNameTemplate, "", i, numShards, null, null) + .toString())); } } @@ -456,14 +680,19 @@ public class TextIOTest { public void testWriteWithWritableByteChannelFactory() throws Exception { Coder<String> coder = StringUtf8Coder.of(); String outputName = "file.txt"; - Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); + ResourceId baseDir = + FileSystems.matchNewResource( + Files.createTempDirectory(tempFolder, "testwrite").toString(), true); PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder)); final WritableByteChannelFactory writableByteChannelFactory = new DrunkWritableByteChannelFactory(); - TextIO.Write write = TextIO.write().to(baseDir.resolve(outputName).toString()) - .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory); + TextIO.Write write = + TextIO.write() + .to(baseDir.resolve(outputName, StandardResolveOptions.RESOLVE_FILE).toString()) + .withoutSharding() + .withWritableByteChannelFactory(writableByteChannelFactory); DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK")); @@ -476,8 +705,15 @@ public class TextIOTest { drunkElems.add(elem); drunkElems.add(elem); } - assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir, - outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardTemplate()); + assertOutputFiles( + drunkElems.toArray(new String[0]), + null, + null, + 1, + baseDir.resolve( + outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(), + StandardResolveOptions.RESOLVE_FILE), + write.inner.getShardTemplate()); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index e6a0dcf..55f2a87 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -41,7 +42,11 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; +import org.apache.beam.sdk.io.FileBasedSink.CompressionType; +import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.SimpleSink.SimpleWriter; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; @@ -58,16 +63,20 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; import org.joda.time.format.DateTimeFormatter; @@ -164,7 +173,11 @@ public class WriteFilesTest { public void testWrite() throws IOException { List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); + runWrite( + inputs, + IDENTITY_MAP, + getBaseOutputFilename(), + WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); } /** @@ -173,8 +186,11 @@ public class WriteFilesTest { @Test @Category(NeedsRunner.class) public void testEmptyWrite() throws IOException { - runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink())); + runWrite( + Collections.<String>emptyList(), + IDENTITY_MAP, + getBaseOutputFilename(), + WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1)); } @@ -190,7 +206,7 @@ public class WriteFilesTest { Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink()).withNumShards(1)); + WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()).withNumShards(1)); } private ResourceId getBaseOutputDirectory() { @@ -198,9 +214,13 @@ public class WriteFilesTest { .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY); } - private SimpleSink makeSimpleSink() { - FilenamePolicy filenamePolicy = new PerWindowFiles("file", "simple"); - return new SimpleSink(getBaseOutputDirectory(), filenamePolicy); + + private SimpleSink<Void> makeSimpleSink() { + FilenamePolicy filenamePolicy = + new PerWindowFiles( + getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE), + "simple"); + return SimpleSink.makeSimpleSink(getBaseOutputDirectory(), filenamePolicy); } @Test @@ -219,8 +239,10 @@ public class WriteFilesTest { timestamps.add(i + 1); } - SimpleSink sink = makeSimpleSink(); - WriteFiles<String> write = WriteFiles.to(sink).withSharding(new LargestInt()); + SimpleSink<Void> sink = makeSimpleSink(); + WriteFiles<String, ?, String> write = + WriteFiles.to(sink, SerializableFunctions.<String>identity()) + .withSharding(new LargestInt()); p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) .apply(IDENTITY_MAP) .apply(write); @@ -241,7 +263,8 @@ public class WriteFilesTest { Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink()).withNumShards(20)); + WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()) + .withNumShards(20)); } /** @@ -251,7 +274,11 @@ public class WriteFilesTest { @Category(NeedsRunner.class) public void testWriteWithEmptyPCollection() throws IOException { List<String> inputs = new ArrayList<>(); - runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); + runWrite( + inputs, + IDENTITY_MAP, + getBaseOutputFilename(), + WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); } /** @@ -263,8 +290,10 @@ public class WriteFilesTest { List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"); runWrite( - inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))), - getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); + inputs, + new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))), + getBaseOutputFilename(), + WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); } /** @@ -278,10 +307,9 @@ public class WriteFilesTest { runWrite( inputs, - new WindowAndReshuffle<>( - Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))), + new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))), getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink())); + WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); } @Test @@ -292,15 +320,19 @@ public class WriteFilesTest { inputs.add("mambo_number_" + i); } runWrite( - inputs, Window.<String>into(FixedWindows.of(Duration.millis(2))), + inputs, + Window.<String>into(FixedWindows.of(Duration.millis(2))), getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites()); + WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()) + .withMaxNumWritersPerBundle(2) + .withWindowedWrites()); } public void testBuildWrite() { - SimpleSink sink = makeSimpleSink(); - WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3); - assertThat((SimpleSink) write.getSink(), is(sink)); + SimpleSink<Void> sink = makeSimpleSink(); + WriteFiles<String, ?, String> write = + WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(3); + assertThat((SimpleSink<Void>) write.getSink(), is(sink)); PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding = write.getSharding(); @@ -309,25 +341,37 @@ public class WriteFilesTest { assertThat(write.getNumShards().get(), equalTo(3)); assertThat(write.getSharding(), equalTo(originalSharding)); - WriteFiles<String> write2 = write.withSharding(SHARDING_TRANSFORM); - assertThat((SimpleSink) write2.getSink(), is(sink)); + WriteFiles<String, ?, ?> write2 = write.withSharding(SHARDING_TRANSFORM); + assertThat((SimpleSink<Void>) write2.getSink(), is(sink)); assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM)); // original unchanged - WriteFiles<String> writeUnsharded = write2.withRunnerDeterminedSharding(); + WriteFiles<String, ?, ?> writeUnsharded = write2.withRunnerDeterminedSharding(); assertThat(writeUnsharded.getSharding(), nullValue()); assertThat(write.getSharding(), equalTo(originalSharding)); } @Test public void testDisplayData() { - SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - WriteFiles<String> write = WriteFiles.to(sink); + DynamicDestinations<String, Void> dynamicDestinations = + DynamicFileDestinations.constant( + DefaultFilenamePolicy.fromParams( + new Params() + .withBaseFilename( + getBaseOutputDirectory() + .resolve("file", StandardResolveOptions.RESOLVE_FILE)) + .withShardTemplate("-SS-of-NN"))); + SimpleSink<Void> sink = + new SimpleSink<Void>( + getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String, ?, String> write = + WriteFiles.to(sink, SerializableFunctions.<String>identity()); + DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("sink", sink.getClass())); @@ -335,14 +379,145 @@ public class WriteFilesTest { } @Test + @Category(NeedsRunner.class) + public void testUnboundedNeedsWindowed() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Must use windowed writes when applying WriteFiles to an unbounded PCollection"); + + SimpleSink<Void> sink = makeSimpleSink(); + p.apply(Create.of("foo")) + .setIsBoundedInternal(IsBounded.UNBOUNDED) + .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity())); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testUnboundedNeedsSharding() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "When applying WriteFiles to an unbounded PCollection, " + + "must specify number of output shards explicitly"); + + SimpleSink<Void> sink = makeSimpleSink(); + p.apply(Create.of("foo")) + .setIsBoundedInternal(IsBounded.UNBOUNDED) + .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()).withWindowedWrites()); + p.run(); + } + + // Test DynamicDestinations class. Expects user values to be string-encoded integers. + // Stores the integer mod 5 as the destination, and uses that in the file prefix. + static class TestDestinations extends DynamicDestinations<String, Integer> { + private ResourceId baseOutputDirectory; + + TestDestinations(ResourceId baseOutputDirectory) { + this.baseOutputDirectory = baseOutputDirectory; + } + + @Override + public Integer getDestination(String element) { + return Integer.valueOf(element) % 5; + } + + @Override + public Integer getDefaultDestination() { + return 0; + } + + @Override + public FilenamePolicy getFilenamePolicy(Integer destination) { + return new PerWindowFiles( + baseOutputDirectory.resolve("file_" + destination, StandardResolveOptions.RESOLVE_FILE), + "simple"); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + } + } + + // Test format function. Prepend a string to each record before writing. + static class TestDynamicFormatFunction implements SerializableFunction<String, String> { + @Override + public String apply(String input) { + return "record_" + input; + } + } + + @Test + @Category(NeedsRunner.class) + public void testDynamicDestinationsBounded() throws Exception { + testDynamicDestinationsHelper(true); + } + + @Test + @Category(NeedsRunner.class) + public void testDynamicDestinationsUnbounded() throws Exception { + testDynamicDestinationsHelper(false); + } + + private void testDynamicDestinationsHelper(boolean bounded) throws IOException { + TestDestinations dynamicDestinations = new TestDestinations(getBaseOutputDirectory()); + SimpleSink<Integer> sink = + new SimpleSink<>( + getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED); + + // Flag to validate that the pipeline options are passed to the Sink. + WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); + options.setTestFlag("test_value"); + Pipeline p = TestPipeline.create(options); + + List<String> inputs = Lists.newArrayList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + // Prepare timestamps for the elements. + List<Long> timestamps = new ArrayList<>(); + for (long i = 0; i < inputs.size(); i++) { + timestamps.add(i + 1); + } + + WriteFiles<String, Integer, String> writeFiles = + WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(1); + + PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps)); + if (!bounded) { + input.setIsBoundedInternal(IsBounded.UNBOUNDED); + input = input.apply(Window.<String>into(FixedWindows.of(Duration.standardDays(1)))); + input.apply(writeFiles.withWindowedWrites()); + } else { + input.apply(writeFiles); + } + p.run(); + + for (int i = 0; i < 5; ++i) { + ResourceId base = + getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE); + List<String> expected = Lists.newArrayList("record_" + i, "record_" + (i + 5)); + checkFileContents(base.toString(), expected, Optional.of(1)); + } + } + + @Test public void testShardedDisplayData() { - SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - WriteFiles<String> write = WriteFiles.to(sink).withNumShards(1); + DynamicDestinations<String, Void> dynamicDestinations = + DynamicFileDestinations.constant( + DefaultFilenamePolicy.fromParams( + new Params() + .withBaseFilename( + getBaseOutputDirectory() + .resolve("file", StandardResolveOptions.RESOLVE_FILE)) + .withShardTemplate("-SS-of-NN"))); + SimpleSink<Void> sink = + new SimpleSink<Void>( + getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String, ?, String> write = + WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(1); DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("sink", sink.getClass())); assertThat(displayData, includesDisplayDataFor("sink", sink)); @@ -351,14 +526,24 @@ public class WriteFilesTest { @Test public void testCustomShardStrategyDisplayData() { - SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - WriteFiles<String> write = - WriteFiles.to(sink) + DynamicDestinations<String, Void> dynamicDestinations = + DynamicFileDestinations.constant( + DefaultFilenamePolicy.fromParams( + new Params() + .withBaseFilename( + getBaseOutputDirectory() + .resolve("file", StandardResolveOptions.RESOLVE_FILE)) + .withShardTemplate("-SS-of-NN"))); + SimpleSink<Void> sink = + new SimpleSink<Void>( + getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String, ?, String> write = + WriteFiles.to(sink, SerializableFunctions.<String>identity()) .withSharding( new PTransform<PCollection<String>, PCollectionView<Integer>>() { @Override @@ -383,59 +568,77 @@ public class WriteFilesTest { * PCollection are written to the sink. */ private void runWrite( - List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, - String baseName, WriteFiles<String> write) throws IOException { + List<String> inputs, + PTransform<PCollection<String>, PCollection<String>> transform, + String baseName, + WriteFiles<String, ?, String> write) + throws IOException { runShardedWrite(inputs, transform, baseName, write); } private static class PerWindowFiles extends FilenamePolicy { private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecondMillis(); - private final String prefix; + private final ResourceId baseFilename; private final String suffix; - public PerWindowFiles(String prefix, String suffix) { - this.prefix = prefix; + public PerWindowFiles(ResourceId baseFilename, String suffix) { + this.baseFilename = baseFilename; this.suffix = suffix; } public String filenamePrefixForWindow(IntervalWindow window) { + String prefix = + baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), ""); return String.format("%s%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); } @Override - public ResourceId windowedFilename( - ResourceId outputDirectory, WindowedContext context, String extension) { + public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { IntervalWindow window = (IntervalWindow) context.getWindow(); - String filename = String.format( - "%s-%s-of-%s%s%s", - filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), - extension, suffix); - return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + String filename = + String.format( + "%s-%s-of-%s%s%s", + filenamePrefixForWindow(window), + context.getShardNumber(), + context.getNumShards(), + outputFileHints.getSuggestedFilenameSuffix(), + suffix); + return baseFilename + .getCurrentDirectory() + .resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename( - ResourceId outputDirectory, Context context, String extension) { - String filename = String.format( - "%s%s-of-%s%s%s", - prefix, context.getShardNumber(), context.getNumShards(), - extension, suffix); - return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + String prefix = + baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), ""); + String filename = + String.format( + "%s-%s-of-%s%s%s", + prefix, + context.getShardNumber(), + context.getNumShards(), + outputFileHints.getSuggestedFilenameSuffix(), + suffix); + return baseFilename + .getCurrentDirectory() + .resolve(filename, StandardResolveOptions.RESOLVE_FILE); } } /** * Performs a WriteFiles transform with the desired number of shards. Verifies the WriteFiles * transform calls the appropriate methods on a test sink in the correct order, as well as - * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards - * is not null, also verifies that the output number of shards is correct. + * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards is + * not null, also verifies that the output number of shards is correct. */ private void runShardedWrite( List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, String baseName, - WriteFiles<String> write) throws IOException { + WriteFiles<String, ?, String> write) + throws IOException { // Flag to validate that the pipeline options are passed to the Sink WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); options.setTestFlag("test_value"); http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 4393a63..e46b1d3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -57,6 +58,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java index edb1e0d..c5c2462 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java @@ -23,8 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.Lists; import java.io.Serializable; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -32,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; /** @@ -158,21 +158,16 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab } // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry. // We must first use reflection to figure out what the type parameter is. - for (Type superclass = getClass().getGenericSuperclass(); - superclass != null; - superclass = ((Class) superclass).getGenericSuperclass()) { - if (superclass instanceof ParameterizedType) { - ParameterizedType parameterized = (ParameterizedType) superclass; - if (parameterized.getRawType() == DynamicDestinations.class) { - // DestinationT is the second parameter. - Type parameter = parameterized.getActualTypeArguments()[1]; - @SuppressWarnings("unchecked") - Class<DestinationT> parameterClass = (Class<DestinationT>) parameter; - return registry.getCoder(parameterClass); - } - } + TypeDescriptor<?> superDescriptor = + TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class); + if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) { + throw new AssertionError( + "Couldn't find the DynamicDestinations superclass of " + this.getClass()); } - throw new AssertionError( - "Couldn't find the DynamicDestinations superclass of " + this.getClass()); + TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT"); + @SuppressWarnings("unchecked") + TypeDescriptor<DestinationT> descriptor = + (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable); + return registry.getCoder(descriptor); } } http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java index 90d41a0..55672ff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java @@ -23,6 +23,7 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.ShardedKey; /** * Given a write to a specific table, assign that to one of the http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java deleted file mode 100644 index c2b739f..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import java.io.Serializable; -import java.util.Objects; - -/** - * A key and a shard number. - */ -class ShardedKey<K> implements Serializable { - private static final long serialVersionUID = 1L; - private final K key; - private final int shardNumber; - - public static <K> ShardedKey<K> of(K key, int shardNumber) { - return new ShardedKey<>(key, shardNumber); - } - - ShardedKey(K key, int shardNumber) { - this.key = key; - this.shardNumber = shardNumber; - } - - public K getKey() { - return key; - } - - public int getShardNumber() { - return shardNumber; - } - - @Override - public String toString() { - return "key: " + key + " shard: " + shardNumber; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof ShardedKey)) { - return false; - } - ShardedKey<K> other = (ShardedKey<K>) o; - return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber); - } - - @Override - public int hashCode() { - return Objects.hash(key, shardNumber); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java deleted file mode 100644 index c2b62b7..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StructuredCoder; -import org.apache.beam.sdk.coders.VarIntCoder; - - -/** - * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. - */ -@VisibleForTesting -class ShardedKeyCoder<KeyT> - extends StructuredCoder<ShardedKey<KeyT>> { - public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) { - return new ShardedKeyCoder<>(keyCoder); - } - - private final Coder<KeyT> keyCoder; - private final VarIntCoder shardNumberCoder; - - protected ShardedKeyCoder(Coder<KeyT> keyCoder) { - this.keyCoder = keyCoder; - this.shardNumberCoder = VarIntCoder.of(); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Arrays.asList(keyCoder); - } - - @Override - public void encode(ShardedKey<KeyT> key, OutputStream outStream) - throws IOException { - keyCoder.encode(key.getKey(), outStream); - shardNumberCoder.encode(key.getShardNumber(), outStream); - } - - @Override - public ShardedKey<KeyT> decode(InputStream inStream) - throws IOException { - return new ShardedKey<>( - keyCoder.decode(inStream), - shardNumberCoder.decode(inStream)); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - keyCoder.verifyDeterministic(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java index 63e5bc1..a210858 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java index 18b2033..fa5b3ce 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -29,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java index cd88222..51b9375 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.ShardedKey; /** * Fn that tags each table row with a unique id and destination table. To avoid calling http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index d68779a..e1ed746 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; + import com.google.api.services.bigquery.model.TableRow; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -40,6 +41,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java index 45dc2a8..887cb93 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java @@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index acd1132..451d1bd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; /** http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index c5494d8..9ed2916 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index bfd260a..d31f3a0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -82,6 +82,7 @@ import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.BoundedSource; @@ -131,6 +132,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; +import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java index 7255a94..442fba5 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -521,7 +522,8 @@ public class XmlIO { @Override public PDone expand(PCollection<T> input) { - return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink())); + return input.apply( + org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity())); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java index 6ae83f2..74e0bda 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java @@ -25,6 +25,7 @@ import javax.xml.bind.JAXBContext; import javax.xml.bind.Marshaller; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.DefaultFilenamePolicy; +import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.ShardNameTemplate; import org.apache.beam.sdk.io.fs.ResourceId; @@ -34,18 +35,18 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MimeTypes; /** Implementation of {@link XmlIO#write}. */ -class XmlSink<T> extends FileBasedSink<T> { +class XmlSink<T> extends FileBasedSink<T, Void> { private static final String XML_EXTENSION = ".xml"; private final XmlIO.Write<T> spec; - private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) { - return DefaultFilenamePolicy.constructUsingStandardParameters( + private static <T> DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<T> spec) { + return DefaultFilenamePolicy.fromStandardParameters( spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false); } XmlSink(XmlIO.Write<T> spec) { - super(spec.getFilenamePrefix(), makeFilenamePolicy(spec)); + super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec))); this.spec = spec; } @@ -75,10 +76,8 @@ class XmlSink<T> extends FileBasedSink<T> { super.populateDisplayData(builder); } - /** - * {@link WriteOperation} for XML {@link FileBasedSink}s. - */ - protected static final class XmlWriteOperation<T> extends WriteOperation<T> { + /** {@link WriteOperation} for XML {@link FileBasedSink}s. */ + protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> { public XmlWriteOperation(XmlSink<T> sink) { super(sink); } @@ -112,10 +111,8 @@ class XmlSink<T> extends FileBasedSink<T> { } } - /** - * A {@link Writer} that can write objects as XML elements. - */ - protected static final class XmlWriter<T> extends Writer<T> { + /** A {@link Writer} that can write objects as XML elements. */ + protected static final class XmlWriter<T> extends Writer<T, Void> { final Marshaller marshaller; private OutputStream os = null; http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java index aa0c1c3..d1584dc 100644 --- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java @@ -197,8 +197,8 @@ public class XmlSinkTest { .withRecordClass(Integer.class); DisplayData displayData = DisplayData.from(write); - - assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml")); + assertThat( + displayData, hasDisplayItem("filenamePattern", "/path/to/file-SSSSS-of-NNNNN" + ".xml")); assertThat(displayData, hasDisplayItem("rootElement", "bird")); assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); }